Implementation of Replayer functionality. More...
#include <ReplayerImpl.h>
Implementation of Replayer functionality.
This class is the implementation of the Replayer. Inheritance is used to limit the application's access to underlying system methods.
Definition at line 61 of file ReplayerImpl.h.
OpenDDS::DCPS::ReplayerImpl::ReplayerImpl | ( | ) |
Definition at line 51 of file ReplayerImpl.cpp.
References DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, DDS::HANDLE_NIL, DDS::OfferedIncompatibleQosStatus::last_policy_id, DDS::PublicationMatchedStatus::last_subscription_handle, offered_incompatible_qos_status_, DDS::OfferedIncompatibleQosStatus::policies, publication_match_status_, DDS::PublicationMatchedStatus::total_count, DDS::OfferedIncompatibleQosStatus::total_count, DDS::PublicationMatchedStatus::total_count_change, and DDS::OfferedIncompatibleQosStatus::total_count_change.
00052 : data_dropped_count_(0), 00053 data_delivered_count_(0), 00054 n_chunks_(TheServiceParticipant->n_chunks()), 00055 association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()), 00056 qos_(TheServiceParticipant->initial_DataWriterQos()), 00057 participant_servant_(0), 00058 topic_id_(GUID_UNKNOWN), 00059 topic_servant_(0), 00060 listener_mask_(DEFAULT_STATUS_MASK), 00061 domain_id_(0), 00062 publisher_servant_(0), 00063 publication_id_(GUID_UNKNOWN), 00064 sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()), 00065 // data_container_(0), 00066 // liveliness_lost_(false), 00067 // last_deadline_missed_total_count_(0), 00068 is_bit_(false), 00069 empty_condition_(lock_), 00070 pending_write_count_(0) 00071 { 00072 // liveliness_lost_status_.total_count = 0; 00073 // liveliness_lost_status_.total_count_change = 0; 00074 // 00075 // offered_deadline_missed_status_.total_count = 0; 00076 // offered_deadline_missed_status_.total_count_change = 0; 00077 // offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL; 00078 00079 offered_incompatible_qos_status_.total_count = 0; 00080 offered_incompatible_qos_status_.total_count_change = 0; 00081 offered_incompatible_qos_status_.last_policy_id = 0; 00082 offered_incompatible_qos_status_.policies.length(0); 00083 00084 publication_match_status_.total_count = 0; 00085 publication_match_status_.total_count_change = 0; 00086 publication_match_status_.current_count = 0; 00087 publication_match_status_.current_count_change = 0; 00088 publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL; 00089 00090 }
OpenDDS::DCPS::ReplayerImpl::~ReplayerImpl | ( | ) |
Definition at line 94 of file ReplayerImpl.cpp.
References DBG_ENTRY_LVL.
00095 { 00096 DBG_ENTRY_LVL("ReplayerImpl","~ReplayerImpl",6); 00097 }
void OpenDDS::DCPS::ReplayerImpl::add_association | ( | const RepoId & | yourId, | |
const ReaderAssociation & | reader, | |||
bool | active | |||
) | [virtual] |
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 385 of file ReplayerImpl.cpp.
References ACE_TEXT(), assoc_complete_readers_, OpenDDS::DCPS::TransportClient::associate(), association_complete_i(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, domain_id_, DDS::DataReaderQos::durability, OpenDDS::DCPS::ReaderAssociation::exprParams, OpenDDS::DCPS::ReaderAssociation::filterExpression, OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::insert(), is_bit_, LM_DEBUG, LM_ERROR, lock_, OPENDDS_STRING, participant_servant_, pending_readers_, publication_id_, qos_, reader_info_, OpenDDS::DCPS::ReaderAssociation::readerId, OpenDDS::DCPS::ReaderAssociation::readerQos, OpenDDS::DCPS::ReaderAssociation::readerTransInfo, DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, TheServiceParticipant, DDS::DataWriterQos::transport_priority, and DDS::VOLATILE_DURABILITY_QOS.
00388 { 00389 DBG_ENTRY_LVL("ReplayerImpl", "add_association", 6); 00390 00391 if (DCPS_debug_level >= 1) { 00392 GuidConverter writer_converter(yourId); 00393 GuidConverter reader_converter(reader.readerId); 00394 ACE_DEBUG((LM_DEBUG, 00395 ACE_TEXT("(%P|%t) ReplayerImpl::add_association - ") 00396 ACE_TEXT("bit %d local %C remote %C\n"), 00397 is_bit_, 00398 OPENDDS_STRING(writer_converter).c_str(), 00399 OPENDDS_STRING(reader_converter).c_str())); 00400 } 00401 00402 // if (entity_deleted_ == true) { 00403 // if (DCPS_debug_level >= 1) 00404 // ACE_DEBUG((LM_DEBUG, 00405 // ACE_TEXT("(%P|%t) ReplayerImpl::add_association") 00406 // ACE_TEXT(" This is a deleted datawriter, ignoring add.\n"))); 00407 // 00408 // return; 00409 // } 00410 00411 if (GUID_UNKNOWN == publication_id_) { 00412 publication_id_ = yourId; 00413 } 00414 00415 { 00416 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00417 reader_info_.insert(std::make_pair(reader.readerId, 00418 ReaderInfo(TheServiceParticipant->publisher_content_filter() ? 
reader.filterExpression : "", 00419 reader.exprParams, participant_servant_, 00420 reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS))); 00421 } 00422 00423 if (DCPS_debug_level > 4) { 00424 GuidConverter converter(publication_id_); 00425 ACE_DEBUG((LM_DEBUG, 00426 ACE_TEXT("(%P|%t) ReplayerImpl::add_association(): ") 00427 ACE_TEXT("adding subscription to publication %C with priority %d.\n"), 00428 OPENDDS_STRING(converter).c_str(), 00429 qos_.transport_priority.value)); 00430 } 00431 00432 AssociationData data; 00433 data.remote_id_ = reader.readerId; 00434 data.remote_data_ = reader.readerTransInfo; 00435 data.remote_reliable_ = 00436 (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS); 00437 data.remote_durable_ = 00438 (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS); 00439 00440 if (!this->associate(data, active)) { 00441 //FUTURE: inform inforepo and try again as passive peer 00442 if (DCPS_debug_level) { 00443 ACE_ERROR((LM_ERROR, 00444 ACE_TEXT("(%P|%t) ReplayerImpl::add_association: ") 00445 ACE_TEXT("ERROR: transport layer failed to associate.\n"))); 00446 } 00447 return; 00448 } 00449 00450 if (active) { 00451 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00452 00453 // Have we already received an association_complete() callback? 00454 if (assoc_complete_readers_.count(reader.readerId)) { 00455 assoc_complete_readers_.erase(reader.readerId); 00456 association_complete_i(reader.readerId); 00457 00458 // Add to pending_readers_ -> pending means we are waiting 00459 // for the association_complete() callback. 
00460 } else if (OpenDDS::DCPS::insert(pending_readers_, reader.readerId) == -1) { 00461 GuidConverter converter(reader.readerId); 00462 ACE_ERROR((LM_ERROR, 00463 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::add_association: ") 00464 ACE_TEXT("failed to mark %C as pending.\n"), 00465 OPENDDS_STRING(converter).c_str())); 00466 00467 } else { 00468 if (DCPS_debug_level > 0) { 00469 GuidConverter converter(reader.readerId); 00470 ACE_DEBUG((LM_DEBUG, 00471 ACE_TEXT("(%P|%t) ReplayerImpl::add_association: ") 00472 ACE_TEXT("marked %C as pending.\n"), 00473 OPENDDS_STRING(converter).c_str())); 00474 } 00475 } 00476 } else { 00477 // In the current implementation, DataWriter is always active, so this 00478 // code will not be applicable. 00479 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_); 00480 disco->association_complete(this->domain_id_, 00481 this->participant_servant_->get_id(), 00482 this->publication_id_, reader.readerId); 00483 } 00484 }
void OpenDDS::DCPS::ReplayerImpl::association_complete | ( | const RepoId & | remote_id | ) | [virtual] |
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 506 of file ReplayerImpl.cpp.
References ACE_TEXT(), assoc_complete_readers_, association_complete_i(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, is_bit_, LM_DEBUG, lock_, OPENDDS_STRING, pending_readers_, publication_id_, and OpenDDS::DCPS::remove().
00507 { 00508 DBG_ENTRY_LVL("ReplayerImpl", "association_complete", 6); 00509 00510 if (DCPS_debug_level >= 1) { 00511 GuidConverter writer_converter(this->publication_id_); 00512 GuidConverter reader_converter(remote_id); 00513 ACE_DEBUG((LM_DEBUG, 00514 ACE_TEXT("(%P|%t) ReplayerImpl::association_complete - ") 00515 ACE_TEXT("bit %d local %C remote %C\n"), 00516 is_bit_, 00517 OPENDDS_STRING(writer_converter).c_str(), 00518 OPENDDS_STRING(reader_converter).c_str())); 00519 } 00520 00521 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00522 if (OpenDDS::DCPS::remove(pending_readers_, remote_id) == -1) { 00523 // Not found in pending_readers_, defer calling association_complete_i() 00524 // until add_association() resumes and sees this ID in assoc_complete_readers_. 00525 assoc_complete_readers_.insert(remote_id); 00526 } else { 00527 association_complete_i(remote_id); 00528 } 00529 }
void OpenDDS::DCPS::ReplayerImpl::association_complete_i | ( | const RepoId & | remote_id | ) | [private] |
Definition at line 532 of file ReplayerImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::bind(), DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), id_to_handle_map_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::insert(), is_bit_, DDS::PublicationMatchedStatus::last_subscription_handle, listener_, LM_DEBUG, LM_ERROR, LM_WARNING, lock_, OPENDDS_STRING, participant_servant_, publication_match_status_, readers_, DDS::PublicationMatchedStatus::total_count, and DDS::PublicationMatchedStatus::total_count_change.
Referenced by add_association(), and association_complete().
00533 { 00534 DBG_ENTRY_LVL("ReplayerImpl", "association_complete_i", 6); 00535 // bool reader_durable = false; 00536 { 00537 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00538 if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) { 00539 GuidConverter converter(remote_id); 00540 ACE_ERROR((LM_ERROR, 00541 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ") 00542 ACE_TEXT("insert %C from pending failed.\n"), 00543 OPENDDS_STRING(converter).c_str())); 00544 } 00545 // RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id); 00546 // if (it != reader_info_.end()) { 00547 // reader_durable = it->second.durable_; 00548 // } 00549 } 00550 00551 if (!is_bit_) { 00552 00553 DDS::InstanceHandle_t handle = 00554 this->participant_servant_->id_to_handle(remote_id); 00555 00556 { 00557 // protect publication_match_status_ and status changed flags. 00558 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00559 00560 // update the publication_match_status_ 00561 ++publication_match_status_.total_count; 00562 ++publication_match_status_.total_count_change; 00563 ++publication_match_status_.current_count; 00564 ++publication_match_status_.current_count_change; 00565 00566 if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) { 00567 GuidConverter converter(remote_id); 00568 ACE_DEBUG((LM_WARNING, 00569 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ") 00570 ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"), 00571 OPENDDS_STRING(converter).c_str(), 00572 handle)); 00573 return; 00574 00575 } else if (DCPS_debug_level > 4) { 00576 GuidConverter converter(remote_id); 00577 ACE_DEBUG((LM_DEBUG, 00578 ACE_TEXT("(%P|%t) ReplayerImpl::association_complete_i: ") 00579 ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"), 00580 OPENDDS_STRING(converter).c_str(), 00581 handle)); 00582 } 00583 00584 publication_match_status_.last_subscription_handle = handle; 00585 00586 } 00587 00588 00589 if (listener_.in()) { 00590 
listener_->on_replayer_matched(this, 00591 publication_match_status_); 00592 00593 // TBD - why does the spec say to change this but not 00594 // change the ChangeFlagStatus after a listener call? 00595 publication_match_status_.total_count_change = 0; 00596 publication_match_status_.current_count_change = 0; 00597 } 00598 00599 } 00600 00601 }
bool OpenDDS::DCPS::ReplayerImpl::check_transport_qos | ( | const TransportInst & | inst | ) | [virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 835 of file ReplayerImpl.cpp.
DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::cleanup | ( | void | ) |
Clean up the DataWriter.
Definition at line 101 of file ReplayerImpl.cpp.
References CORBA::LocalObject::_nil(), ACE_TEXT(), domain_id_, empty_condition_, LM_ERROR, lock_, pending_write_count_, publication_id_, remove_all_associations(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, TheServiceParticipant, topic_objref_, topic_servant_, and ACE_Condition< ACE_Recursive_Thread_Mutex >::wait().
Referenced by OpenDDS::DCPS::Service_Participant::delete_replayer(), and OpenDDS::DCPS::DomainParticipantImpl::handle_exception().
00102 { 00103 00104 // // Unregister all registered instances prior to deletion. 00105 // // DDS::Time_t source_timestamp = time_value_to_time(ACE_OS::gettimeofday()); 00106 // // this->unregister_instances(source_timestamp); 00107 // 00108 // // CORBA::String_var topic_name = this->get_Atopic_name(); 00109 { 00110 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, DDS::RETCODE_ERROR); 00111 00112 // Wait for pending samples to drain prior to removing associations 00113 // and unregistering the publication. 00114 while (this->pending_write_count_) 00115 this->empty_condition_.wait(); 00116 00117 // Call remove association before unregistering the datawriter 00118 // with the transport, otherwise some callbacks resulted from 00119 // remove_association may lost. 00120 this->remove_all_associations(); 00121 00122 // release our Topic_var 00123 topic_objref_ = DDS::Topic::_nil(); 00124 topic_servant_ = 0; 00125 00126 } 00127 00128 // not just unregister but remove any pending writes/sends. 00129 // this->unregister_all(); 00130 00131 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_); 00132 if (!disco->remove_publication( 00133 this->domain_id_, 00134 this->participant_servant_->get_id(), 00135 this->publication_id_)) { 00136 ACE_ERROR_RETURN((LM_ERROR, 00137 ACE_TEXT("(%P|%t) ERROR: ") 00138 ACE_TEXT("PublisherImpl::delete_datawriter, ") 00139 ACE_TEXT("publication not removed from discovery.\n")), 00140 DDS::RETCODE_ERROR); 00141 } 00142 return DDS::RETCODE_OK; 00143 }
void OpenDDS::DCPS::ReplayerImpl::control_delivered | ( | const Message_Block_Ptr & | sample | ) | [virtual] |
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 883 of file ReplayerImpl.cpp.
void OpenDDS::DCPS::ReplayerImpl::control_dropped | ( | const Message_Block_Ptr & | sample, | |
bool | dropped_by_transport | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 907 of file ReplayerImpl.cpp.
DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::create_sample_data_message | ( | Message_Block_Ptr | data, | |
DataSampleHeader & | header_data, | |||
Message_Block_Ptr & | message, | |||
const DDS::Time_t & | source_timestamp, | |||
bool | content_filter | |||
) | [private] |
Definition at line 1027 of file ReplayerImpl.cpp.
References OpenDDS::DCPS::TransportClient::cdr_encapsulation(), OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, OpenDDS::DCPS::DataSampleHeader::coherent_change_, OpenDDS::DCPS::DataSampleHeader::content_filter_, db_allocator_, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), header_allocator_, DDS::DataWriterQos::lifespan, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_nanosec_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_sec_, OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), ACE_Time_Value::max_time, mb_allocator_, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, DDS::Time_t::nanosec, need_sequence_repair(), qos_, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::SAMPLE_DATA, DDS::Time_t::sec, OpenDDS::DCPS::DataSampleHeader::sequence_, sequence_number_, OpenDDS::DCPS::DataSampleHeader::sequence_repair_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, and ACE_Time_Value::zero.
Referenced by write().
01032 { 01033 header_data.message_id_ = SAMPLE_DATA; 01034 header_data.coherent_change_ = content_filter; 01035 01036 header_data.content_filter_ = 0; 01037 header_data.cdr_encapsulation_ = this->cdr_encapsulation(); 01038 header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length()); 01039 header_data.sequence_repair_ = need_sequence_repair(); 01040 if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) { 01041 this->sequence_number_ = SequenceNumber(); 01042 } else { 01043 ++this->sequence_number_; 01044 } 01045 header_data.sequence_ = this->sequence_number_; 01046 header_data.source_timestamp_sec_ = source_timestamp.sec; 01047 header_data.source_timestamp_nanosec_ = source_timestamp.nanosec; 01048 01049 if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC 01050 || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) { 01051 header_data.lifespan_duration_ = true; 01052 header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec; 01053 header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec; 01054 } 01055 01056 // header_data.publication_id_ = publication_id_; 01057 // header_data.publisher_id_ = this->publisher_servant_->publisher_id_; 01058 size_t max_marshaled_size = header_data.max_marshaled_size(); 01059 ACE_Message_Block* tmp; 01060 ACE_NEW_MALLOC_RETURN(tmp, 01061 static_cast<ACE_Message_Block*>( 01062 mb_allocator_->malloc(sizeof(ACE_Message_Block))), 01063 ACE_Message_Block(max_marshaled_size, 01064 ACE_Message_Block::MB_DATA, 01065 data.release(), //cont 01066 0, //data 01067 header_allocator_.get(), //alloc_strategy 01068 0, //locking_strategy 01069 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 01070 ACE_Time_Value::zero, 01071 ACE_Time_Value::max_time, 01072 db_allocator_.get(), 01073 mb_allocator_.get()), 01074 DDS::RETCODE_ERROR); 01075 message.reset(tmp); 01076 *message << header_data; 01077 return DDS::RETCODE_OK; 01078 }
void OpenDDS::DCPS::ReplayerImpl::data_delivered | ( | const DataSampleElement * | sample | ) | [virtual] |
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 855 of file ReplayerImpl.cpp.
References ACE_TEXT(), ACE_Condition< ACE_Recursive_Thread_Mutex >::broadcast(), data_delivered_count_, DBG_ENTRY_LVL, empty_condition_, OpenDDS::DCPS::DataSampleElement::get_pub_id(), LM_ERROR, lock_, OPENDDS_STRING, pending_write_count_, publication_id_, and sample_list_element_allocator_.
00856 { 00857 DBG_ENTRY_LVL("ReplayerImpl","data_delivered",6); 00858 if (!(sample->get_pub_id() == this->publication_id_)) { 00859 GuidConverter sample_converter(sample->get_pub_id()); 00860 GuidConverter writer_converter(publication_id_); 00861 ACE_ERROR((LM_ERROR, 00862 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::data_delivered: ") 00863 ACE_TEXT(" The publication id %C from delivered element ") 00864 ACE_TEXT("does not match the datawriter's id %C\n"), 00865 OPENDDS_STRING(sample_converter).c_str(), 00866 OPENDDS_STRING(writer_converter).c_str())); 00867 return; 00868 } 00869 DataSampleElement* elem = const_cast<DataSampleElement*>(sample); 00870 // this->data_container_->data_delivered(sample); 00871 ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement); 00872 ++data_delivered_count_; 00873 00874 { 00875 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00876 if ((--pending_write_count_) == 0) { 00877 empty_condition_.broadcast(); 00878 } 00879 } 00880 }
void OpenDDS::DCPS::ReplayerImpl::data_dropped | ( | const DataSampleElement * | sample, | |
bool | dropped_by_transport | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 889 of file ReplayerImpl.cpp.
References ACE_Condition< ACE_Recursive_Thread_Mutex >::broadcast(), data_dropped_count_, DBG_ENTRY_LVL, empty_condition_, lock_, pending_write_count_, and sample_list_element_allocator_.
00891 { 00892 DBG_ENTRY_LVL("ReplayerImpl","data_dropped",6); 00893 // this->data_container_->data_dropped(element, dropped_by_transport); 00894 ACE_UNUSED_ARG(dropped_by_transport); 00895 DataSampleElement* elem = const_cast<DataSampleElement*>(sample); 00896 ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement); 00897 ++data_dropped_count_; 00898 { 00899 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00900 if ((--pending_write_count_) == 0) { 00901 empty_condition_.broadcast(); 00902 } 00903 } 00904 }
DDS::DomainId_t OpenDDS::DCPS::ReplayerImpl::domain_id | ( | ) | const [inline, virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 108 of file ReplayerImpl.h.
00108 { return this->domain_id_; }
DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::enable | ( | ) |
Implements DDS::Entity.
Definition at line 295 of file ReplayerImpl.cpp.
References ACE_TEXT(), association_chunk_multiplier_, OpenDDS::DCPS::TransportClient::connection_info(), db_allocator_, OpenDDS::DCPS::DCPS_debug_level, domain_id_, DDS::DataWriterQos::durability, OpenDDS::DCPS::TransportClient::enable_transport(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::GUID_UNKNOWN, header_allocator_, OpenDDS::DCPS::EntityImpl::is_enabled(), DDS::LENGTH_UNLIMITED, LM_DEBUG, LM_ERROR, mb_allocator_, n_chunks_, participant_servant_, publication_id_, publisher_qos_, qos_, DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::DataWriterQos::resource_limits, DDS::RETCODE_ERROR, DDS::RETCODE_OK, sample_list_element_allocator_, OpenDDS::DCPS::EntityImpl::set_enabled(), TheServiceParticipant, topic_servant_, and DDS::VOLATILE_DURABILITY_QOS.
Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_replayer().
00296 { 00297 //According spec: 00298 // - Calling enable on an already enabled Entity returns OK and has no 00299 // effect. 00300 // - Calling enable on an Entity whose factory is not enabled will fail 00301 // and return PRECONDITION_NOT_MET. 00302 00303 if (this->is_enabled()) { 00304 return DDS::RETCODE_OK; 00305 } 00306 00307 // if (this->publisher_servant_->is_enabled() == false) { 00308 // return DDS::RETCODE_PRECONDITION_NOT_MET; 00309 // } 00310 // 00311 const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS; 00312 00313 if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) { 00314 n_chunks_ = qos_.resource_limits.max_samples; 00315 } 00316 // +1 because we might allocate one before releasing another 00317 // TBD - see if this +1 can be removed. 00318 mb_allocator_.reset(new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_)); 00319 db_allocator_.reset(new DataBlockAllocator(n_chunks_+1)); 00320 header_allocator_.reset(new DataSampleHeaderAllocator(n_chunks_+1)); 00321 00322 sample_list_element_allocator_.reset(new DataSampleElementAllocator(2 * n_chunks_)); 00323 00324 00325 if (DCPS_debug_level >= 2) { 00326 ACE_DEBUG((LM_DEBUG, 00327 "(%P|%t) ReplayerImpl::enable-mb" 00328 " Cached_Allocator_With_Overflow %x with %d chunks\n", 00329 mb_allocator_.get(), 00330 n_chunks_)); 00331 00332 ACE_DEBUG((LM_DEBUG, 00333 "(%P|%t) ReplayerImpl::enable-db" 00334 " Cached_Allocator_With_Overflow %x with %d chunks\n", 00335 db_allocator_.get(), 00336 n_chunks_)); 00337 00338 ACE_DEBUG((LM_DEBUG, 00339 "(%P|%t) ReplayerImpl::enable-header" 00340 " Cached_Allocator_With_Overflow %x with %d chunks\n", 00341 header_allocator_.get(), 00342 n_chunks_)); 00343 } 00344 00345 this->set_enabled(); 00346 00347 try { 00348 this->enable_transport(reliable, 00349 this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS); 00350 00351 } catch (const Transport::Exception&) { 00352 ACE_ERROR((LM_ERROR, 00353 ACE_TEXT("(%P|%t) ERROR: 
ReplayerImpl::enable, ") 00354 ACE_TEXT("Transport Exception.\n"))); 00355 return DDS::RETCODE_ERROR; 00356 00357 } 00358 00359 const TransportLocatorSeq& trans_conf_info = connection_info(); 00360 00361 00362 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_); 00363 this->publication_id_ = 00364 disco->add_publication(this->domain_id_, 00365 this->participant_servant_->get_id(), 00366 this->topic_servant_->get_id(), 00367 this, 00368 this->qos_, 00369 trans_conf_info, 00370 this->publisher_qos_); 00371 00372 if (this->publication_id_ == GUID_UNKNOWN) { 00373 ACE_ERROR((LM_ERROR, 00374 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ") 00375 ACE_TEXT("add_publication returned invalid id. \n"))); 00376 return DDS::RETCODE_ERROR; 00377 } 00378 00379 return DDS::RETCODE_OK; 00380 }
DDS::InstanceHandle_t OpenDDS::DCPS::ReplayerImpl::get_instance_handle | ( | ) | [virtual] |
Implements OpenDDS::DCPS::EntityImpl.
Definition at line 1121 of file ReplayerImpl.cpp.
References OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), participant_servant_, and publication_id_.
01122 { 01123 return this->participant_servant_->id_to_handle(publication_id_); 01124 }
ReplayerListener_rch OpenDDS::DCPS::ReplayerImpl::get_listener | ( | ) | [virtual] |
Get the listener for this Replayer.
Implements OpenDDS::DCPS::Replayer.
Definition at line 289 of file ReplayerImpl.cpp.
References listener_.
00290 { 00291 return listener_; 00292 }
CORBA::Long OpenDDS::DCPS::ReplayerImpl::get_priority_value | ( | const AssociationData & | data | ) | const [virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 849 of file ReplayerImpl.cpp.
References qos_, and DDS::DataWriterQos::transport_priority.
00850 { 00851 return this->qos_.transport_priority.value; 00852 }
DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::get_qos | ( | DDS::PublisherQos & | publisher_qos, | |
DDS::DataWriterQos & | datawriter_qos | |||
) | [virtual] |
Get the Quality of Service settings for the Replayer.
Implements OpenDDS::DCPS::Replayer.
Definition at line 272 of file ReplayerImpl.cpp.
References publisher_qos_, qos_, and DDS::RETCODE_OK.
00274 { 00275 qos = qos_; 00276 publisher_qos = publisher_qos_; 00277 return DDS::RETCODE_OK; 00278 }
const RepoId & OpenDDS::DCPS::ReplayerImpl::get_repo_id | ( | ) | const [virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 843 of file ReplayerImpl.cpp.
References publication_id_.
00844 { 00845 return this->publication_id_; 00846 }
void OpenDDS::DCPS::ReplayerImpl::inconsistent_topic | ( | ) | [virtual] |
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 829 of file ReplayerImpl.cpp.
References topic_servant_.
00830 { 00831 topic_servant_->inconsistent_topic(); 00832 }
void OpenDDS::DCPS::ReplayerImpl::init | ( | DDS::Topic_ptr | topic, | |
TopicImpl * | topic_servant, | |||
const DDS::DataWriterQos & | qos, | |||
ReplayerListener_rch | a_listener, | |||
const DDS::StatusMask & | mask, | |||
OpenDDS::DCPS::DomainParticipantImpl * | participant_servant, | |||
const DDS::PublisherQos & | publisher_qos | |||
) | [virtual] |
Initialize the data members.
Definition at line 146 of file ReplayerImpl.cpp.
References OpenDDS::DCPS::Replayer::_duplicate(), DBG_ENTRY_LVL, domain_id_, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), is_bit_, listener_, listener_mask_, participant_servant_, publisher_qos_, qos_, topic_id_, topic_name_, topic_objref_, topic_servant_, OpenDDS::DCPS::topicIsBIT(), and type_name_.
Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_replayer().
00154 { 00155 DBG_ENTRY_LVL("ReplayerImpl","init",6); 00156 topic_objref_ = DDS::Topic::_duplicate(topic); 00157 topic_servant_ = topic_servant; 00158 topic_name_ = topic_servant_->get_name(); 00159 topic_id_ = topic_servant_->get_id(); 00160 type_name_ = topic_servant_->get_type_name(); 00161 00162 #if !defined (DDS_HAS_MINIMUM_BIT) 00163 is_bit_ = topicIsBIT(topic_name_.in(), type_name_.in()); 00164 #endif // !defined (DDS_HAS_MINIMUM_BIT) 00165 00166 qos_ = qos; 00167 00168 //Note: OK to _duplicate(nil). 00169 listener_ = a_listener; 00170 listener_mask_ = mask; 00171 00172 // Only store the participant pointer, since it is our "grand" 00173 // parent, we will exist as long as it does. 00174 participant_servant_ = participant_servant; 00175 domain_id_ = participant_servant_->get_domain_id(); 00176 00177 publisher_qos_ = publisher_qos; 00178 }
void OpenDDS::DCPS::ReplayerImpl::lookup_instance_handles | ( | const ReaderIdSeq & | ids, | |
DDS::InstanceHandleSeq & | hdls | |||
) | [private] |
Lookup the instance handles by the subscription repo ids.
Definition at line 1081 of file ReplayerImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), LM_DEBUG, OPENDDS_STRING, and participant_servant_.
Referenced by remove_associations().
01083 { 01084 CORBA::ULong const num_rds = ids.length(); 01085 01086 if (DCPS_debug_level > 9) { 01087 OPENDDS_STRING separator; 01088 OPENDDS_STRING buffer; 01089 01090 for (CORBA::ULong i = 0; i < num_rds; ++i) { 01091 buffer += separator + OPENDDS_STRING(GuidConverter(ids[i])); 01092 separator = ", "; 01093 } 01094 01095 ACE_DEBUG((LM_DEBUG, 01096 ACE_TEXT("(%P|%t) ReplayerImpl::lookup_instance_handles: ") 01097 ACE_TEXT("searching for handles for reader Ids: %C.\n"), 01098 buffer.c_str())); 01099 } 01100 01101 hdls.length(num_rds); 01102 01103 for (CORBA::ULong i = 0; i < num_rds; ++i) { 01104 hdls[i] = this->participant_servant_->id_to_handle(ids[i]); 01105 } 01106 }
bool OpenDDS::DCPS::ReplayerImpl::need_sequence_repair | ( | ) | const [private] |
Definition at line 1109 of file ReplayerImpl.cpp.
References reader_info_, and sequence_number_.
Referenced by create_sample_data_message().
01110 { 01111 for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(), 01112 end = reader_info_.end(); it != end; ++it) { 01113 if (it->second.expected_sequence_ != sequence_number_) { 01114 return true; 01115 } 01116 } 01117 return false; 01118 }
void OpenDDS::DCPS::ReplayerImpl::notify_publication_disconnected | ( | const ReaderIdSeq & | subids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportSendListener.
Definition at line 914 of file ReplayerImpl.cpp.
void OpenDDS::DCPS::ReplayerImpl::notify_publication_lost | ( | const DDS::InstanceHandleSeq & | handles | ) | [private] |
Definition at line 932 of file ReplayerImpl.cpp.
void OpenDDS::DCPS::ReplayerImpl::notify_publication_lost | ( | const ReaderIdSeq & | subids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportSendListener.
Definition at line 926 of file ReplayerImpl.cpp.
Referenced by remove_associations().
void OpenDDS::DCPS::ReplayerImpl::notify_publication_reconnected | ( | const ReaderIdSeq & | subids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportSendListener.
Definition at line 920 of file ReplayerImpl.cpp.
typedef OpenDDS::DCPS::ReplayerImpl::OPENDDS_MAP_CMP | ( | RepoId | , | |
SequenceNumber | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::ReplayerImpl::OPENDDS_MAP_CMP | ( | RepoId | , | |
DDS::InstanceHandle_t | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::ReplayerImpl::OPENDDS_MAP_CMP | ( | RepoId | , | |
ReaderInfo | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
DomainParticipantImpl* OpenDDS::DCPS::ReplayerImpl::participant | ( | ) | [inline] |
Definition at line 162 of file ReplayerImpl.h.
Referenced by OpenDDS::DCPS::Service_Participant::delete_replayer().
00162 { 00163 return participant_servant_; 00164 }
void OpenDDS::DCPS::ReplayerImpl::register_for_reader | ( | const RepoId & | participant, | |
const RepoId & | writerid, | |||
const RepoId & | readerid, | |||
const TransportLocatorSeq & | locators, | |||
DiscoveryListener * | listener | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 787 of file ReplayerImpl.cpp.
00792 { 00793 TransportClient::register_for_reader(participant, writerid, readerid, locators, listener); 00794 }
void OpenDDS::DCPS::ReplayerImpl::remove_all_associations | ( | ) |
Definition at line 742 of file ReplayerImpl.cpp.
References ACE_TEXT(), LM_WARNING, lock_, pending_readers_, readers_, remove_associations(), size, and OpenDDS::DCPS::TransportClient::stop_associating().
Referenced by cleanup().
00743 { 00744 this->stop_associating(); 00745 00746 OpenDDS::DCPS::ReaderIdSeq readers; 00747 CORBA::ULong size; 00748 CORBA::ULong num_pending_readers; 00749 { 00750 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_); 00751 00752 num_pending_readers = static_cast<CORBA::ULong>(pending_readers_.size()); 00753 size = static_cast<CORBA::ULong>(readers_.size()) + num_pending_readers; 00754 readers.length(size); 00755 00756 RepoIdSet::iterator itEnd = readers_.end(); 00757 int i = 0; 00758 00759 for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) { 00760 readers[i++] = *it; 00761 } 00762 00763 itEnd = pending_readers_.end(); 00764 for (RepoIdSet::iterator it = pending_readers_.begin(); it != itEnd; ++it) { 00765 readers[i++] = *it; 00766 } 00767 00768 if (num_pending_readers > 0) { 00769 ACE_DEBUG((LM_WARNING, 00770 ACE_TEXT("(%P|%t) WARNING: ReplayerImpl::remove_all_associations() - ") 00771 ACE_TEXT("%d subscribers were pending and never fully associated.\n"), 00772 num_pending_readers)); 00773 } 00774 } 00775 00776 try { 00777 if (0 < size) { 00778 CORBA::Boolean dont_notify_lost = false; 00779 this->remove_associations(readers, dont_notify_lost); 00780 } 00781 00782 } catch (const CORBA::Exception&) { 00783 } 00784 }
void OpenDDS::DCPS::ReplayerImpl::remove_associations | ( | const ReaderIdSeq & | readers, | |
CORBA::Boolean | callback | |||
) | [virtual] |
Section 7.1.4.1: total_count will not decrement.
TODO: Reconcile this with the verbiage in section 7.1.4.1. TODO: Should rds_len really be fully_associated_len here?
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 604 of file ReplayerImpl.cpp.
References ACE_TEXT(), DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportClient::disassociate(), id_to_handle_map_, idToSequence_, OpenDDS::DCPS::RcHandle< T >::in(), is_bit_, DDS::PublicationMatchedStatus::last_subscription_handle, len, listener_, LM_DEBUG, LM_WARNING, lock_, lookup_instance_handles(), notify_publication_lost(), OPENDDS_STRING, pending_readers_, publication_id_, publication_match_status_, reader_info_, readers_, OpenDDS::DCPS::remove(), OpenDDS::DCPS::TransportClient::stop_associating(), and DDS::PublicationMatchedStatus::total_count_change.
Referenced by remove_all_associations().
00606 { 00607 if (DCPS_debug_level >= 1) { 00608 GuidConverter writer_converter(publication_id_); 00609 GuidConverter reader_converter(readers[0]); 00610 ACE_DEBUG((LM_DEBUG, 00611 ACE_TEXT("(%P|%t) ReplayerImpl::remove_associations: ") 00612 ACE_TEXT("bit %d local %C remote %C num remotes %d\n"), 00613 is_bit_, 00614 OPENDDS_STRING(writer_converter).c_str(), 00615 OPENDDS_STRING(reader_converter).c_str(), 00616 readers.length())); 00617 } 00618 00619 this->stop_associating(readers.get_buffer(), readers.length()); 00620 00621 ReaderIdSeq fully_associated_readers; 00622 CORBA::ULong fully_associated_len = 0; 00623 ReaderIdSeq rds; 00624 CORBA::ULong rds_len = 0; 00625 DDS::InstanceHandleSeq handles; 00626 00627 { 00628 // Ensure the same acquisition order as in wait_for_acknowledgments(). 00629 // ACE_GUARD(ACE_SYNCH_MUTEX, wfaGuard, this->wfaLock_); 00630 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00631 00632 //Remove the readers from fully associated reader list. 00633 //If the supplied reader is not in the cached reader list then it is 00634 //already removed. We just need remove the readers in the list that have 00635 //not been removed. 00636 00637 CORBA::ULong len = readers.length(); 00638 00639 for (CORBA::ULong i = 0; i < len; ++i) { 00640 //Remove the readers from fully associated reader list. If it's not 00641 //in there, the association_complete() is not called yet and remove it 00642 //from pending list. 00643 00644 if (OpenDDS::DCPS::remove(readers_, readers[i]) == 0) { 00645 ++fully_associated_len; 00646 fully_associated_readers.length(fully_associated_len); 00647 fully_associated_readers [fully_associated_len - 1] = readers[i]; 00648 00649 // Remove this reader from the ACK sequence map if its there. 00650 // This is where we need to be holding the wfaLock_ obtained 00651 // above. 
00652 RepoIdToSequenceMap::iterator where 00653 = this->idToSequence_.find(readers[i]); 00654 00655 if (where != this->idToSequence_.end()) { 00656 this->idToSequence_.erase(where); 00657 00658 // It is possible that this subscription was causing the wait 00659 // to continue, so give the opportunity to find out. 00660 // this->wfaCondition_.broadcast(); 00661 } 00662 00663 ++rds_len; 00664 rds.length(rds_len); 00665 rds [rds_len - 1] = readers[i]; 00666 00667 } else if (OpenDDS::DCPS::remove(pending_readers_, readers[i]) == 0) { 00668 ++rds_len; 00669 rds.length(rds_len); 00670 rds [rds_len - 1] = readers[i]; 00671 00672 GuidConverter converter(readers[i]); 00673 ACE_DEBUG((LM_WARNING, 00674 ACE_TEXT("(%P|%t) WARNING: ReplayerImpl::remove_associations: ") 00675 ACE_TEXT("removing reader %C before association_complete() call.\n"), 00676 OPENDDS_STRING(converter).c_str())); 00677 } 00678 reader_info_.erase(readers[i]); 00679 //else reader is already removed which indicates remove_association() 00680 //is called multiple times. 00681 } 00682 00683 if (fully_associated_len > 0 && !is_bit_) { 00684 // The reader should be in the id_to_handle map at this time 00685 this->lookup_instance_handles(fully_associated_readers, handles); 00686 00687 for (CORBA::ULong i = 0; i < fully_associated_len; ++i) { 00688 id_to_handle_map_.erase(fully_associated_readers[i]); 00689 } 00690 } 00691 00692 // wfaGuard.release(); 00693 00694 // Mirror the PUBLICATION_MATCHED_STATUS processing from 00695 // association_complete() here. 00696 if (!this->is_bit_) { 00697 00698 // Derive the change in the number of subscriptions reading this writer. 00699 int matchedSubscriptions = 00700 static_cast<int>(this->id_to_handle_map_.size()); 00701 this->publication_match_status_.current_count_change = 00702 matchedSubscriptions - this->publication_match_status_.current_count; 00703 00704 // Only process status if the number of subscriptions has changed. 
00705 if (this->publication_match_status_.current_count_change != 0) { 00706 this->publication_match_status_.current_count = matchedSubscriptions; 00707 00708 /// Section 7.1.4.1: total_count will not decrement. 00709 00710 /// @TODO: Reconcile this with the verbiage in section 7.1.4.1 00711 /// TODO: Should rds_len really be fully_associated_len here?? 00712 this->publication_match_status_.last_subscription_handle = 00713 handles[rds_len - 1]; 00714 00715 00716 if (listener_.in()) { 00717 listener_->on_replayer_matched( 00718 this, 00719 this->publication_match_status_); 00720 00721 // Listener consumes the change. 00722 this->publication_match_status_.total_count_change = 0; 00723 this->publication_match_status_.current_count_change = 0; 00724 } 00725 00726 } 00727 } 00728 } 00729 00730 for (CORBA::ULong i = 0; i < rds.length(); ++i) { 00731 this->disassociate(rds[i]); 00732 } 00733 00734 // If this remove_association is invoked when the InfoRepo 00735 // detects a lost reader then make a callback to notify 00736 // subscription lost. 00737 if (notify_lost && handles.length() > 0) { 00738 this->notify_publication_lost(handles); 00739 } 00740 }
void OpenDDS::DCPS::ReplayerImpl::retrieve_inline_qos_data | ( | TransportSendListener::InlineQosData & | qos_data | ) | const [virtual] |
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 939 of file ReplayerImpl.cpp.
References OpenDDS::DCPS::TransportSendListener::InlineQosData::dw_qos, OpenDDS::DCPS::TransportSendListener::InlineQosData::pub_qos, publisher_qos_, qos_, OpenDDS::DCPS::TransportSendListener::InlineQosData::topic_name, and topic_name_.
00940 { 00941 qos_data.pub_qos = this->publisher_qos_; 00942 qos_data.dw_qos = this->qos_; 00943 qos_data.topic_name = this->topic_name_.in(); 00944 }
DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::set_listener | ( | const ReplayerListener_rch & | a_listener, | |
DDS::StatusMask | mask | |||
) | [virtual] |
Change the listener for this Replayer.
Implements OpenDDS::DCPS::Replayer.
Definition at line 281 of file ReplayerImpl.cpp.
References listener_, listener_mask_, and DDS::RETCODE_OK.
00283 { 00284 listener_ = a_listener; 00285 listener_mask_ = mask; 00286 return DDS::RETCODE_OK; 00287 }
DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::set_qos | ( | const DDS::PublisherQos & | publisher_qos, | |
const DDS::DataWriterQos & | datawriter_qos | |||
) | [virtual] |
Set the Quality of Service settings for the Replayer.
Implements OpenDDS::DCPS::Replayer.
Definition at line 181 of file ReplayerImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), domain_id_, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), OpenDDS::DCPS::DomainParticipantImpl::get_id(), LM_ERROR, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK, OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK, participant_servant_, publication_id_, publisher_qos_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, status, TheServiceParticipant, and OpenDDS::DCPS::Qos_Helper::valid().
00183 { 00184 00185 OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(publisher_qos, DDS::RETCODE_UNSUPPORTED); 00186 00187 if (Qos_Helper::valid(publisher_qos) && Qos_Helper::consistent(publisher_qos)) { 00188 if (publisher_qos_ == publisher_qos) 00189 return DDS::RETCODE_OK; 00190 00191 // for the not changeable qos, it can be changed before enable 00192 if (!Qos_Helper::changeable(publisher_qos_, publisher_qos) && enabled_ == true) { 00193 return DDS::RETCODE_IMMUTABLE_POLICY; 00194 00195 } else { 00196 publisher_qos_ = publisher_qos; 00197 } 00198 } else { 00199 return DDS::RETCODE_INCONSISTENT_POLICY; 00200 } 00201 00202 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00203 OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00204 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00205 OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00206 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00207 00208 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) { 00209 if (qos_ == qos) 00210 return DDS::RETCODE_OK; 00211 00212 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) { 00213 return DDS::RETCODE_IMMUTABLE_POLICY; 00214 00215 } else { 00216 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_); 00217 // DDS::PublisherQos publisherQos; 00218 // this->publisher_servant_->get_qos(publisherQos); 00219 DDS::PublisherQos publisherQos = this->publisher_qos_; 00220 const bool status 00221 = disco->update_publication_qos(this->participant_servant_->get_domain_id(), 00222 this->participant_servant_->get_id(), 00223 this->publication_id_, 00224 qos, 00225 publisherQos); 00226 00227 if (!status) { 00228 ACE_ERROR_RETURN((LM_ERROR, 00229 ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ") 00230 ACE_TEXT("qos not updated. 
\n")), 00231 DDS::RETCODE_ERROR); 00232 } 00233 } 00234 00235 if (!(qos_ == qos)) { 00236 // Reset the deadline timer if the period has changed. 00237 // if (qos_.deadline.period.sec != qos.deadline.period.sec 00238 // || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) { 00239 // if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC 00240 // && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) { 00241 // ACE_auto_ptr_reset(this->watchdog_, 00242 // new OfferedDeadlineWatchdog( 00243 // this->reactor_, 00244 // this->lock_, 00245 // qos.deadline, 00246 // this, 00247 // this, 00248 // this->offered_deadline_missed_status_, 00249 // this->last_deadline_missed_total_count_)); 00250 // 00251 // } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC 00252 // && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) { 00253 // this->watchdog_->cancel_all(); 00254 // this->watchdog_.reset(); 00255 // 00256 // } else { 00257 // this->watchdog_->reset_interval( 00258 // duration_to_time_value(qos.deadline.period)); 00259 // } 00260 // } 00261 00262 qos_ = qos; 00263 } 00264 00265 return DDS::RETCODE_OK; 00266 00267 } else { 00268 return DDS::RETCODE_INCONSISTENT_POLICY; 00269 } 00270 }
void OpenDDS::DCPS::ReplayerImpl::unregister_for_reader | ( | const RepoId & | participant, | |
const RepoId & | writerid, | |||
const RepoId & | readerid | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 797 of file ReplayerImpl.cpp.
00800 { 00801 TransportClient::unregister_for_reader(participant, writerid, readerid); 00802 }
void OpenDDS::DCPS::ReplayerImpl::update_incompatible_qos | ( | const IncompatibleQosStatus & | status | ) | [virtual] |
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 805 of file ReplayerImpl.cpp.
References OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, OpenDDS::DCPS::IncompatibleQosStatus::last_policy_id, DDS::OfferedIncompatibleQosStatus::last_policy_id, lock_, offered_incompatible_qos_status_, OpenDDS::DCPS::IncompatibleQosStatus::policies, DDS::OfferedIncompatibleQosStatus::policies, OpenDDS::DCPS::IncompatibleQosStatus::total_count, DDS::OfferedIncompatibleQosStatus::total_count, and DDS::OfferedIncompatibleQosStatus::total_count_change.
00806 { 00807 00808 00809 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00810 00811 // copy status and increment change 00812 offered_incompatible_qos_status_.total_count = status.total_count; 00813 offered_incompatible_qos_status_.total_count_change += 00814 status.count_since_last_send; 00815 offered_incompatible_qos_status_.last_policy_id = status.last_policy_id; 00816 offered_incompatible_qos_status_.policies = status.policies; 00817 00818 }
void OpenDDS::DCPS::ReplayerImpl::update_subscription_params | ( | const RepoId & | readerId, | |
const DDS::StringSeq & | exprParams | |||
) | [virtual] |
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 821 of file ReplayerImpl.cpp.
DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::write | ( | const RawDataSample * | sample_array, | |
int | array_size, | |||
DDS::InstanceHandle_t * | reader | |||
) | [private] |
Definition at line 947 of file ReplayerImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::byte_order_, create_sample_data_message(), DBG_ENTRY_LVL, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DomainParticipantImpl::get_repoid(), OpenDDS::DCPS::GUID_UNKNOWN, LM_ERROR, lock_, OpenDDS::DCPS::move(), participant_servant_, pending_write_count_, OpenDDS::DCPS::DataSampleHeader::publication_id_, publication_id_, reader_info_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::RawDataSample::sample_byte_order_, sample_list_element_allocator_, OpenDDS::DCPS::TransportClient::send(), sequence_number_, OpenDDS::DCPS::DataSampleElement::set_num_subs(), OpenDDS::DCPS::DataSampleElement::set_sample(), OpenDDS::DCPS::DataSampleElement::set_sub_id(), and OpenDDS::DCPS::RawDataSample::source_timestamp_.
00950 { 00951 DBG_ENTRY_LVL("ReplayerImpl","write",6); 00952 00953 OpenDDS::DCPS::RepoId repo_id; 00954 if (reader_ih_ptr) { 00955 repo_id = this->participant_servant_->get_repoid(*reader_ih_ptr); 00956 if (repo_id == GUID_UNKNOWN) { 00957 ACE_ERROR_RETURN((LM_ERROR, 00958 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::write: ") 00959 ACE_TEXT("Invalid reader instance handle (%d)\n"), *reader_ih_ptr), 00960 DDS::RETCODE_ERROR); 00961 } 00962 } 00963 00964 SendStateDataSampleList list; 00965 00966 for (int i = 0; i < num_samples; ++i) { 00967 DataSampleElement* element = 0; 00968 00969 ACE_NEW_MALLOC_RETURN( 00970 element, 00971 static_cast<DataSampleElement*>( 00972 sample_list_element_allocator_->malloc( 00973 sizeof(DataSampleElement))), 00974 DataSampleElement(publication_id_, 00975 this, 00976 PublicationInstance_rch()), 00977 DDS::RETCODE_ERROR); 00978 00979 element->get_header().byte_order_ = samples[i].sample_byte_order_; 00980 element->get_header().publication_id_ = this->publication_id_; 00981 list.enqueue_tail(element); 00982 Message_Block_Ptr temp; 00983 Message_Block_Ptr sample(samples[i].sample_->duplicate()); 00984 DDS::ReturnCode_t ret = create_sample_data_message(move(sample), 00985 element->get_header(), 00986 temp, 00987 samples[i].source_timestamp_, 00988 false); 00989 element->set_sample(move(temp)); 00990 if (reader_ih_ptr) { 00991 element->set_num_subs(1); 00992 element->set_sub_id(0, repo_id); 00993 } 00994 00995 if (ret != DDS::RETCODE_OK) { 00996 // we need to free the list 00997 while (list.dequeue(element)) { 00998 ACE_DES_FREE(element, sample_list_element_allocator_->free, DataSampleElement); 00999 } 01000 01001 return ret; 01002 } 01003 } 01004 01005 { 01006 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, DDS::RETCODE_ERROR); 01007 ++pending_write_count_; 01008 } 01009 01010 this->send(list); 01011 01012 for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(), 01013 end = reader_info_.end(); iter != end; ++iter) { 
01014 iter->second.expected_sequence_ = sequence_number_; 01015 } 01016 01017 return DDS::RETCODE_OK; 01018 }
DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::write | ( | const RawDataSample & | sample | ) | [virtual] |
Send the sample to all associated DataReaders.
Implements OpenDDS::DCPS::Replayer.
Definition at line 1021 of file ReplayerImpl.cpp.
Referenced by write_to_reader().
01022 { 01023 return this->write(&sample, 1, 0); 01024 }
DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::write_to_reader | ( | DDS::InstanceHandle_t | subscription, | |
const RawDataSampleList & | samples | |||
) | [virtual] |
Send the samples to the specified DataReader.
Implements OpenDDS::DCPS::Replayer.
Definition at line 1134 of file ReplayerImpl.cpp.
References DDS::RETCODE_ERROR, and write().
01136 { 01137 if (samples.size()) 01138 return write(&samples[0], static_cast<int>(samples.size()), &subscription); 01139 return DDS::RETCODE_ERROR; 01140 }
DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::write_to_reader | ( | DDS::InstanceHandle_t | subscription, | |
const RawDataSample & | sample | |||
) | [virtual] |
Send the sample to the specified DataReader.
Implements OpenDDS::DCPS::Replayer.
Definition at line 1127 of file ReplayerImpl.cpp.
References write().
01129 { 01130 return write(&sample, 1, &subscription); 01131 }
friend class ::DDS_TEST [friend] |
Reimplemented from OpenDDS::DCPS::TransportClient.
Definition at line 214 of file ReplayerImpl.h.
RepoIdSet OpenDDS::DCPS::ReplayerImpl::assoc_complete_readers_ [private] |
Definition at line 312 of file ReplayerImpl.h.
Referenced by add_association(), and association_complete().
The multiplier for allocators affected by associations.
Definition at line 189 of file ReplayerImpl.h.
Referenced by enable().
Definition at line 126 of file ReplayerImpl.h.
Referenced by data_delivered().
Definition at line 278 of file ReplayerImpl.h.
Referenced by create_sample_data_message(), and enable().
The domain id.
Definition at line 231 of file ReplayerImpl.h.
Referenced by add_association(), cleanup(), enable(), init(), and set_qos().
Definition at line 314 of file ReplayerImpl.h.
Referenced by cleanup(), data_delivered(), and data_dropped().
Definition at line 280 of file ReplayerImpl.h.
Referenced by create_sample_data_message(), and enable().
RepoIdToHandleMap OpenDDS::DCPS::ReplayerImpl::id_to_handle_map_ [private] |
Definition at line 249 of file ReplayerImpl.h.
Referenced by association_complete_i(), and remove_associations().
RepoIdToSequenceMap OpenDDS::DCPS::ReplayerImpl::idToSequence_ [private] |
Definition at line 310 of file ReplayerImpl.h.
Referenced by remove_associations().
bool OpenDDS::DCPS::ReplayerImpl::is_bit_ [private] |
The time interval for sending liveliness messages.
The orb's reactor to be used to register the liveliness timer. Timestamp of last write/dispose/assert_liveliness. Total number of offered deadlines missed during last offered deadline status check. Watchdog responsible for reporting missed offered deadlines. The flag indicates whether the liveliness timer is scheduled and needs to be cancelled. Flag indicates that this datawriter is a builtin topic datawriter.
Definition at line 305 of file ReplayerImpl.h.
Referenced by add_association(), association_complete(), association_complete_i(), init(), and remove_associations().
Used to notify the entity for relevant events.
Definition at line 229 of file ReplayerImpl.h.
Referenced by association_complete_i(), get_listener(), init(), remove_associations(), and set_listener().
The StatusKind bit mask indicating which status condition changes are reported to the listener of this entity.
Definition at line 227 of file ReplayerImpl.h.
Referenced by init(), and set_listener().
The sample data container.
The lock to protect the active subscriptions and status changes.
Reimplemented from OpenDDS::DCPS::EntityImpl.
Definition at line 245 of file ReplayerImpl.h.
Referenced by add_association(), association_complete(), association_complete_i(), cleanup(), data_delivered(), data_dropped(), remove_all_associations(), remove_associations(), update_incompatible_qos(), and write().
True if the writer failed to actively signal its liveliness within its offered liveliness period.
Definition at line 276 of file ReplayerImpl.h.
Referenced by create_sample_data_message(), and enable().
size_t OpenDDS::DCPS::ReplayerImpl::n_chunks_ [private] |
The number of chunks for the cached allocator.
Definition at line 186 of file ReplayerImpl.h.
Referenced by enable().
DDS::OfferedIncompatibleQosStatus OpenDDS::DCPS::ReplayerImpl::offered_incompatible_qos_status_ [private] |
Status conditions.
Definition at line 256 of file ReplayerImpl.h.
Referenced by ReplayerImpl(), and update_incompatible_qos().
The participant servant which creates the publisher that creates this datawriter.
Definition at line 199 of file ReplayerImpl.h.
Referenced by add_association(), association_complete_i(), enable(), get_instance_handle(), init(), lookup_instance_handles(), set_qos(), and write().
RepoIdSet OpenDDS::DCPS::ReplayerImpl::pending_readers_ [private] |
Definition at line 312 of file ReplayerImpl.h.
Referenced by add_association(), association_complete(), remove_all_associations(), and remove_associations().
int OpenDDS::DCPS::ReplayerImpl::pending_write_count_ [private] |
Definition at line 315 of file ReplayerImpl.h.
Referenced by cleanup(), data_delivered(), data_dropped(), and write().
The repository id of this datawriter/publication.
Definition at line 237 of file ReplayerImpl.h.
Referenced by add_association(), association_complete(), cleanup(), data_delivered(), enable(), get_instance_handle(), get_repo_id(), remove_associations(), set_qos(), and write().
Definition at line 257 of file ReplayerImpl.h.
Referenced by association_complete_i(), remove_associations(), and ReplayerImpl().
Definition at line 234 of file ReplayerImpl.h.
Referenced by enable(), get_qos(), init(), retrieve_inline_qos_data(), and set_qos().
The publisher servant which creates this datawriter.
Definition at line 233 of file ReplayerImpl.h.
The qos policy list of this datawriter.
Definition at line 195 of file ReplayerImpl.h.
Referenced by add_association(), create_sample_data_message(), enable(), get_priority_value(), get_qos(), init(), retrieve_inline_qos_data(), and set_qos().
RepoIdToReaderInfoMap OpenDDS::DCPS::ReplayerImpl::reader_info_ [private] |
Definition at line 210 of file ReplayerImpl.h.
Referenced by add_association(), need_sequence_repair(), remove_associations(), and write().
RepoIdSet OpenDDS::DCPS::ReplayerImpl::readers_ [private] |
Definition at line 251 of file ReplayerImpl.h.
Referenced by association_complete_i(), remove_all_associations(), and remove_associations().
unique_ptr<DataSampleElementAllocator> OpenDDS::DCPS::ReplayerImpl::sample_list_element_allocator_ [private] |
The cached allocator to allocate DataSampleElement objects.
Definition at line 284 of file ReplayerImpl.h.
Referenced by data_delivered(), data_dropped(), enable(), and write().
The sequence number unique in DataWriter scope.
Definition at line 239 of file ReplayerImpl.h.
Referenced by create_sample_data_message(), need_sequence_repair(), and write().
RepoId OpenDDS::DCPS::ReplayerImpl::topic_id_ [private] |
The associated topic repository id.
Definition at line 219 of file ReplayerImpl.h.
Referenced by init().
The name of associated topic.
Definition at line 217 of file ReplayerImpl.h.
Referenced by init(), and retrieve_inline_qos_data().
DDS::Topic_var OpenDDS::DCPS::ReplayerImpl::topic_objref_ [private] |
The object reference of the associated topic.
Definition at line 221 of file ReplayerImpl.h.
The topic servant.
Definition at line 223 of file ReplayerImpl.h.
Referenced by cleanup(), enable(), inconsistent_topic(), and init().
The type name of associated topic.
Definition at line 192 of file ReplayerImpl.h.
Referenced by init().