OpenDDS::DCPS::StaticEndpointManager Class Reference

#include <StaticDiscovery.h>

Inheritance diagram for OpenDDS::DCPS::StaticEndpointManager:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::StaticEndpointManager:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 StaticEndpointManager (const RepoId &participant_id, ACE_Thread_Mutex &lock, const EndpointRegistry &registry, StaticParticipant &participant)
void init_bit ()
virtual void assign_publication_key (RepoId &rid, const RepoId &topicId, const DDS::DataWriterQos &qos)
virtual void assign_subscription_key (RepoId &rid, const RepoId &topicId, const DDS::DataReaderQos &qos)
virtual bool update_topic_qos (const RepoId &, const DDS::TopicQos &, OPENDDS_STRING &)
virtual bool update_publication_qos (const RepoId &, const DDS::DataWriterQos &, const DDS::PublisherQos &)
virtual bool update_subscription_qos (const RepoId &, const DDS::DataReaderQos &, const DDS::SubscriberQos &)
virtual bool update_subscription_params (const RepoId &, const DDS::StringSeq &)
virtual void association_complete (const RepoId &, const RepoId &)
virtual bool disassociate (const StaticDiscoveredParticipantData &)
virtual DDS::ReturnCode_t add_publication_i (const RepoId &, LocalPublication &)
virtual DDS::ReturnCode_t remove_publication_i (const RepoId &)
virtual DDS::ReturnCode_t add_subscription_i (const RepoId &, LocalSubscription &)
virtual DDS::ReturnCode_t remove_subscription_i (const RepoId &)
virtual bool shutting_down () const
virtual void populate_transport_locator_sequence (TransportLocatorSeq *&, DiscoveredSubscriptionIter &, const RepoId &)
virtual void populate_transport_locator_sequence (TransportLocatorSeq *&, DiscoveredPublicationIter &, const RepoId &)
virtual bool defer_writer (const RepoId &, const RepoId &)
virtual bool defer_reader (const RepoId &, const RepoId &)
virtual void reader_exists (const RepoId &readerid, const RepoId &writerid)
virtual void reader_does_not_exist (const RepoId &readerid, const RepoId &writerid)
virtual void writer_exists (const RepoId &writerid, const RepoId &readerid)
virtual void writer_does_not_exist (const RepoId &writerid, const RepoId &readerid)
DDS::PublicationBuiltinTopicDataDataReaderImpl * pub_bit ()
DDS::SubscriptionBuiltinTopicDataDataReaderImpl * sub_bit ()

Private Attributes

const EndpointRegistry & registry_
StaticParticipant & participant_

Detailed Description

Definition at line 131 of file StaticDiscovery.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::StaticEndpointManager::StaticEndpointManager ( const RepoId &  participant_id,
ACE_Thread_Mutex &  lock,
const EndpointRegistry &  registry,
StaticParticipant &  participant 
)

Definition at line 63 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::pub_bit_key_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::sub_bit_key_, and DDS::BuiltinTopicKey_t::value.

00067   : EndpointManager<StaticDiscoveredParticipantData>(participant_id, lock)
00068   , registry_(registry)
00069   , participant_(participant)
00070 {
00071   pub_bit_key_.value[0] = pub_bit_key_.value[1] = pub_bit_key_.value[2] = 0;
00072   sub_bit_key_.value[0] = sub_bit_key_.value[1] = sub_bit_key_.value[2] = 0;
00073 }


Member Function Documentation

DDS::ReturnCode_t OpenDDS::DCPS::StaticEndpointManager::add_publication_i ( const RepoId &  ,
LocalPublication &   
) [virtual]

Definition at line 289 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::EndpointRegistry::Writer::best_effort_readers, OpenDDS::DCPS::ReaderAssociation::exprParams, OpenDDS::DCPS::ReaderAssociation::filterClassName, OpenDDS::DCPS::ReaderAssociation::filterExpression, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::participant_id_, OpenDDS::DCPS::EndpointRegistry::Reader::qos, OpenDDS::DCPS::EndpointRegistry::reader_map, OpenDDS::DCPS::ReaderAssociation::readerId, OpenDDS::DCPS::ReaderAssociation::readerQos, OpenDDS::DCPS::ReaderAssociation::readerTransInfo, registry_, OpenDDS::DCPS::EndpointRegistry::Writer::reliable_readers, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::ReaderAssociation::subQos, OpenDDS::DCPS::EndpointRegistry::Reader::subscriber_qos, OpenDDS::DCPS::EndpointRegistry::Reader::trans_info, and OpenDDS::DCPS::EndpointRegistry::writer_map.

00291 {
00292   /*
00293     Find all matching remote readers.
00294     If the reader is best effort, then associate immediately.
00295     If the reader is reliable (we are reliable by implication), register with the transport to receive notification that the remote reader is up.
00296     */
00297   EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(writerid);
00298   if (pos == registry_.writer_map.end()) {
00299     return DDS::RETCODE_ERROR;
00300   }
00301   const EndpointRegistry::Writer& writer = pos->second;
00302 
00303   for (RepoIdSet::const_iterator pos = writer.best_effort_readers.begin(), limit = writer.best_effort_readers.end();
00304        pos != limit;
00305        ++pos) {
00306     const RepoId& readerid = *pos;
00307     const EndpointRegistry::Reader& reader = registry_.reader_map.find(readerid)->second;
00308 
00309 #ifdef __SUNPRO_CC
00310     ReaderAssociation ra;
00311     ra.readerTransInfo = reader.trans_info;
00312     ra.readerId = readerid;
00313     ra.subQos = reader.subscriber_qos;
00314     ra.readerQos = reader.qos;
00315     ra.filterClassName = "";
00316     ra.filterExpression = "";
00317     ra.exprParams = 0;
00318 #else
00319     const ReaderAssociation ra =
00320       {reader.trans_info, readerid, reader.subscriber_qos, reader.qos, "", "", 0};
00321 #endif
00322     pub.publication_->add_association(writerid, ra, true);
00323     pub.publication_->association_complete(readerid);
00324   }
00325 
00326   for (RepoIdSet::const_iterator pos = writer.reliable_readers.begin(), limit = writer.reliable_readers.end();
00327        pos != limit;
00328        ++pos) {
00329     const RepoId& readerid = *pos;
00330     const EndpointRegistry::Reader& reader = registry_.reader_map.find(readerid)->second;
00331     pub.publication_->register_for_reader(participant_id_, writerid, readerid, reader.trans_info, this);
00332   }
00333 
00334   return DDS::RETCODE_OK;
00335 }

DDS::ReturnCode_t OpenDDS::DCPS::StaticEndpointManager::add_subscription_i ( const RepoId &  ,
LocalSubscription &   
) [virtual]

Definition at line 369 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::EndpointRegistry::Reader::best_effort_writers, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::participant_id_, OpenDDS::DCPS::EndpointRegistry::Writer::publisher_qos, OpenDDS::DCPS::WriterAssociation::pubQos, OpenDDS::DCPS::EndpointRegistry::Writer::qos, OpenDDS::DCPS::EndpointRegistry::reader_map, registry_, OpenDDS::DCPS::EndpointRegistry::Reader::reliable_writers, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EndpointRegistry::Writer::trans_info, OpenDDS::DCPS::EndpointRegistry::writer_map, OpenDDS::DCPS::WriterAssociation::writerId, OpenDDS::DCPS::WriterAssociation::writerQos, and OpenDDS::DCPS::WriterAssociation::writerTransInfo.

00371 {
00372   /*
00373     Find all matching remote writers.
00374     If we (the reader) is best effort, then associate immediately.
00375     If we (the reader) are reliable, then register with the transport to receive notification that the remote writer is up.
00376     */
00377   EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(readerid);
00378   if (pos == registry_.reader_map.end()) {
00379     return DDS::RETCODE_ERROR;
00380   }
00381   const EndpointRegistry::Reader& reader = pos->second;
00382 
00383   for (RepoIdSet::const_iterator pos = reader.best_effort_writers.begin(), limit = reader.best_effort_writers.end();
00384        pos != limit;
00385        ++pos) {
00386     const RepoId& writerid = *pos;
00387     const EndpointRegistry::Writer& writer = registry_.writer_map.find(writerid)->second;
00388 
00389 #ifdef __SUNPRO_CC
00390     WriterAssociation wa;
00391     wa.writerTransInfo = writer.trans_info;
00392     wa.writerId = writerid;
00393     wa.pubQos = writer.publisher_qos;
00394     wa.writerQos = writer.qos;
00395 #else
00396     const WriterAssociation wa =
00397       {writer.trans_info, writerid, writer.publisher_qos, writer.qos};
00398 #endif
00399     sub.subscription_->add_association(readerid, wa, false);
00400   }
00401 
00402   for (RepoIdSet::const_iterator pos = reader.reliable_writers.begin(), limit = reader.reliable_writers.end();
00403        pos != limit;
00404        ++pos) {
00405     const RepoId& writerid = *pos;
00406     const EndpointRegistry::Writer& writer = registry_.writer_map.find(writerid)->second;
00407     sub.subscription_->register_for_writer(participant_id_, readerid, writerid, writer.trans_info, this);
00408   }
00409 
00410   return DDS::RETCODE_OK;
00411 }

void OpenDDS::DCPS::StaticEndpointManager::assign_publication_key ( RepoId &  rid,
const RepoId &  topicId,
const DDS::DataWriterQos &  qos 
) [virtual]

Reimplemented from OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

Definition at line 169 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::EntityId_t::entityKey, OpenDDS::DCPS::EntityId_t::entityKind, OpenDDS::DCPS::ENTITYKIND_USER_WRITER_WITH_KEY, registry_, DDS::DataWriterQos::user_data, and OpenDDS::DCPS::EndpointRegistry::writer_map.

00172 {
00173   if (qos.user_data.value.length() != 3) {
00174     ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_publication_key: no user data to identify writer\n")));
00175     return;
00176   }
00177 
00178   rid.entityId.entityKey[0] = qos.user_data.value[0];
00179   rid.entityId.entityKey[1] = qos.user_data.value[1];
00180   rid.entityId.entityKey[2] = qos.user_data.value[2];
00181   rid.entityId.entityKind = ENTITYKIND_USER_WRITER_WITH_KEY;
00182 
00183   if (DCPS_debug_level > 8) {
00184     ACE_DEBUG((LM_INFO, "(%P|%t) looking up writer ID %s\n",
00185                LogGuid(rid).c_str()));
00186   }
00187 
00188   EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(rid);
00189   if (pos == registry_.writer_map.end()) {
00190     ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_publication_key: unknown writer: %s\n"), LogGuid(rid).c_str()));
00191     return;
00192   }
00193 
00194   DDS::DataWriterQos qos2(qos);
00195   // Qos in registry will not have the user data so overwrite.
00196   qos2.user_data = pos->second.qos.user_data;
00197 
00198   DDS::DataWriterQos qos3(pos->second.qos);
00199 
00200   if (qos2 != qos3) {
00201     ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_publication_key: dynamic and static QoS differ\n")));
00202   }
00203 }

void OpenDDS::DCPS::StaticEndpointManager::assign_subscription_key ( RepoId &  rid,
const RepoId &  topicId,
const DDS::DataReaderQos &  qos 
) [virtual]

Reimplemented from OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

Definition at line 205 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::EntityId_t::entityKey, OpenDDS::DCPS::EntityId_t::entityKind, OpenDDS::DCPS::ENTITYKIND_USER_READER_WITH_KEY, OpenDDS::DCPS::EndpointRegistry::reader_map, registry_, and DDS::DataReaderQos::user_data.

00208 {
00209   if (qos.user_data.value.length() != 3) {
00210     ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_subscription_key: no user data to identify reader\n")));
00211     return;
00212   }
00213 
00214   rid.entityId.entityKey[0] = qos.user_data.value[0];
00215   rid.entityId.entityKey[1] = qos.user_data.value[1];
00216   rid.entityId.entityKey[2] = qos.user_data.value[2];
00217   rid.entityId.entityKind = ENTITYKIND_USER_READER_WITH_KEY;
00218 
00219   EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(rid);
00220   if (pos == registry_.reader_map.end()) {
00221     ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_subscription_key: unknown reader: %s\n"), LogGuid(rid).c_str()));
00222     return;
00223   }
00224 
00225   DDS::DataReaderQos qos2(qos);
00226   // Qos in registry will not have the user data so overwrite.
00227   qos2.user_data = pos->second.qos.user_data;
00228 
00229   if (qos2 != pos->second.qos) {
00230     ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_subscription_key: dynamic and static QoS differ\n")));
00231   }
00232 }

void OpenDDS::DCPS::StaticEndpointManager::association_complete ( const RepoId &  ,
const RepoId &   
) [virtual]

Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

Definition at line 274 of file StaticDiscovery.cpp.

00276 {
00277   // Do nothing.
00278 }

bool OpenDDS::DCPS::StaticEndpointManager::defer_reader ( const RepoId &  ,
const RepoId &   
) [virtual]

Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

Definition at line 480 of file StaticDiscovery.cpp.

00482 {
00483   // TODO
00484   ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::defer_reader TODO\n")));
00485   return false;
00486 }

bool OpenDDS::DCPS::StaticEndpointManager::defer_writer ( const RepoId &  ,
const RepoId &   
) [virtual]

Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

Definition at line 471 of file StaticDiscovery.cpp.

00473 {
00474   ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::defer_writer TODO\n")));
00475   // TODO
00476   return false;
00477 }

bool OpenDDS::DCPS::StaticEndpointManager::disassociate ( const StaticDiscoveredParticipantData &   )  [virtual]

Definition at line 281 of file StaticDiscovery.cpp.

00282 {
00283   ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::disassociate TODO\n")));
00284   // TODO
00285   return false;
00286 }

void OpenDDS::DCPS::StaticEndpointManager::init_bit (  ) 

Definition at line 75 of file StaticDiscovery.cpp.

References DDS::SubscriptionBuiltinTopicData::deadline, DDS::PublicationBuiltinTopicData::deadline, DDS::SubscriptionBuiltinTopicData::destination_order, DDS::PublicationBuiltinTopicData::destination_order, DDS::SubscriptionBuiltinTopicData::durability, DDS::PublicationBuiltinTopicData::durability, DDS::PublicationBuiltinTopicData::durability_service, DDS::SubscriptionBuiltinTopicData::group_data, DDS::PublicationBuiltinTopicData::group_data, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::increment_key(), DDS::SubscriptionBuiltinTopicData::key, DDS::PublicationBuiltinTopicData::key, DDS::SubscriptionBuiltinTopicData::latency_budget, DDS::PublicationBuiltinTopicData::latency_budget, DDS::PublicationBuiltinTopicData::lifespan, DDS::SubscriptionBuiltinTopicData::liveliness, DDS::PublicationBuiltinTopicData::liveliness, DDS::NEW_VIEW_STATE, OPENDDS_STRING, DDS::SubscriptionBuiltinTopicData::ownership, DDS::PublicationBuiltinTopicData::ownership, DDS::PublicationBuiltinTopicData::ownership_strength, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::participant_id_, DDS::SubscriptionBuiltinTopicData::partition, DDS::PublicationBuiltinTopicData::partition, DDS::SubscriptionBuiltinTopicData::presentation, DDS::PublicationBuiltinTopicData::presentation, pub_bit(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::pub_bit_key_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::pub_key_to_id_, OpenDDS::DCPS::EndpointRegistry::reader_map, registry_, DDS::SubscriptionBuiltinTopicData::reliability, DDS::PublicationBuiltinTopicData::reliability, sub_bit(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::sub_bit_key_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::sub_key_to_id_, DDS::SubscriptionBuiltinTopicData::time_based_filter, OpenDDS::DCPS::EndpointRegistry::topic_map, DDS::SubscriptionBuiltinTopicData::topic_name, DDS::PublicationBuiltinTopicData::topic_name, 
DDS::SubscriptionBuiltinTopicData::type_name, OpenDDS::DCPS::EndpointRegistry::Topic::type_name, DDS::PublicationBuiltinTopicData::type_name, DDS::SubscriptionBuiltinTopicData::user_data, DDS::PublicationBuiltinTopicData::user_data, and OpenDDS::DCPS::EndpointRegistry::writer_map.

Referenced by OpenDDS::DCPS::StaticParticipant::init_bit().

00076 {
00077   // Discover all remote publications and subscriptions.
00078 
00079   for (EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.begin(),
00080          limit = registry_.writer_map.end();
00081        pos != limit;
00082        ++pos) {
00083     const RepoId& remoteid = pos->first;
00084     const EndpointRegistry::Writer& writer = pos->second;
00085 
00086     if (!GuidPrefixEqual()(participant_id_.guidPrefix, remoteid.guidPrefix)) {
00087       increment_key(pub_bit_key_);
00088       pub_key_to_id_[pub_bit_key_] = remoteid;
00089 
00090       // pos represents a remote.
00091       // Populate data.
00092       DDS::PublicationBuiltinTopicData data;
00093 
00094       data.key = pub_bit_key_;
00095       OPENDDS_STRING topic_name = writer.topic_name;
00096       data.topic_name = topic_name.c_str();
00097       const EndpointRegistry::Topic& topic = registry_.topic_map.find(topic_name)->second;
00098       data.type_name = topic.type_name.c_str();
00099       data.durability = writer.qos.durability;
00100       data.durability_service = writer.qos.durability_service;
00101       data.deadline = writer.qos.deadline;
00102       data.latency_budget = writer.qos.latency_budget;
00103       data.liveliness = writer.qos.liveliness;
00104       data.reliability = writer.qos.reliability;
00105       data.lifespan = writer.qos.lifespan;
00106       data.user_data = writer.qos.user_data;
00107       data.ownership = writer.qos.ownership;
00108       data.ownership_strength = writer.qos.ownership_strength;
00109       data.destination_order = writer.qos.destination_order;
00110       data.presentation = writer.publisher_qos.presentation;
00111       data.partition = writer.publisher_qos.partition;
00112       // If the TopicQos becomes available, this can be populated.
00113       //data.topic_data = topic_details.qos_.topic_data;
00114       data.group_data = writer.publisher_qos.group_data;
00115 #ifndef DDS_HAS_MINIMUM_BIT
00116       DDS::PublicationBuiltinTopicDataDataReaderImpl* bit = pub_bit();
00117       if (bit) { // bit may be null if the DomainParticipant is shutting down
00118         bit->store_synthetic_data(data, DDS::NEW_VIEW_STATE);
00119       }
00120 #endif /* DDS_HAS_MINIMUM_BIT */
00121     }
00122   }
00123 
00124   for (EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.begin(),
00125          limit = registry_.reader_map.end();
00126        pos != limit;
00127        ++pos) {
00128     const RepoId& remoteid = pos->first;
00129     const EndpointRegistry::Reader& reader = pos->second;
00130 
00131     if (!GuidPrefixEqual()(participant_id_.guidPrefix, remoteid.guidPrefix)) {
00132       increment_key(sub_bit_key_);
00133       sub_key_to_id_[sub_bit_key_] = remoteid;
00134 
00135       // pos represents a remote.
00136       // Populate data.
00137       DDS::SubscriptionBuiltinTopicData data;
00138 
00139       data.key = sub_bit_key_;
00140       OPENDDS_STRING topic_name = reader.topic_name;
00141       data.topic_name = topic_name.c_str();
00142       const EndpointRegistry::Topic& topic = registry_.topic_map.find(topic_name)->second;
00143       data.type_name = topic.type_name.c_str();
00144       data.durability = reader.qos.durability;
00145       data.deadline = reader.qos.deadline;
00146       data.latency_budget = reader.qos.latency_budget;
00147       data.liveliness = reader.qos.liveliness;
00148       data.reliability = reader.qos.reliability;
00149       data.ownership = reader.qos.ownership;
00150       data.destination_order = reader.qos.destination_order;
00151       data.user_data = reader.qos.user_data;
00152       data.time_based_filter = reader.qos.time_based_filter;
00153       data.presentation = reader.subscriber_qos.presentation;
00154       data.partition = reader.subscriber_qos.partition;
00155       // // If the TopicQos becomes available, this can be populated.
00156       //data.topic_data = topic_details.qos_.topic_data;
00157       data.group_data = reader.subscriber_qos.group_data;
00158 
00159 #ifndef DDS_HAS_MINIMUM_BIT
00160       DDS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sub_bit();
00161       if (bit) { // bit may be null if the DomainParticipant is shutting down
00162         bit->store_synthetic_data(data, DDS::NEW_VIEW_STATE);
00163       }
00164 #endif /* DDS_HAS_MINIMUM_BIT */
00165     }
00166   }
00167 }

void OpenDDS::DCPS::StaticEndpointManager::populate_transport_locator_sequence ( TransportLocatorSeq *&  ,
DiscoveredPublicationIter &  ,
const RepoId &   
) [virtual]

Definition at line 462 of file StaticDiscovery.cpp.

00465 {
00466   ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::populate_transport_locator_sequence TODO\n")));
00467   // TODO
00468 }

void OpenDDS::DCPS::StaticEndpointManager::populate_transport_locator_sequence ( TransportLocatorSeq *&  ,
DiscoveredSubscriptionIter &  ,
const RepoId &   
) [virtual]

Definition at line 453 of file StaticDiscovery.cpp.

00456 {
00457   ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::populate_transport_locator_sequence TODO\n")));
00458   // TODO
00459 }

DDS::PublicationBuiltinTopicDataDataReaderImpl * OpenDDS::DCPS::StaticEndpointManager::pub_bit (  ) 

Definition at line 573 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::bit_subscriber(), OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, and participant_.

Referenced by init_bit().

00574 {
00575   DDS::Subscriber_var sub = participant_.bit_subscriber();
00576   if (!sub.in())
00577     return 0;
00578 
00579   DDS::DataReader_var d = sub->lookup_datareader(BUILT_IN_PUBLICATION_TOPIC);
00580   return dynamic_cast<DDS::PublicationBuiltinTopicDataDataReaderImpl*>(d.in());
00581 }

void OpenDDS::DCPS::StaticEndpointManager::reader_does_not_exist ( const RepoId &  readerid,
const RepoId &  writerid 
) [virtual]

Implements OpenDDS::DCPS::DiscoveryListener.

Definition at line 517 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_publications_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_, OpenDDS::DCPS::EndpointRegistry::reader_map, and registry_.

00518 {
00519   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00520   LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
00521   EndpointRegistry::ReaderMapType::const_iterator reader_pos = registry_.reader_map.find(readerid);
00522   if (lp_pos != local_publications_.end() &&
00523       reader_pos != registry_.reader_map.end()) {
00524     DataWriterCallbacks* dwr = lp_pos->second.publication_;
00525     ReaderIdSeq ids;
00526     ids.length(1);
00527     ids[0] = readerid;
00528     dwr->remove_associations(ids, true);
00529   }
00530 }

void OpenDDS::DCPS::StaticEndpointManager::reader_exists ( const RepoId &  readerid,
const RepoId &  writerid 
) [virtual]

Implements OpenDDS::DCPS::DiscoveryListener.

Definition at line 489 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_publications_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_, OpenDDS::DCPS::EndpointRegistry::reader_map, and registry_.

00490 {
00491   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00492   LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
00493   EndpointRegistry::ReaderMapType::const_iterator reader_pos = registry_.reader_map.find(readerid);
00494   if (lp_pos != local_publications_.end() &&
00495       reader_pos != registry_.reader_map.end()) {
00496     DataWriterCallbacks* dwr = lp_pos->second.publication_;
00497 #ifdef __SUNPRO_CC
00498     ReaderAssociation ra;
00499     ra.readerTransInfo = reader_pos->second.trans_info;
00500     ra.readerId = readerid;
00501     ra.subQos = reader_pos->second.subscriber_qos;
00502     ra.readerQos = reader_pos->second.qos;
00503     ra.filterClassName = "";
00504     ra.filterExpression = "";
00505     ra.exprParams = 0;
00506 #else
00507     const ReaderAssociation ra =
00508       {reader_pos->second.trans_info, readerid, reader_pos->second.subscriber_qos, reader_pos->second.qos, "", "", 0};
00509 
00510 #endif
00511     dwr->add_association(writerid, ra, true);
00512     dwr->association_complete(readerid);
00513   }
00514 }

DDS::ReturnCode_t OpenDDS::DCPS::StaticEndpointManager::remove_publication_i ( const RepoId &   )  [virtual]

Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

Definition at line 338 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_publications_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::participant_id_, registry_, OpenDDS::DCPS::EndpointRegistry::Writer::reliable_readers, DDS::RETCODE_ERROR, DDS::RETCODE_OK, and OpenDDS::DCPS::EndpointRegistry::writer_map.

00339 {
00340   LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
00341   if (lp_pos == local_publications_.end()) {
00342     return DDS::RETCODE_ERROR;
00343   }
00344 
00345   const LocalPublication& pub = lp_pos->second;
00346 
00347   EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(writerid);
00348   if (pos == registry_.writer_map.end()) {
00349     return DDS::RETCODE_ERROR;
00350   }
00351 
00352   const EndpointRegistry::Writer& writer = pos->second;
00353 
00354   ReaderIdSeq ids;
00355   ids.length((CORBA::ULong)writer.reliable_readers.size());
00356   CORBA::ULong idx = 0;
00357   for (RepoIdSet::const_iterator pos = writer.reliable_readers.begin(), limit = writer.reliable_readers.end();
00358         pos != limit;
00359         ++pos, ++idx) {
00360     const RepoId& readerid = *pos;
00361     ids[idx] = readerid;
00362     pub.publication_->unregister_for_reader(participant_id_, writerid, readerid);
00363   }
00364 
00365   return DDS::RETCODE_OK;
00366 }

DDS::ReturnCode_t OpenDDS::DCPS::StaticEndpointManager::remove_subscription_i ( const RepoId &   )  [virtual]

Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

Definition at line 414 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_subscriptions_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::participant_id_, OpenDDS::DCPS::EndpointRegistry::reader_map, registry_, OpenDDS::DCPS::EndpointRegistry::Reader::reliable_writers, DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

00415 {
00416   LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
00417   if (ls_pos == local_subscriptions_.end()) {
00418     return DDS::RETCODE_ERROR;
00419   }
00420 
00421   const LocalSubscription& sub = ls_pos->second;
00422 
00423   EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(readerid);
00424   if (pos == registry_.reader_map.end()) {
00425     return DDS::RETCODE_ERROR;
00426   }
00427 
00428   const EndpointRegistry::Reader& reader = pos->second;
00429 
00430   WriterIdSeq ids;
00431   ids.length((CORBA::ULong)reader.reliable_writers.size());
00432   CORBA::ULong idx = 0;
00433   for (RepoIdSet::const_iterator pos = reader.reliable_writers.begin(), limit = reader.reliable_writers.end();
00434         pos != limit;
00435         ++pos, ++idx) {
00436     const RepoId& writerid = *pos;
00437     ids[idx] = writerid;
00438     sub.subscription_->unregister_for_writer(participant_id_, readerid, writerid);
00439   }
00440 
00441   return DDS::RETCODE_OK;
00442 }

bool OpenDDS::DCPS::StaticEndpointManager::shutting_down (  )  const [virtual]

Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

Definition at line 445 of file StaticDiscovery.cpp.

00446 {
00447   ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::shutting_down TODO\n")));
00448   // TODO
00449   return false;
00450 }

DDS::SubscriptionBuiltinTopicDataDataReaderImpl * OpenDDS::DCPS::StaticEndpointManager::sub_bit (  ) 

Definition at line 584 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::bit_subscriber(), OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, and participant_.

Referenced by init_bit().

00585 {
00586   DDS::Subscriber_var sub = participant_.bit_subscriber();
00587   if (!sub.in())
00588     return 0;
00589 
00590   DDS::DataReader_var d = sub->lookup_datareader(BUILT_IN_SUBSCRIPTION_TOPIC);
00591   return dynamic_cast<DDS::SubscriptionBuiltinTopicDataDataReaderImpl*>(d.in());
00592 }

bool OpenDDS::DCPS::StaticEndpointManager::update_publication_qos ( const RepoId &  ,
const DDS::DataWriterQos &  ,
const DDS::PublisherQos &   
) [virtual]

Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

Definition at line 245 of file StaticDiscovery.cpp.

00248 {
00249   ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_publication_qos - ")
00250              ACE_TEXT("Not allowed\n")));
00251   return false;
00252 }

bool OpenDDS::DCPS::StaticEndpointManager::update_subscription_params ( const RepoId &  ,
const DDS::StringSeq &   
) [virtual]

Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

Definition at line 265 of file StaticDiscovery.cpp.

00267 {
00268   ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_subscription_qos - ")
00269              ACE_TEXT("Not allowed\n")));
00270   return false;
00271 }

bool OpenDDS::DCPS::StaticEndpointManager::update_subscription_qos ( const RepoId &  ,
const DDS::DataReaderQos &  ,
const DDS::SubscriberQos &   
) [virtual]

Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

Definition at line 255 of file StaticDiscovery.cpp.

00258 {
00259   ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_subscription_qos - ")
00260              ACE_TEXT("Not allowed\n")));
00261   return false;
00262 }

bool OpenDDS::DCPS::StaticEndpointManager::update_topic_qos ( const RepoId &  ,
const DDS::TopicQos &  ,
OPENDDS_STRING &   
) [virtual]

Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

Definition at line 235 of file StaticDiscovery.cpp.

00238 {
00239   ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_topic_qos - ")
00240              ACE_TEXT("Not allowed\n")));
00241   return false;
00242 }

void OpenDDS::DCPS::StaticEndpointManager::writer_does_not_exist ( const RepoId &  writerid,
const RepoId &  readerid 
) [virtual]

Implements OpenDDS::DCPS::DiscoveryListener.

Definition at line 556 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_subscriptions_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_, registry_, and OpenDDS::DCPS::EndpointRegistry::writer_map.

00557 {
00558   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00559   LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
00560   EndpointRegistry::WriterMapType::const_iterator writer_pos = registry_.writer_map.find(writerid);
00561   if (ls_pos != local_subscriptions_.end() &&
00562       writer_pos != registry_.writer_map.end()) {
00563     DataReaderCallbacks* drr = ls_pos->second.subscription_;
00564     WriterIdSeq ids;
00565     ids.length(1);
00566     ids[0] = writerid;
00567     drr->remove_associations(ids, true);
00568   }
00569 }

void OpenDDS::DCPS::StaticEndpointManager::writer_exists ( const RepoId &  writerid,
const RepoId &  readerid 
) [virtual]

Implements OpenDDS::DCPS::DiscoveryListener.

Definition at line 533 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_subscriptions_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_, registry_, and OpenDDS::DCPS::EndpointRegistry::writer_map.

00534 {
00535   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00536   LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
00537   EndpointRegistry::WriterMapType::const_iterator writer_pos = registry_.writer_map.find(writerid);
00538   if (ls_pos != local_subscriptions_.end() &&
00539       writer_pos != registry_.writer_map.end()) {
00540     DataReaderCallbacks* drr = ls_pos->second.subscription_;
00541 #ifdef __SUNPRO_CC
00542     WriterAssociation wa;
00543     wa.writerTransInfo = writer_pos->second.trans_info;
00544     wa.writerId = writerid;
00545     wa.pubQos = writer_pos->second.publisher_qos;
00546     wa.writerQos = writer_pos->second.qos;
00547 #else
00548     const WriterAssociation wa =
00549       {writer_pos->second.trans_info, writerid, writer_pos->second.publisher_qos, writer_pos->second.qos};
00550 #endif
00551     drr->add_association(readerid, wa, false);
00552   }
00553 }


Member Data Documentation

StaticParticipant& OpenDDS::DCPS::StaticEndpointManager::participant_ [private]

Definition at line 207 of file StaticDiscovery.h.

Referenced by pub_bit(), and sub_bit().

const EndpointRegistry& OpenDDS::DCPS::StaticEndpointManager::registry_ [private]

Definition at line 206 of file StaticDiscovery.h.

Referenced by add_publication_i(), add_subscription_i(), assign_publication_key(), assign_subscription_key(), init_bit(), reader_does_not_exist(), reader_exists(), remove_publication_i(), remove_subscription_i(), writer_does_not_exist(), and writer_exists().


The documentation for this class was generated from the following files:
StaticDiscovery.h
StaticDiscovery.cpp
Generated on Fri Feb 12 20:06:25 2016 for OpenDDS by  doxygen 1.4.7