#include <ReplayerImpl.h>
Inheritance diagram for OpenDDS::DCPS::ReplayerImpl:
Public Member Functions | |
ReplayerImpl () | |
~ReplayerImpl () | |
DDS::ReturnCode_t | cleanup () |
virtual void | init (DDS::Topic_ptr topic, TopicImpl *topic_servant, const DDS::DataWriterQos &qos, ReplayerListener_rch a_listener, const DDS::StatusMask &mask, OpenDDS::DCPS::DomainParticipantImpl *participant_servant, const DDS::PublisherQos &publisher_qos) |
virtual DDS::ReturnCode_t | write (const RawDataSample &sample) |
virtual DDS::ReturnCode_t | write_to_reader (DDS::InstanceHandle_t subscription, const RawDataSample &sample) |
virtual DDS::ReturnCode_t | write_to_reader (DDS::InstanceHandle_t subscription, const RawDataSampleList &samples) |
virtual DDS::ReturnCode_t | set_qos (const ::DDS::PublisherQos &publisher_qos, const DDS::DataWriterQos &datawriter_qos) |
virtual DDS::ReturnCode_t | get_qos (DDS::PublisherQos &publisher_qos, DDS::DataWriterQos &datawriter_qos) |
virtual DDS::ReturnCode_t | set_listener (const ReplayerListener_rch &a_listener, DDS::StatusMask mask) |
virtual ReplayerListener_rch | get_listener () |
virtual bool | check_transport_qos (const TransportInst &inst) |
virtual const RepoId & | get_repo_id () const |
DDS::DomainId_t | domain_id () const |
virtual CORBA::Long | get_priority_value (const AssociationData &data) const |
virtual void | data_delivered (const DataSampleElement *sample) |
virtual void | data_dropped (const DataSampleElement *sample, bool dropped_by_transport) |
virtual void | control_delivered (ACE_Message_Block *sample) |
virtual void | control_dropped (ACE_Message_Block *sample, bool dropped_by_transport) |
virtual void | notify_publication_disconnected (const ReaderIdSeq &subids) |
virtual void | notify_publication_reconnected (const ReaderIdSeq &subids) |
virtual void | notify_publication_lost (const ReaderIdSeq &subids) |
virtual void | notify_connection_deleted (const RepoId &) |
virtual void | retrieve_inline_qos_data (InlineQosData &qos_data) const |
virtual void | add_association (const RepoId &yourId, const ReaderAssociation &reader, bool active) |
virtual void | association_complete (const RepoId &remote_id) |
virtual void | remove_associations (const ReaderIdSeq &readers, CORBA::Boolean callback) |
virtual void | update_incompatible_qos (const IncompatibleQosStatus &status) |
virtual void | update_subscription_params (const RepoId &readerId, const DDS::StringSeq &exprParams) |
virtual void | inconsistent_topic () |
void | remove_all_associations () |
virtual void | register_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid, const TransportLocatorSeq &locators, DiscoveryListener *listener) |
virtual void | unregister_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid) |
DDS::ReturnCode_t | enable () |
DomainParticipantImpl * | participant () |
virtual DDS::InstanceHandle_t | get_instance_handle () |
Public Attributes | |
int | data_dropped_count_ |
Statistics counter. | |
int | data_delivered_count_ |
Private Member Functions | |
void | _add_ref () |
void | _remove_ref () |
void | notify_publication_lost (const DDS::InstanceHandleSeq &handles) |
DDS::ReturnCode_t | write (const RawDataSample *sample_array, int array_size, DDS::InstanceHandle_t *reader) |
DDS::ReturnCode_t | create_sample_data_message (DataSample *data, DataSampleHeader &header_data, ACE_Message_Block *&message, const DDS::Time_t &source_timestamp, bool content_filter) |
bool | need_sequence_repair () const |
bool | lookup_instance_handles (const ReaderIdSeq &ids, DDS::InstanceHandleSeq &hdls) |
Lookup the instance handles by the subscription repo ids. | |
typedef | OPENDDS_MAP_CMP (RepoId, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap |
void | association_complete_i (const RepoId &remote_id) |
typedef | OPENDDS_MAP_CMP (RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap |
typedef | OPENDDS_MAP_CMP (RepoId, SequenceNumber, GUID_tKeyLessThan) RepoIdToSequenceMap |
Flag indicating that init() has been called. | |
Private Attributes | |
size_t | n_chunks_ |
The number of chunks for the cached allocator. | |
size_t | association_chunk_multiplier_ |
The multiplier for allocators affected by associations. | |
CORBA::String_var | type_name_ |
The type name of associated topic. | |
DDS::DataWriterQos | qos_ |
The qos policy list of this datawriter. | |
DomainParticipantImpl * | participant_servant_ |
RepoIdToReaderInfoMap | reader_info_ |
CORBA::String_var | topic_name_ |
The name of associated topic. | |
RepoId | topic_id_ |
The associated topic repository id. | |
DDS::Topic_var | topic_objref_ |
The object reference of the associated topic. | |
TopicImpl * | topic_servant_ |
The topic servant. | |
DDS::StatusMask | listener_mask_ |
ReplayerListener_rch | listener_ |
Used to notify the entity for relevant events. | |
DDS::DomainId_t | domain_id_ |
The domain id. | |
PublisherImpl * | publisher_servant_ |
The publisher servant which creates this datawriter. | |
DDS::PublisherQos | publisher_qos_ |
PublicationId | publication_id_ |
The repository id of this datawriter/publication. | |
SequenceNumber | sequence_number_ |
The sequence number unique in DataWriter scope. | |
ACE_Recursive_Thread_Mutex | lock_ |
The sample data container. | |
RepoIdToHandleMap | id_to_handle_map_ |
RepoIdSet | readers_ |
DDS::OfferedIncompatibleQosStatus | offered_incompatible_qos_status_ |
Status conditions. | |
DDS::PublicationMatchedStatus | publication_match_status_ |
std::auto_ptr< MessageBlockAllocator > | mb_allocator_ |
std::auto_ptr< DataBlockAllocator > | db_allocator_ |
std::auto_ptr< DataSampleHeaderAllocator > | header_allocator_ |
std::auto_ptr< DataSampleElementAllocator > | sample_list_element_allocator_ |
std::auto_ptr< TransportSendElementAllocator > | transport_send_element_allocator_ |
std::auto_ptr< TransportCustomizedElementAllocator > | transport_customized_element_allocator_ |
bool | is_bit_ |
Timestamp of last write/dispose/assert_liveliness. | |
RepoIdToSequenceMap | idToSequence_ |
RepoIdSet | pending_readers_ |
RepoIdSet | assoc_complete_readers_ |
ACE_Condition< ACE_Recursive_Thread_Mutex > | empty_condition_ |
int | pending_write_count_ |
Friends | |
class | ::DDS_TEST |
Classes | |
struct | ReaderInfo |
This class is the implementation of the Replayer. Inheritance is used to limit the application's access to underlying system methods.
Definition at line 58 of file ReplayerImpl.h.
OpenDDS::DCPS::ReplayerImpl::ReplayerImpl | ( | ) |
Definition at line 52 of file ReplayerImpl.cpp.
References DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, DDS::HANDLE_NIL, DDS::OfferedIncompatibleQosStatus::last_policy_id, DDS::PublicationMatchedStatus::last_subscription_handle, offered_incompatible_qos_status_, DDS::OfferedIncompatibleQosStatus::policies, publication_match_status_, DDS::PublicationMatchedStatus::total_count, DDS::OfferedIncompatibleQosStatus::total_count, DDS::PublicationMatchedStatus::total_count_change, and DDS::OfferedIncompatibleQosStatus::total_count_change.
00053 : data_dropped_count_(0), 00054 data_delivered_count_(0), 00055 n_chunks_(TheServiceParticipant->n_chunks()), 00056 association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()), 00057 qos_(TheServiceParticipant->initial_DataWriterQos()), 00058 participant_servant_(0), 00059 topic_id_(GUID_UNKNOWN), 00060 topic_servant_(0), 00061 listener_mask_(DEFAULT_STATUS_MASK), 00062 domain_id_(0), 00063 publisher_servant_(0), 00064 publication_id_(GUID_UNKNOWN), 00065 sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()), 00066 // data_container_(0), 00067 // liveliness_lost_(false), 00068 // last_deadline_missed_total_count_(0), 00069 is_bit_(false), 00070 empty_condition_(lock_), 00071 pending_write_count_(0) 00072 { 00073 // liveliness_lost_status_.total_count = 0; 00074 // liveliness_lost_status_.total_count_change = 0; 00075 // 00076 // offered_deadline_missed_status_.total_count = 0; 00077 // offered_deadline_missed_status_.total_count_change = 0; 00078 // offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL; 00079 00080 offered_incompatible_qos_status_.total_count = 0; 00081 offered_incompatible_qos_status_.total_count_change = 0; 00082 offered_incompatible_qos_status_.last_policy_id = 0; 00083 offered_incompatible_qos_status_.policies.length(0); 00084 00085 publication_match_status_.total_count = 0; 00086 publication_match_status_.total_count_change = 0; 00087 publication_match_status_.current_count = 0; 00088 publication_match_status_.current_count_change = 0; 00089 publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL; 00090 00091 }
OpenDDS::DCPS::ReplayerImpl::~ReplayerImpl | ( | ) |
Definition at line 95 of file ReplayerImpl.cpp.
References DBG_ENTRY_LVL.
00096 { 00097 DBG_ENTRY_LVL("ReplayerImpl","~ReplayerImpl",6); 00098 }
void OpenDDS::DCPS::ReplayerImpl::_add_ref | ( | ) | [inline, private, virtual] |
void OpenDDS::DCPS::ReplayerImpl::_remove_ref | ( | ) | [inline, private, virtual] |
void OpenDDS::DCPS::ReplayerImpl::add_association | ( | const RepoId & | yourId, | |
const ReaderAssociation & | reader, | |||
bool | active | |||
) | [virtual] |
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 403 of file ReplayerImpl.cpp.
References assoc_complete_readers_, association_complete_i(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, DDS::DataReaderQos::durability, OpenDDS::DCPS::ReaderAssociation::exprParams, OpenDDS::DCPS::ReaderAssociation::filterExpression, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::insert(), is_bit_, OPENDDS_STRING, participant_servant_, pending_readers_, publication_id_, qos_, reader_info_, OpenDDS::DCPS::ReaderAssociation::readerId, OpenDDS::DCPS::ReaderAssociation::readerQos, OpenDDS::DCPS::ReaderAssociation::readerTransInfo, DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, TheServiceParticipant, DDS::DataWriterQos::transport_priority, and DDS::VOLATILE_DURABILITY_QOS.
00406 { 00407 DBG_ENTRY_LVL("ReplayerImpl", "add_association", 6); 00408 00409 if (DCPS_debug_level >= 1) { 00410 GuidConverter writer_converter(yourId); 00411 GuidConverter reader_converter(reader.readerId); 00412 ACE_DEBUG((LM_DEBUG, 00413 ACE_TEXT("(%P|%t) ReplayerImpl::add_association - ") 00414 ACE_TEXT("bit %d local %C remote %C\n"), 00415 is_bit_, 00416 OPENDDS_STRING(writer_converter).c_str(), 00417 OPENDDS_STRING(reader_converter).c_str())); 00418 } 00419 00420 // if (entity_deleted_ == true) { 00421 // if (DCPS_debug_level >= 1) 00422 // ACE_DEBUG((LM_DEBUG, 00423 // ACE_TEXT("(%P|%t) ReplayerImpl::add_association") 00424 // ACE_TEXT(" This is a deleted datawriter, ignoring add.\n"))); 00425 // 00426 // return; 00427 // } 00428 00429 if (GUID_UNKNOWN == publication_id_) { 00430 publication_id_ = yourId; 00431 } 00432 00433 { 00434 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00435 reader_info_.insert(std::make_pair(reader.readerId, 00436 ReaderInfo(TheServiceParticipant->publisher_content_filter() ? 
reader.filterExpression : "", 00437 reader.exprParams, participant_servant_, 00438 reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS))); 00439 } 00440 00441 if (DCPS_debug_level > 4) { 00442 GuidConverter converter(publication_id_); 00443 ACE_DEBUG((LM_DEBUG, 00444 ACE_TEXT("(%P|%t) ReplayerImpl::add_association(): ") 00445 ACE_TEXT("adding subscription to publication %C with priority %d.\n"), 00446 OPENDDS_STRING(converter).c_str(), 00447 qos_.transport_priority.value)); 00448 } 00449 00450 AssociationData data; 00451 data.remote_id_ = reader.readerId; 00452 data.remote_data_ = reader.readerTransInfo; 00453 data.remote_reliable_ = 00454 (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS); 00455 data.remote_durable_ = 00456 (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS); 00457 00458 if (!this->associate(data, active)) { 00459 //FUTURE: inform inforepo and try again as passive peer 00460 if (DCPS_debug_level) { 00461 ACE_DEBUG((LM_ERROR, 00462 ACE_TEXT("(%P|%t) ReplayerImpl::add_association: ") 00463 ACE_TEXT("ERROR: transport layer failed to associate.\n"))); 00464 } 00465 return; 00466 } 00467 00468 if (active) { 00469 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00470 00471 // Have we already received an association_complete() callback? 00472 if (assoc_complete_readers_.count(reader.readerId)) { 00473 assoc_complete_readers_.erase(reader.readerId); 00474 association_complete_i(reader.readerId); 00475 00476 // Add to pending_readers_ -> pending means we are waiting 00477 // for the association_complete() callback. 
00478 } else if (OpenDDS::DCPS::insert(pending_readers_, reader.readerId) == -1) { 00479 GuidConverter converter(reader.readerId); 00480 ACE_ERROR((LM_ERROR, 00481 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::add_association: ") 00482 ACE_TEXT("failed to mark %C as pending.\n"), 00483 OPENDDS_STRING(converter).c_str())); 00484 00485 } else { 00486 if (DCPS_debug_level > 0) { 00487 GuidConverter converter(reader.readerId); 00488 ACE_DEBUG((LM_DEBUG, 00489 ACE_TEXT("(%P|%t) ReplayerImpl::add_association: ") 00490 ACE_TEXT("marked %C as pending.\n"), 00491 OPENDDS_STRING(converter).c_str())); 00492 } 00493 } 00494 } else { 00495 // In the current implementation, DataWriter is always active, so this 00496 // code will not be applicable. 00497 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_); 00498 disco->association_complete(this->domain_id_, 00499 this->participant_servant_->get_id(), 00500 this->publication_id_, reader.readerId); 00501 } 00502 }
void OpenDDS::DCPS::ReplayerImpl::association_complete | ( | const RepoId & | remote_id | ) | [virtual] |
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 524 of file ReplayerImpl.cpp.
References assoc_complete_readers_, association_complete_i(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, is_bit_, OPENDDS_STRING, pending_readers_, and OpenDDS::DCPS::remove().
00525 { 00526 DBG_ENTRY_LVL("ReplayerImpl", "association_complete", 6); 00527 00528 if (DCPS_debug_level >= 1) { 00529 GuidConverter writer_converter(this->publication_id_); 00530 GuidConverter reader_converter(remote_id); 00531 ACE_DEBUG((LM_DEBUG, 00532 ACE_TEXT("(%P|%t) ReplayerImpl::association_complete - ") 00533 ACE_TEXT("bit %d local %C remote %C\n"), 00534 is_bit_, 00535 OPENDDS_STRING(writer_converter).c_str(), 00536 OPENDDS_STRING(reader_converter).c_str())); 00537 } 00538 00539 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00540 if (OpenDDS::DCPS::remove(pending_readers_, remote_id) == -1) { 00541 // Not found in pending_readers_, defer calling association_complete_i() 00542 // until add_association() resumes and sees this ID in assoc_complete_readers_. 00543 assoc_complete_readers_.insert(remote_id); 00544 } else { 00545 association_complete_i(remote_id); 00546 } 00547 }
void OpenDDS::DCPS::ReplayerImpl::association_complete_i | ( | const RepoId & | remote_id | ) | [private] |
Definition at line 550 of file ReplayerImpl.cpp.
References OpenDDS::DCPS::bind(), DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), id_to_handle_map_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::insert(), is_bit_, DDS::PublicationMatchedStatus::last_subscription_handle, listener_, OPENDDS_STRING, participant_servant_, publication_match_status_, readers_, DDS::PublicationMatchedStatus::total_count, and DDS::PublicationMatchedStatus::total_count_change.
Referenced by add_association(), and association_complete().
00551 { 00552 DBG_ENTRY_LVL("ReplayerImpl", "association_complete_i", 6); 00553 // bool reader_durable = false; 00554 { 00555 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00556 if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) { 00557 GuidConverter converter(remote_id); 00558 ACE_ERROR((LM_ERROR, 00559 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ") 00560 ACE_TEXT("insert %C from pending failed.\n"), 00561 OPENDDS_STRING(converter).c_str())); 00562 } 00563 // RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id); 00564 // if (it != reader_info_.end()) { 00565 // reader_durable = it->second.durable_; 00566 // } 00567 } 00568 00569 if (!is_bit_) { 00570 00571 DDS::InstanceHandle_t handle = 00572 this->participant_servant_->id_to_handle(remote_id); 00573 00574 { 00575 // protect publication_match_status_ and status changed flags. 00576 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00577 00578 // update the publication_match_status_ 00579 ++publication_match_status_.total_count; 00580 ++publication_match_status_.total_count_change; 00581 ++publication_match_status_.current_count; 00582 ++publication_match_status_.current_count_change; 00583 00584 if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) { 00585 GuidConverter converter(remote_id); 00586 ACE_DEBUG((LM_WARNING, 00587 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ") 00588 ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"), 00589 OPENDDS_STRING(converter).c_str(), 00590 handle)); 00591 return; 00592 00593 } else if (DCPS_debug_level > 4) { 00594 GuidConverter converter(remote_id); 00595 ACE_DEBUG((LM_DEBUG, 00596 ACE_TEXT("(%P|%t) ReplayerImpl::association_complete_i: ") 00597 ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"), 00598 OPENDDS_STRING(converter).c_str(), 00599 handle)); 00600 } 00601 00602 publication_match_status_.last_subscription_handle = handle; 00603 00604 } 00605 00606 00607 if (listener_.in()) { 00608 
listener_->on_replayer_matched(this, 00609 publication_match_status_); 00610 00611 // TBD - why does the spec say to change this but not 00612 // change the ChangeFlagStatus after a listener call? 00613 publication_match_status_.total_count_change = 0; 00614 publication_match_status_.current_count_change = 0; 00615 } 00616 00617 } 00618 00619 }
bool OpenDDS::DCPS::ReplayerImpl::check_transport_qos | ( | const TransportInst & | inst | ) | [virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 858 of file ReplayerImpl.cpp.
00859 { 00860 // DataWriter does not impose any constraints on which transports 00861 // may be used based on QoS. 00862 return true; 00863 }
DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::cleanup | ( | ) |
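Clean up the DataWriter: wait for pending writes to drain, remove all associations, release the topic reference, and remove the publication from discovery. Returns DDS::RETCODE_ERROR if the lock cannot be acquired or the publication cannot be removed from discovery; otherwise returns DDS::RETCODE_OK.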
Definition at line 102 of file ReplayerImpl.cpp.
References empty_condition_, publication_id_, remove_all_associations(), OpenDDS::DCPS::TopicImpl::remove_entity_ref(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, TheServiceParticipant, topic_objref_, and topic_servant_.
Referenced by OpenDDS::DCPS::Service_Participant::delete_replayer().
00103 { 00104 00105 // // Unregister all registered instances prior to deletion. 00106 // // DDS::Time_t source_timestamp = time_value_to_time(ACE_OS::gettimeofday()); 00107 // // this->unregister_instances(source_timestamp); 00108 // 00109 // // CORBA::String_var topic_name = this->get_Atopic_name(); 00110 { 00111 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, DDS::RETCODE_ERROR); 00112 00113 // Wait for pending samples to drain prior to removing associations 00114 // and unregistering the publication. 00115 while (this->pending_write_count_) 00116 this->empty_condition_.wait(); 00117 00118 // Call remove association before unregistering the datawriter 00119 // with the transport, otherwise some callbacks resulted from 00120 // remove_association may lost. 00121 this->remove_all_associations(); 00122 00123 // release our Topic_var 00124 topic_objref_ = DDS::Topic::_nil(); 00125 topic_servant_->remove_entity_ref(); 00126 topic_servant_->_remove_ref(); 00127 topic_servant_ = 0; 00128 00129 } 00130 00131 // not just unregister but remove any pending writes/sends. 00132 // this->unregister_all(); 00133 00134 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_); 00135 if (!disco->remove_publication( 00136 this->domain_id_, 00137 this->participant_servant_->get_id(), 00138 this->publication_id_)) { 00139 ACE_ERROR_RETURN((LM_ERROR, 00140 ACE_TEXT("(%P|%t) ERROR: ") 00141 ACE_TEXT("PublisherImpl::delete_datawriter, ") 00142 ACE_TEXT("publication not removed from discovery.\n")), 00143 DDS::RETCODE_ERROR); 00144 } 00145 return DDS::RETCODE_OK; 00146 }
void OpenDDS::DCPS::ReplayerImpl::control_delivered | ( | ACE_Message_Block * | sample | ) | [virtual] |
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 906 of file ReplayerImpl.cpp.
void OpenDDS::DCPS::ReplayerImpl::control_dropped | ( | ACE_Message_Block * | sample, | |
bool | dropped_by_transport | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 930 of file ReplayerImpl.cpp.
DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::create_sample_data_message | ( | DataSample * | data, | |
DataSampleHeader & | header_data, | |||
ACE_Message_Block *& | message, | |||
const DDS::Time_t & | source_timestamp, | |||
bool | content_filter | |||
) | [private] |
Definition at line 1055 of file ReplayerImpl.cpp.
References OpenDDS::DCPS::TransportClient::cdr_encapsulation(), OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, OpenDDS::DCPS::DataSampleHeader::coherent_change_, OpenDDS::DCPS::DataSampleHeader::content_filter_, db_allocator_, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, header_allocator_, DDS::DataWriterQos::lifespan, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_nanosec_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_sec_, OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), max_marshaled_size(), mb_allocator_, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, DDS::Time_t::nanosec, need_sequence_repair(), qos_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::SAMPLE_DATA, DDS::Time_t::sec, OpenDDS::DCPS::DataSampleHeader::sequence_, sequence_number_, OpenDDS::DCPS::DataSampleHeader::sequence_repair_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, and OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_.
Referenced by write().
01060 { 01061 header_data.message_id_ = SAMPLE_DATA; 01062 header_data.coherent_change_ = content_filter; 01063 01064 header_data.content_filter_ = 0; 01065 header_data.cdr_encapsulation_ = this->cdr_encapsulation(); 01066 header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length()); 01067 header_data.sequence_repair_ = need_sequence_repair(); 01068 if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) { 01069 this->sequence_number_ = SequenceNumber(); 01070 } else { 01071 ++this->sequence_number_; 01072 } 01073 header_data.sequence_ = this->sequence_number_; 01074 header_data.source_timestamp_sec_ = source_timestamp.sec; 01075 header_data.source_timestamp_nanosec_ = source_timestamp.nanosec; 01076 01077 if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC 01078 || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) { 01079 header_data.lifespan_duration_ = true; 01080 header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec; 01081 header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec; 01082 } 01083 01084 // header_data.publication_id_ = publication_id_; 01085 // header_data.publisher_id_ = this->publisher_servant_->publisher_id_; 01086 size_t max_marshaled_size = header_data.max_marshaled_size(); 01087 01088 ACE_NEW_MALLOC_RETURN(message, 01089 static_cast<ACE_Message_Block*>( 01090 mb_allocator_->malloc(sizeof(ACE_Message_Block))), 01091 ACE_Message_Block(max_marshaled_size, 01092 ACE_Message_Block::MB_DATA, 01093 data, //cont 01094 0, //data 01095 header_allocator_.get(), //alloc_strategy 01096 0, //locking_strategy 01097 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 01098 ACE_Time_Value::zero, 01099 ACE_Time_Value::max_time, 01100 db_allocator_.get(), 01101 mb_allocator_.get()), 01102 DDS::RETCODE_ERROR); 01103 01104 *message << header_data; 01105 return DDS::RETCODE_OK; 01106 }
void OpenDDS::DCPS::ReplayerImpl::data_delivered | ( | const DataSampleElement * | sample | ) | [virtual] |
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 878 of file ReplayerImpl.cpp.
References data_delivered_count_, DBG_ENTRY_LVL, empty_condition_, OpenDDS::DCPS::DataSampleElement::get_pub_id(), OPENDDS_STRING, pending_write_count_, publication_id_, and sample_list_element_allocator_.
00879 { 00880 DBG_ENTRY_LVL("ReplayerImpl","data_delivered",6); 00881 if (!(sample->get_pub_id() == this->publication_id_)) { 00882 GuidConverter sample_converter(sample->get_pub_id()); 00883 GuidConverter writer_converter(publication_id_); 00884 ACE_ERROR((LM_ERROR, 00885 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::data_delivered: ") 00886 ACE_TEXT(" The publication id %C from delivered element ") 00887 ACE_TEXT("does not match the datawriter's id %C\n"), 00888 OPENDDS_STRING(sample_converter).c_str(), 00889 OPENDDS_STRING(writer_converter).c_str())); 00890 return; 00891 } 00892 DataSampleElement* elem = const_cast<DataSampleElement*>(sample); 00893 // this->data_container_->data_delivered(sample); 00894 ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement); 00895 ++data_delivered_count_; 00896 00897 { 00898 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00899 if ((--pending_write_count_) == 0) { 00900 empty_condition_.broadcast(); 00901 } 00902 } 00903 }
void OpenDDS::DCPS::ReplayerImpl::data_dropped | ( | const DataSampleElement * | sample, | |
bool | dropped_by_transport | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 912 of file ReplayerImpl.cpp.
References data_dropped_count_, DBG_ENTRY_LVL, empty_condition_, pending_write_count_, and sample_list_element_allocator_.
00914 { 00915 DBG_ENTRY_LVL("ReplayerImpl","data_dropped",6); 00916 // this->data_container_->data_dropped(element, dropped_by_transport); 00917 ACE_UNUSED_ARG(dropped_by_transport); 00918 DataSampleElement* elem = const_cast<DataSampleElement*>(sample); 00919 ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement); 00920 ++data_dropped_count_; 00921 { 00922 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00923 if ((--pending_write_count_) == 0) { 00924 empty_condition_.broadcast(); 00925 } 00926 } 00927 }
DDS::DomainId_t OpenDDS::DCPS::ReplayerImpl::domain_id | ( | ) | const [inline, virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 105 of file ReplayerImpl.h.
00105 { return this->domain_id_; }
DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::enable | ( | ) |
Implements DDS::Entity.
Definition at line 303 of file ReplayerImpl.cpp.
References association_chunk_multiplier_, OpenDDS::DCPS::TransportClient::connection_info(), db_allocator_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportClient::enable_transport(), OpenDDS::DCPS::TopicImpl::get_id(), OpenDDS::DCPS::GUID_UNKNOWN, header_allocator_, DDS::LENGTH_UNLIMITED, mb_allocator_, n_chunks_, publication_id_, publisher_qos_, qos_, DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::DataWriterQos::resource_limits, DDS::RETCODE_ERROR, DDS::RETCODE_OK, sample_list_element_allocator_, OpenDDS::DCPS::EntityImpl::set_enabled(), TheServiceParticipant, topic_servant_, transport_customized_element_allocator_, transport_send_element_allocator_, and DDS::VOLATILE_DURABILITY_QOS.
00304 { 00305 //According spec: 00306 // - Calling enable on an already enabled Entity returns OK and has no 00307 // effect. 00308 // - Calling enable on an Entity whose factory is not enabled will fail 00309 // and return PRECONDITION_NOT_MET. 00310 00311 if (this->is_enabled()) { 00312 return DDS::RETCODE_OK; 00313 } 00314 00315 // if (this->publisher_servant_->is_enabled() == false) { 00316 // return DDS::RETCODE_PRECONDITION_NOT_MET; 00317 // } 00318 // 00319 const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS; 00320 00321 if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) { 00322 n_chunks_ = qos_.resource_limits.max_samples; 00323 } 00324 // +1 because we might allocate one before releasing another 00325 // TBD - see if this +1 can be removed. 00326 ACE_auto_ptr_reset(mb_allocator_, 00327 new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_)); 00328 ACE_auto_ptr_reset(db_allocator_, 00329 new DataBlockAllocator(n_chunks_+1)); 00330 ACE_auto_ptr_reset(header_allocator_, 00331 new DataSampleHeaderAllocator(n_chunks_+1)); 00332 00333 ACE_auto_ptr_reset(sample_list_element_allocator_, 00334 new DataSampleElementAllocator(2 * n_chunks_)); 00335 00336 ACE_auto_ptr_reset(transport_send_element_allocator_, 00337 new TransportSendElementAllocator(2 * n_chunks_, 00338 sizeof(TransportSendElement))); 00339 ACE_auto_ptr_reset(transport_customized_element_allocator_, 00340 new TransportCustomizedElementAllocator(2 * n_chunks_, 00341 sizeof(TransportCustomizedElement))); 00342 00343 if (DCPS_debug_level >= 2) { 00344 ACE_DEBUG((LM_DEBUG, 00345 "(%P|%t) ReplayerImpl::enable-mb" 00346 " Cached_Allocator_With_Overflow %x with %d chunks\n", 00347 mb_allocator_.get(), 00348 n_chunks_)); 00349 00350 ACE_DEBUG((LM_DEBUG, 00351 "(%P|%t) ReplayerImpl::enable-db" 00352 " Cached_Allocator_With_Overflow %x with %d chunks\n", 00353 db_allocator_.get(), 00354 n_chunks_)); 00355 00356 ACE_DEBUG((LM_DEBUG, 00357 "(%P|%t) 
ReplayerImpl::enable-header" 00358 " Cached_Allocator_With_Overflow %x with %d chunks\n", 00359 header_allocator_.get(), 00360 n_chunks_)); 00361 } 00362 00363 this->set_enabled(); 00364 00365 try { 00366 this->enable_transport(reliable, 00367 this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS); 00368 00369 } catch (const Transport::Exception&) { 00370 ACE_ERROR((LM_ERROR, 00371 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ") 00372 ACE_TEXT("Transport Exception.\n"))); 00373 return DDS::RETCODE_ERROR; 00374 00375 } 00376 00377 const TransportLocatorSeq& trans_conf_info = connection_info(); 00378 00379 00380 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_); 00381 this->publication_id_ = 00382 disco->add_publication(this->domain_id_, 00383 this->participant_servant_->get_id(), 00384 this->topic_servant_->get_id(), 00385 this, 00386 this->qos_, 00387 trans_conf_info, 00388 this->publisher_qos_); 00389 00390 if (this->publication_id_ == GUID_UNKNOWN) { 00391 ACE_ERROR((LM_ERROR, 00392 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ") 00393 ACE_TEXT("add_publication returned invalid id. \n"))); 00394 return DDS::RETCODE_ERROR; 00395 } 00396 00397 return DDS::RETCODE_OK; 00398 }
DDS::InstanceHandle_t OpenDDS::DCPS::ReplayerImpl::get_instance_handle | ( | ) | [virtual] |
Implements OpenDDS::DCPS::EntityImpl.
Definition at line 1151 of file ReplayerImpl.cpp.
References OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), participant_servant_, and publication_id_.
01152 { 01153 return this->participant_servant_->id_to_handle(publication_id_); 01154 }
ReplayerListener_rch OpenDDS::DCPS::ReplayerImpl::get_listener | ( | ) | [virtual] |
Get the listener for this Replayer.
Implements OpenDDS::DCPS::Replayer.
Definition at line 297 of file ReplayerImpl.cpp.
References listener_.
00298 { 00299 return listener_; 00300 }
CORBA::Long OpenDDS::DCPS::ReplayerImpl::get_priority_value | ( | const AssociationData & | data | ) | const [virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 872 of file ReplayerImpl.cpp.
References qos_, and DDS::DataWriterQos::transport_priority.
00873 { 00874 return this->qos_.transport_priority.value; 00875 }
DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::get_qos | ( | DDS::PublisherQos & | publisher_qos, | |
DDS::DataWriterQos & | datawriter_qos | |||
) | [virtual] |
Get the Quality of Service settings for the Replayer.
Implements OpenDDS::DCPS::Replayer.
Definition at line 280 of file ReplayerImpl.cpp.
References publisher_qos_, qos_, and DDS::RETCODE_OK.
00282 { 00283 qos = qos_; 00284 publisher_qos = publisher_qos_; 00285 return DDS::RETCODE_OK; 00286 }
const RepoId & OpenDDS::DCPS::ReplayerImpl::get_repo_id | ( | ) | const [virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 866 of file ReplayerImpl.cpp.
References publication_id_.
00867 { 00868 return this->publication_id_; 00869 }
void OpenDDS::DCPS::ReplayerImpl::inconsistent_topic | ( | ) | [virtual] |
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 852 of file ReplayerImpl.cpp.
References OpenDDS::DCPS::TopicImpl::inconsistent_topic(), and topic_servant_.
00853 { 00854 topic_servant_->inconsistent_topic(); 00855 }
void OpenDDS::DCPS::ReplayerImpl::init | ( | DDS::Topic_ptr | topic, | |
TopicImpl * | topic_servant, | |||
const DDS::DataWriterQos & | qos, | |||
ReplayerListener_rch | a_listener, | |||
const DDS::StatusMask & | mask, | |||
OpenDDS::DCPS::DomainParticipantImpl * | participant_servant, | |||
const DDS::PublisherQos & | publisher_qos | |||
) | [virtual] |
Initialize the data members.
Definition at line 149 of file ReplayerImpl.cpp.
References OpenDDS::DCPS::TopicImpl::add_entity_ref(), OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC, OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, OpenDDS::DCPS::BUILT_IN_TOPIC_TOPIC, DBG_ENTRY_LVL, domain_id_, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), OpenDDS::DCPS::TopicImpl::get_id(), OpenDDS::DCPS::TopicDescriptionImpl::get_name(), OpenDDS::DCPS::TopicDescriptionImpl::get_type_name(), is_bit_, listener_, listener_mask_, participant_servant_, publisher_qos_, qos_, topic_id_, topic_name_, topic_objref_, topic_servant_, and type_name_.
00157 { 00158 DBG_ENTRY_LVL("ReplayerImpl","init",6); 00159 topic_objref_ = DDS::Topic::_duplicate(topic); 00160 topic_servant_ = topic_servant; 00161 topic_servant_->_add_ref(); 00162 topic_servant_->add_entity_ref(); 00163 topic_name_ = topic_servant_->get_name(); 00164 topic_id_ = topic_servant_->get_id(); 00165 type_name_ = topic_servant_->get_type_name(); 00166 00167 #if !defined (DDS_HAS_MINIMUM_BIT) 00168 is_bit_ = ACE_OS::strcmp(topic_name_.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0 00169 || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_TOPIC_TOPIC) == 0 00170 || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0 00171 || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_PUBLICATION_TOPIC) == 0; 00172 #endif // !defined (DDS_HAS_MINIMUM_BIT) 00173 00174 qos_ = qos; 00175 00176 //Note: OK to _duplicate(nil). 00177 listener_ = a_listener; 00178 listener_mask_ = mask; 00179 00180 // Only store the participant pointer, since it is our "grand" 00181 // parent, we will exist as long as it does. 00182 participant_servant_ = participant_servant; 00183 domain_id_ = participant_servant_->get_domain_id(); 00184 00185 publisher_qos_ = publisher_qos; 00186 }
bool OpenDDS::DCPS::ReplayerImpl::lookup_instance_handles | ( | const ReaderIdSeq & | ids, | |
DDS::InstanceHandleSeq & | hdls | |||
) | [private] |
Lookup the instance handles by the subscription repo ids.
Definition at line 1109 of file ReplayerImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level, and OPENDDS_STRING.
01111 { 01112 if (DCPS_debug_level > 9) { 01113 CORBA::ULong const size = ids.length(); 01114 OPENDDS_STRING separator; 01115 OPENDDS_STRING buffer; 01116 01117 for (unsigned long i = 0; i < size; ++i) { 01118 buffer += separator + OPENDDS_STRING(GuidConverter(ids[i])); 01119 separator = ", "; 01120 } 01121 01122 ACE_DEBUG((LM_DEBUG, 01123 ACE_TEXT("(%P|%t) DataWriterImpl::lookup_instance_handles: ") 01124 ACE_TEXT("searching for handles for reader Ids: %C.\n"), 01125 buffer.c_str())); 01126 } 01127 01128 CORBA::ULong const num_rds = ids.length(); 01129 hdls.length(num_rds); 01130 01131 for (CORBA::ULong i = 0; i < num_rds; ++i) { 01132 hdls[i] = this->participant_servant_->id_to_handle(ids[i]); 01133 } 01134 01135 return true; 01136 }
bool OpenDDS::DCPS::ReplayerImpl::need_sequence_repair | ( | ) | const [private] |
Definition at line 1139 of file ReplayerImpl.cpp.
References reader_info_, and sequence_number_.
Referenced by create_sample_data_message().
01140 { 01141 for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(), 01142 end = reader_info_.end(); it != end; ++it) { 01143 if (it->second.expected_sequence_ != sequence_number_) { 01144 return true; 01145 } 01146 } 01147 return false; 01148 }
void OpenDDS::DCPS::ReplayerImpl::notify_connection_deleted | ( | const RepoId & | ) | [virtual] |
void OpenDDS::DCPS::ReplayerImpl::notify_publication_disconnected | ( | const ReaderIdSeq & | subids | ) | [virtual] |
void OpenDDS::DCPS::ReplayerImpl::notify_publication_lost | ( | const DDS::InstanceHandleSeq & | handles | ) | [private] |
void OpenDDS::DCPS::ReplayerImpl::notify_publication_lost | ( | const ReaderIdSeq & | subids | ) | [virtual] |
void OpenDDS::DCPS::ReplayerImpl::notify_publication_reconnected | ( | const ReaderIdSeq & | subids | ) | [virtual] |
typedef OpenDDS::DCPS::ReplayerImpl::OPENDDS_MAP_CMP | ( | RepoId | , | |
SequenceNumber | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
Flag indicating that init() has been called.
typedef OpenDDS::DCPS::ReplayerImpl::OPENDDS_MAP_CMP | ( | RepoId | , | |
DDS::InstanceHandle_t | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::ReplayerImpl::OPENDDS_MAP_CMP | ( | RepoId | , | |
ReaderInfo | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
DomainParticipantImpl* OpenDDS::DCPS::ReplayerImpl::participant | ( | ) | [inline] |
Definition at line 160 of file ReplayerImpl.h.
Referenced by OpenDDS::DCPS::Service_Participant::delete_replayer(), OpenDDS::DCPS::ReplayerImpl::ReaderInfo::ReaderInfo(), register_for_reader(), and unregister_for_reader().
00160 { 00161 return participant_servant_; 00162 }
void OpenDDS::DCPS::ReplayerImpl::register_for_reader | ( | const RepoId & | participant, | |
const RepoId & | writerid, | |||
const RepoId & | readerid, | |||
const TransportLocatorSeq & | locators, | |||
DiscoveryListener * | listener | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 810 of file ReplayerImpl.cpp.
References participant(), and OpenDDS::DCPS::TransportClient::register_for_reader().
00815 { 00816 TransportClient::register_for_reader(participant, writerid, readerid, locators, listener); 00817 }
void OpenDDS::DCPS::ReplayerImpl::remove_all_associations | ( | ) |
Definition at line 765 of file ReplayerImpl.cpp.
References lock_, pending_readers_, readers_, remove_associations(), and OpenDDS::DCPS::TransportClient::stop_associating().
Referenced by cleanup().
00766 { 00767 this->stop_associating(); 00768 00769 OpenDDS::DCPS::ReaderIdSeq readers; 00770 CORBA::ULong size; 00771 CORBA::ULong num_pending_readers; 00772 { 00773 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_); 00774 00775 num_pending_readers = static_cast<CORBA::ULong>(pending_readers_.size()); 00776 size = static_cast<CORBA::ULong>(readers_.size()) + num_pending_readers; 00777 readers.length(size); 00778 00779 RepoIdSet::iterator itEnd = readers_.end(); 00780 int i = 0; 00781 00782 for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) { 00783 readers[i++] = *it; 00784 } 00785 00786 itEnd = pending_readers_.end(); 00787 for (RepoIdSet::iterator it = pending_readers_.begin(); it != itEnd; ++it) { 00788 readers[i++] = *it; 00789 } 00790 00791 if (num_pending_readers > 0) { 00792 ACE_DEBUG((LM_WARNING, 00793 ACE_TEXT("(%P|%t) WARNING: ReplayerImpl::remove_all_associations() - ") 00794 ACE_TEXT("%d subscribers were pending and never fully associated.\n"), 00795 num_pending_readers)); 00796 } 00797 } 00798 00799 try { 00800 if (0 < size) { 00801 CORBA::Boolean dont_notify_lost = false; 00802 this->remove_associations(readers, dont_notify_lost); 00803 } 00804 00805 } catch (const CORBA::Exception&) { 00806 } 00807 }
void OpenDDS::DCPS::ReplayerImpl::remove_associations | ( | const ReaderIdSeq & | readers, | |
CORBA::Boolean | callback | |||
) | [virtual] |
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 622 of file ReplayerImpl.cpp.
References DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, OpenDDS::DCPS::DCPS_debug_level, id_to_handle_map_, idToSequence_, OpenDDS::DCPS::RcHandle< T >::in(), is_bit_, DDS::PublicationMatchedStatus::last_subscription_handle, listener_, OPENDDS_STRING, pending_readers_, publication_id_, publication_match_status_, reader_info_, readers_, OpenDDS::DCPS::remove(), OpenDDS::DCPS::TransportClient::stop_associating(), and DDS::PublicationMatchedStatus::total_count_change.
Referenced by remove_all_associations().
00624 { 00625 if (DCPS_debug_level >= 1) { 00626 GuidConverter writer_converter(publication_id_); 00627 GuidConverter reader_converter(readers[0]); 00628 ACE_DEBUG((LM_DEBUG, 00629 ACE_TEXT("(%P|%t) ReplayerImpl::remove_associations: ") 00630 ACE_TEXT("bit %d local %C remote %C num remotes %d\n"), 00631 is_bit_, 00632 OPENDDS_STRING(writer_converter).c_str(), 00633 OPENDDS_STRING(reader_converter).c_str(), 00634 readers.length())); 00635 } 00636 00637 this->stop_associating(readers.get_buffer(), readers.length()); 00638 00639 ReaderIdSeq fully_associated_readers; 00640 CORBA::ULong fully_associated_len = 0; 00641 ReaderIdSeq rds; 00642 CORBA::ULong rds_len = 0; 00643 DDS::InstanceHandleSeq handles; 00644 00645 { 00646 // Ensure the same acquisition order as in wait_for_acknowledgments(). 00647 // ACE_GUARD(ACE_SYNCH_MUTEX, wfaGuard, this->wfaLock_); 00648 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00649 00650 //Remove the readers from fully associated reader list. 00651 //If the supplied reader is not in the cached reader list then it is 00652 //already removed. We just need remove the readers in the list that have 00653 //not been removed. 00654 00655 CORBA::ULong len = readers.length(); 00656 00657 for (CORBA::ULong i = 0; i < len; ++i) { 00658 //Remove the readers from fully associated reader list. If it's not 00659 //in there, the association_complete() is not called yet and remove it 00660 //from pending list. 00661 00662 if (OpenDDS::DCPS::remove(readers_, readers[i]) == 0) { 00663 ++fully_associated_len; 00664 fully_associated_readers.length(fully_associated_len); 00665 fully_associated_readers [fully_associated_len - 1] = readers[i]; 00666 00667 // Remove this reader from the ACK sequence map if its there. 00668 // This is where we need to be holding the wfaLock_ obtained 00669 // above. 
00670 RepoIdToSequenceMap::iterator where 00671 = this->idToSequence_.find(readers[i]); 00672 00673 if (where != this->idToSequence_.end()) { 00674 this->idToSequence_.erase(where); 00675 00676 // It is possible that this subscription was causing the wait 00677 // to continue, so give the opportunity to find out. 00678 // this->wfaCondition_.broadcast(); 00679 } 00680 00681 ++rds_len; 00682 rds.length(rds_len); 00683 rds [rds_len - 1] = readers[i]; 00684 00685 } else if (OpenDDS::DCPS::remove(pending_readers_, readers[i]) == 0) { 00686 ++rds_len; 00687 rds.length(rds_len); 00688 rds [rds_len - 1] = readers[i]; 00689 00690 GuidConverter converter(readers[i]); 00691 ACE_DEBUG((LM_WARNING, 00692 ACE_TEXT("(%P|%t) WARNING: ReplayerImpl::remove_associations: ") 00693 ACE_TEXT("removing reader %C before association_complete() call.\n"), 00694 OPENDDS_STRING(converter).c_str())); 00695 } 00696 reader_info_.erase(readers[i]); 00697 //else reader is already removed which indicates remove_association() 00698 //is called multiple times. 00699 } 00700 00701 if (fully_associated_len > 0 && !is_bit_) { 00702 // The reader should be in the id_to_handle map at this time so 00703 // log with error. 00704 if (this->lookup_instance_handles(fully_associated_readers, handles) == false) { 00705 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ReplayerImpl::remove_associations: " 00706 "lookup_instance_handles failed, notify %d \n", notify_lost)); 00707 return; 00708 } 00709 00710 for (CORBA::ULong i = 0; i < fully_associated_len; ++i) { 00711 id_to_handle_map_.erase(fully_associated_readers[i]); 00712 } 00713 } 00714 00715 // wfaGuard.release(); 00716 00717 // Mirror the PUBLICATION_MATCHED_STATUS processing from 00718 // association_complete() here. 00719 if (!this->is_bit_) { 00720 00721 // Derive the change in the number of subscriptions reading this writer. 
00722 int matchedSubscriptions = 00723 static_cast<int>(this->id_to_handle_map_.size()); 00724 this->publication_match_status_.current_count_change = 00725 matchedSubscriptions - this->publication_match_status_.current_count; 00726 00727 // Only process status if the number of subscriptions has changed. 00728 if (this->publication_match_status_.current_count_change != 0) { 00729 this->publication_match_status_.current_count = matchedSubscriptions; 00730 00731 /// Section 7.1.4.1: total_count will not decrement. 00732 00733 /// @TODO: Reconcile this with the verbiage in section 7.1.4.1 00734 /// TODO: Should rds_len really be fully_associated_len here?? 00735 this->publication_match_status_.last_subscription_handle = 00736 handles[rds_len - 1]; 00737 00738 00739 if (listener_.in()) { 00740 listener_->on_replayer_matched( 00741 this, 00742 this->publication_match_status_); 00743 00744 // Listener consumes the change. 00745 this->publication_match_status_.total_count_change = 0; 00746 this->publication_match_status_.current_count_change = 0; 00747 } 00748 00749 } 00750 } 00751 } 00752 00753 for (CORBA::ULong i = 0; i < rds.length(); ++i) { 00754 this->disassociate(rds[i]); 00755 } 00756 00757 // If this remove_association is invoked when the InfoRepo 00758 // detects a lost reader then make a callback to notify 00759 // subscription lost. 00760 if (notify_lost && handles.length() > 0) { 00761 this->notify_publication_lost(handles); 00762 } 00763 }
virtual void OpenDDS::DCPS::ReplayerImpl::retrieve_inline_qos_data | ( | InlineQosData & | qos_data | ) | const [virtual] |
DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::set_listener | ( | const ReplayerListener_rch & | a_listener, | |
DDS::StatusMask | mask | |||
) | [virtual] |
Change the listener for this Replayer.
Implements OpenDDS::DCPS::Replayer.
Definition at line 289 of file ReplayerImpl.cpp.
References listener_, listener_mask_, and DDS::RETCODE_OK.
00291 { 00292 listener_ = a_listener; 00293 listener_mask_ = mask; 00294 return DDS::RETCODE_OK; 00295 }
DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::set_qos | ( | const ::DDS::PublisherQos & | publisher_qos, | |
const DDS::DataWriterQos & | datawriter_qos | |||
) | [virtual] |
Set the Quality of Service settings for the Replayer.
Implements OpenDDS::DCPS::Replayer.
Definition at line 189 of file ReplayerImpl.cpp.
References OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), domain_id_, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DomainParticipantImpl::get_id(), OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK, OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK, participant_servant_, publication_id_, publisher_qos_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, TheServiceParticipant, and OpenDDS::DCPS::Qos_Helper::valid().
00191 { 00192 00193 OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(publisher_qos, DDS::RETCODE_UNSUPPORTED); 00194 00195 if (Qos_Helper::valid(publisher_qos) && Qos_Helper::consistent(publisher_qos)) { 00196 if (publisher_qos_ == publisher_qos) 00197 return DDS::RETCODE_OK; 00198 00199 // for the not changeable qos, it can be changed before enable 00200 if (!Qos_Helper::changeable(publisher_qos_, publisher_qos) && enabled_ == true) { 00201 return DDS::RETCODE_IMMUTABLE_POLICY; 00202 00203 } else { 00204 publisher_qos_ = publisher_qos; 00205 } 00206 } else { 00207 return DDS::RETCODE_INCONSISTENT_POLICY; 00208 } 00209 00210 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00211 OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00212 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00213 OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00214 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00215 00216 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) { 00217 if (qos_ == qos) 00218 return DDS::RETCODE_OK; 00219 00220 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) { 00221 return DDS::RETCODE_IMMUTABLE_POLICY; 00222 00223 } else { 00224 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_); 00225 // DDS::PublisherQos publisherQos; 00226 // this->publisher_servant_->get_qos(publisherQos); 00227 DDS::PublisherQos publisherQos = this->publisher_qos_; 00228 const bool status 00229 = disco->update_publication_qos(this->participant_servant_->get_domain_id(), 00230 this->participant_servant_->get_id(), 00231 this->publication_id_, 00232 qos, 00233 publisherQos); 00234 00235 if (!status) { 00236 ACE_ERROR_RETURN((LM_ERROR, 00237 ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ") 00238 ACE_TEXT("qos not updated. 
\n")), 00239 DDS::RETCODE_ERROR); 00240 } 00241 } 00242 00243 if (!(qos_ == qos)) { 00244 // Reset the deadline timer if the period has changed. 00245 // if (qos_.deadline.period.sec != qos.deadline.period.sec 00246 // || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) { 00247 // if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC 00248 // && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) { 00249 // ACE_auto_ptr_reset(this->watchdog_, 00250 // new OfferedDeadlineWatchdog( 00251 // this->reactor_, 00252 // this->lock_, 00253 // qos.deadline, 00254 // this, 00255 // this->dw_local_objref_.in(), 00256 // this->offered_deadline_missed_status_, 00257 // this->last_deadline_missed_total_count_)); 00258 // 00259 // } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC 00260 // && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) { 00261 // this->watchdog_->cancel_all(); 00262 // this->watchdog_.reset(); 00263 // 00264 // } else { 00265 // this->watchdog_->reset_interval( 00266 // duration_to_time_value(qos.deadline.period)); 00267 // } 00268 // } 00269 00270 qos_ = qos; 00271 } 00272 00273 return DDS::RETCODE_OK; 00274 00275 } else { 00276 return DDS::RETCODE_INCONSISTENT_POLICY; 00277 } 00278 }
void OpenDDS::DCPS::ReplayerImpl::unregister_for_reader | ( | const RepoId & | participant, | |
const RepoId & | writerid, | |||
const RepoId & | readerid | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 820 of file ReplayerImpl.cpp.
References participant(), and OpenDDS::DCPS::TransportClient::unregister_for_reader().
00823 { 00824 TransportClient::unregister_for_reader(participant, writerid, readerid); 00825 }
void OpenDDS::DCPS::ReplayerImpl::update_incompatible_qos | ( | const IncompatibleQosStatus & | status | ) | [virtual] |
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 828 of file ReplayerImpl.cpp.
References OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, OpenDDS::DCPS::IncompatibleQosStatus::last_policy_id, DDS::OfferedIncompatibleQosStatus::last_policy_id, offered_incompatible_qos_status_, OpenDDS::DCPS::IncompatibleQosStatus::policies, DDS::OfferedIncompatibleQosStatus::policies, OpenDDS::DCPS::IncompatibleQosStatus::total_count, DDS::OfferedIncompatibleQosStatus::total_count, and DDS::OfferedIncompatibleQosStatus::total_count_change.
00829 { 00830 00831 00832 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00833 00834 // copy status and increment change 00835 offered_incompatible_qos_status_.total_count = status.total_count; 00836 offered_incompatible_qos_status_.total_count_change += 00837 status.count_since_last_send; 00838 offered_incompatible_qos_status_.last_policy_id = status.last_policy_id; 00839 offered_incompatible_qos_status_.policies = status.policies; 00840 00841 }
void OpenDDS::DCPS::ReplayerImpl::update_subscription_params | ( | const RepoId & | readerId, | |
const DDS::StringSeq & | exprParams | |||
) | [virtual] |
DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::write | ( | const RawDataSample * | sample_array, | |
int | array_size, | |||
DDS::InstanceHandle_t * | reader | |||
) | [private] |
Definition at line 974 of file ReplayerImpl.cpp.
References create_sample_data_message(), DBG_ENTRY_LVL, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DomainParticipantImpl::get_repoid(), OpenDDS::DCPS::GUID_UNKNOWN, participant_servant_, pending_write_count_, publication_id_, reader_info_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::RawDataSample::sample_byte_order_, sample_list_element_allocator_, OpenDDS::DCPS::TransportClient::send(), sequence_number_, OpenDDS::DCPS::RawDataSample::source_timestamp_, transport_customized_element_allocator_, and transport_send_element_allocator_.
00977 { 00978 DBG_ENTRY_LVL("ReplayerImpl","write",6); 00979 00980 OpenDDS::DCPS::RepoId repo_id; 00981 if (reader_ih_ptr) { 00982 repo_id = this->participant_servant_->get_repoid(*reader_ih_ptr); 00983 if (repo_id == GUID_UNKNOWN) { 00984 ACE_ERROR_RETURN((LM_ERROR, 00985 ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::write: ") 00986 ACE_TEXT("Invalid reader instance handle (%d)\n"), *reader_ih_ptr), 00987 DDS::RETCODE_ERROR); 00988 } 00989 } 00990 00991 SendStateDataSampleList list; 00992 00993 for (int i = 0; i < num_samples; ++i) { 00994 DataSampleElement* element = 0; 00995 00996 ACE_NEW_MALLOC_RETURN( 00997 element, 00998 static_cast<DataSampleElement*>( 00999 sample_list_element_allocator_->malloc( 01000 sizeof(DataSampleElement))), 01001 DataSampleElement(publication_id_, 01002 this, 01003 0, 01004 transport_send_element_allocator_.get(), 01005 transport_customized_element_allocator_.get()), 01006 DDS::RETCODE_ERROR); 01007 01008 element->get_header().byte_order_ = samples[i].sample_byte_order_; 01009 element->get_header().publication_id_ = this->publication_id_; 01010 list.enqueue_tail(element); 01011 DataSample* temp; 01012 DDS::ReturnCode_t ret = create_sample_data_message(samples[i].sample_->duplicate(), 01013 element->get_header(), 01014 temp, 01015 samples[i].source_timestamp_, 01016 false); 01017 element->set_sample(temp); 01018 if (reader_ih_ptr) { 01019 element->set_num_subs(1); 01020 element->set_sub_id(0, repo_id); 01021 } 01022 01023 if (ret != DDS::RETCODE_OK) { 01024 // we need to free the list 01025 while (list.dequeue(element)) { 01026 ACE_DES_FREE(element, sample_list_element_allocator_->free, DataSampleElement); 01027 } 01028 01029 return ret; 01030 } 01031 } 01032 01033 { 01034 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, DDS::RETCODE_ERROR); 01035 ++pending_write_count_; 01036 } 01037 01038 this->send(list); 01039 01040 for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(), 01041 end = reader_info_.end(); iter != 
end; ++iter) { 01042 iter->second.expected_sequence_ = sequence_number_; 01043 } 01044 01045 return DDS::RETCODE_OK; 01046 }
DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::write | ( | const RawDataSample & | sample | ) | [virtual] |
Send the sample to all associated DataReaders.
Implements OpenDDS::DCPS::Replayer.
Definition at line 1049 of file ReplayerImpl.cpp.
Referenced by write_to_reader().
01050 { 01051 return this->write(&sample, 1, 0); 01052 }
DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::write_to_reader | ( | DDS::InstanceHandle_t | subscription, | |
const RawDataSampleList & | samples | |||
) | [virtual] |
Send the samples to the specified DataReader.
Implements OpenDDS::DCPS::Replayer.
Definition at line 1167 of file ReplayerImpl.cpp.
References DDS::RETCODE_ERROR, and write().
01169 { 01170 if (samples.size()) 01171 return write(&samples[0], static_cast<int>(samples.size()), &subscription); 01172 return DDS::RETCODE_ERROR; 01173 }
DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::write_to_reader | ( | DDS::InstanceHandle_t | subscription, | |
const RawDataSample & | sample | |||
) | [virtual] |
Send the sample to the specified DataReader.
Implements OpenDDS::DCPS::Replayer.
Definition at line 1160 of file ReplayerImpl.cpp.
References write().
01162 { 01163 return write(&sample, 1, &subscription); 01164 }
friend class ::DDS_TEST [friend] |
RepoIdSet OpenDDS::DCPS::ReplayerImpl::assoc_complete_readers_ [private] |
Definition at line 324 of file ReplayerImpl.h.
Referenced by add_association(), and association_complete().
size_t OpenDDS::DCPS::ReplayerImpl::association_chunk_multiplier_ [private] |
The multiplier for allocators affected by associations.
Definition at line 190 of file ReplayerImpl.h.
Referenced by enable().
std::auto_ptr<DataBlockAllocator> OpenDDS::DCPS::ReplayerImpl::db_allocator_ [private] |
Definition at line 279 of file ReplayerImpl.h.
Referenced by create_sample_data_message(), and enable().
ACE_Condition<ACE_Recursive_Thread_Mutex> OpenDDS::DCPS::ReplayerImpl::empty_condition_ [private] |
Definition at line 326 of file ReplayerImpl.h.
Referenced by cleanup(), data_delivered(), and data_dropped().
std::auto_ptr<DataSampleHeaderAllocator> OpenDDS::DCPS::ReplayerImpl::header_allocator_ [private] |
Definition at line 281 of file ReplayerImpl.h.
Referenced by create_sample_data_message(), and enable().
RepoIdToHandleMap OpenDDS::DCPS::ReplayerImpl::id_to_handle_map_ [private] |
Definition at line 250 of file ReplayerImpl.h.
Referenced by association_complete_i(), and remove_associations().
RepoIdToSequenceMap OpenDDS::DCPS::ReplayerImpl::idToSequence_ [private] |
bool OpenDDS::DCPS::ReplayerImpl::is_bit_ [private] |
Timestamp of last write/dispose/assert_liveliness.
Flag indicating that this datawriter is a built-in topic datawriter.
Definition at line 314 of file ReplayerImpl.h.
Referenced by add_association(), association_complete(), association_complete_i(), init(), and remove_associations().
Used to notify the entity of relevant events.
Definition at line 230 of file ReplayerImpl.h.
Referenced by association_complete_i(), get_listener(), init(), remove_associations(), and set_listener().
The StatusKind bit mask indicating which status condition changes the listener of this entity is notified of.
Definition at line 228 of file ReplayerImpl.h.
Referenced by init(), and set_listener().
ACE_Recursive_Thread_Mutex OpenDDS::DCPS::ReplayerImpl::lock_ [private] |
The sample data container.
The lock that protects the active subscriptions and status changes.
Reimplemented from OpenDDS::DCPS::EntityImpl.
Definition at line 246 of file ReplayerImpl.h.
Referenced by remove_all_associations().
std::auto_ptr<MessageBlockAllocator> OpenDDS::DCPS::ReplayerImpl::mb_allocator_ [private] |
Definition at line 277 of file ReplayerImpl.h.
Referenced by create_sample_data_message(), and enable().
size_t OpenDDS::DCPS::ReplayerImpl::n_chunks_ [private] |
The number of chunks for the cached allocator.
Definition at line 187 of file ReplayerImpl.h.
Referenced by enable().
DDS::OfferedIncompatibleQosStatus OpenDDS::DCPS::ReplayerImpl::offered_incompatible_qos_status_ [private] |
Status conditions.
Definition at line 257 of file ReplayerImpl.h.
Referenced by ReplayerImpl(), and update_incompatible_qos().
The participant servant which creates the publisher that creates this datawriter.
Definition at line 200 of file ReplayerImpl.h.
Referenced by add_association(), association_complete_i(), get_instance_handle(), init(), set_qos(), and write().
RepoIdSet OpenDDS::DCPS::ReplayerImpl::pending_readers_ [private] |
Definition at line 324 of file ReplayerImpl.h.
Referenced by add_association(), association_complete(), remove_all_associations(), and remove_associations().
int OpenDDS::DCPS::ReplayerImpl::pending_write_count_ [private] |
Definition at line 327 of file ReplayerImpl.h.
Referenced by data_delivered(), data_dropped(), and write().
The repository id of this datawriter/publication.
Definition at line 238 of file ReplayerImpl.h.
Referenced by add_association(), cleanup(), data_delivered(), enable(), get_instance_handle(), get_repo_id(), remove_associations(), set_qos(), and write().
Definition at line 258 of file ReplayerImpl.h.
Referenced by association_complete_i(), remove_associations(), and ReplayerImpl().
The qos policy list of this datawriter.
Definition at line 196 of file ReplayerImpl.h.
Referenced by add_association(), create_sample_data_message(), enable(), get_priority_value(), get_qos(), init(), and set_qos().
RepoIdToReaderInfoMap OpenDDS::DCPS::ReplayerImpl::reader_info_ [private] |
Definition at line 211 of file ReplayerImpl.h.
Referenced by add_association(), need_sequence_repair(), remove_associations(), and write().
RepoIdSet OpenDDS::DCPS::ReplayerImpl::readers_ [private] |
Definition at line 252 of file ReplayerImpl.h.
Referenced by association_complete_i(), remove_all_associations(), and remove_associations().
std::auto_ptr<DataSampleElementAllocator> OpenDDS::DCPS::ReplayerImpl::sample_list_element_allocator_ [private] |
The cached allocator to allocate DataSampleElement objects.
Definition at line 285 of file ReplayerImpl.h.
Referenced by data_delivered(), data_dropped(), enable(), and write().
The sequence number, unique within the scope of this DataWriter.
Definition at line 240 of file ReplayerImpl.h.
Referenced by create_sample_data_message(), need_sequence_repair(), and write().
RepoId OpenDDS::DCPS::ReplayerImpl::topic_id_ [private] |
The associated topic repository id.
Definition at line 220 of file ReplayerImpl.h.
Referenced by init().
CORBA::String_var OpenDDS::DCPS::ReplayerImpl::topic_name_ [private] |
DDS::Topic_var OpenDDS::DCPS::ReplayerImpl::topic_objref_ [private] |
The topic servant.
Definition at line 224 of file ReplayerImpl.h.
Referenced by cleanup(), enable(), inconsistent_topic(), and init().
std::auto_ptr<TransportCustomizedElementAllocator> OpenDDS::DCPS::ReplayerImpl::transport_customized_element_allocator_ [private] |
std::auto_ptr<TransportSendElementAllocator> OpenDDS::DCPS::ReplayerImpl::transport_send_element_allocator_ [private] |
The allocator for TransportSendElement. The TransportSendElement allocator is kept here because it needs the chunk-count information that WriteDataContainer has.
Definition at line 291 of file ReplayerImpl.h.
CORBA::String_var OpenDDS::DCPS::ReplayerImpl::type_name_ [private] |
The type name of associated topic.
Definition at line 193 of file ReplayerImpl.h.
Referenced by init().