#include <StaticDiscovery.h>
Inheritance diagram for OpenDDS::DCPS::StaticEndpointManager:
Public Member Functions | |
StaticEndpointManager (const RepoId &participant_id, ACE_Thread_Mutex &lock, const EndpointRegistry &registry, StaticParticipant &participant) | |
void | init_bit () |
virtual void | assign_publication_key (RepoId &rid, const RepoId &topicId, const DDS::DataWriterQos &qos) |
virtual void | assign_subscription_key (RepoId &rid, const RepoId &topicId, const DDS::DataReaderQos &qos) |
virtual bool | update_topic_qos (const RepoId &, const DDS::TopicQos &, OPENDDS_STRING &) |
virtual bool | update_publication_qos (const RepoId &, const DDS::DataWriterQos &, const DDS::PublisherQos &) |
virtual bool | update_subscription_qos (const RepoId &, const DDS::DataReaderQos &, const DDS::SubscriberQos &) |
virtual bool | update_subscription_params (const RepoId &, const DDS::StringSeq &) |
virtual void | association_complete (const RepoId &, const RepoId &) |
virtual bool | disassociate (const StaticDiscoveredParticipantData &) |
virtual DDS::ReturnCode_t | add_publication_i (const RepoId &, LocalPublication &) |
virtual DDS::ReturnCode_t | remove_publication_i (const RepoId &) |
virtual DDS::ReturnCode_t | add_subscription_i (const RepoId &, LocalSubscription &) |
virtual DDS::ReturnCode_t | remove_subscription_i (const RepoId &) |
virtual bool | shutting_down () const |
virtual void | populate_transport_locator_sequence (TransportLocatorSeq *&, DiscoveredSubscriptionIter &, const RepoId &) |
virtual void | populate_transport_locator_sequence (TransportLocatorSeq *&, DiscoveredPublicationIter &, const RepoId &) |
virtual bool | defer_writer (const RepoId &, const RepoId &) |
virtual bool | defer_reader (const RepoId &, const RepoId &) |
virtual void | reader_exists (const RepoId &readerid, const RepoId &writerid) |
virtual void | reader_does_not_exist (const RepoId &readerid, const RepoId &writerid) |
virtual void | writer_exists (const RepoId &writerid, const RepoId &readerid) |
virtual void | writer_does_not_exist (const RepoId &writerid, const RepoId &readerid) |
DDS::PublicationBuiltinTopicDataDataReaderImpl * | pub_bit () |
DDS::SubscriptionBuiltinTopicDataDataReaderImpl * | sub_bit () |
Private Attributes | |
const EndpointRegistry & | registry_ |
StaticParticipant & | participant_ |
Definition at line 131 of file StaticDiscovery.h.
OpenDDS::DCPS::StaticEndpointManager::StaticEndpointManager | ( | const RepoId & | participant_id, | |
ACE_Thread_Mutex & | lock, | |||
const EndpointRegistry & | registry, | |||
StaticParticipant & | participant | |||
) |
Definition at line 63 of file StaticDiscovery.cpp.
References OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::pub_bit_key_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::sub_bit_key_, and DDS::BuiltinTopicKey_t::value.
00067 : EndpointManager<StaticDiscoveredParticipantData>(participant_id, lock) 00068 , registry_(registry) 00069 , participant_(participant) 00070 { 00071 pub_bit_key_.value[0] = pub_bit_key_.value[1] = pub_bit_key_.value[2] = 0; 00072 sub_bit_key_.value[0] = sub_bit_key_.value[1] = sub_bit_key_.value[2] = 0; 00073 }
DDS::ReturnCode_t OpenDDS::DCPS::StaticEndpointManager::add_publication_i | ( | const RepoId & | , | |
LocalPublication & | ||||
) | [virtual] |
Definition at line 289 of file StaticDiscovery.cpp.
References OpenDDS::DCPS::EndpointRegistry::Writer::best_effort_readers, OpenDDS::DCPS::ReaderAssociation::exprParams, OpenDDS::DCPS::ReaderAssociation::filterClassName, OpenDDS::DCPS::ReaderAssociation::filterExpression, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::participant_id_, OpenDDS::DCPS::EndpointRegistry::Reader::qos, OpenDDS::DCPS::EndpointRegistry::reader_map, OpenDDS::DCPS::ReaderAssociation::readerId, OpenDDS::DCPS::ReaderAssociation::readerQos, OpenDDS::DCPS::ReaderAssociation::readerTransInfo, registry_, OpenDDS::DCPS::EndpointRegistry::Writer::reliable_readers, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::ReaderAssociation::subQos, OpenDDS::DCPS::EndpointRegistry::Reader::subscriber_qos, OpenDDS::DCPS::EndpointRegistry::Reader::trans_info, and OpenDDS::DCPS::EndpointRegistry::writer_map.
00291 { 00292 /* 00293 Find all matching remote readers. 00294 If the reader is best effort, then associate immediately. 00295 If the reader is reliable (we are reliable by implication), register with the transport to receive notification that the remote reader is up. 00296 */ 00297 EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(writerid); 00298 if (pos == registry_.writer_map.end()) { 00299 return DDS::RETCODE_ERROR; 00300 } 00301 const EndpointRegistry::Writer& writer = pos->second; 00302 00303 for (RepoIdSet::const_iterator pos = writer.best_effort_readers.begin(), limit = writer.best_effort_readers.end(); 00304 pos != limit; 00305 ++pos) { 00306 const RepoId& readerid = *pos; 00307 const EndpointRegistry::Reader& reader = registry_.reader_map.find(readerid)->second; 00308 00309 #ifdef __SUNPRO_CC 00310 ReaderAssociation ra; 00311 ra.readerTransInfo = reader.trans_info; 00312 ra.readerId = readerid; 00313 ra.subQos = reader.subscriber_qos; 00314 ra.readerQos = reader.qos; 00315 ra.filterClassName = ""; 00316 ra.filterExpression = ""; 00317 ra.exprParams = 0; 00318 #else 00319 const ReaderAssociation ra = 00320 {reader.trans_info, readerid, reader.subscriber_qos, reader.qos, "", "", 0}; 00321 #endif 00322 pub.publication_->add_association(writerid, ra, true); 00323 pub.publication_->association_complete(readerid); 00324 } 00325 00326 for (RepoIdSet::const_iterator pos = writer.reliable_readers.begin(), limit = writer.reliable_readers.end(); 00327 pos != limit; 00328 ++pos) { 00329 const RepoId& readerid = *pos; 00330 const EndpointRegistry::Reader& reader = registry_.reader_map.find(readerid)->second; 00331 pub.publication_->register_for_reader(participant_id_, writerid, readerid, reader.trans_info, this); 00332 } 00333 00334 return DDS::RETCODE_OK; 00335 }
DDS::ReturnCode_t OpenDDS::DCPS::StaticEndpointManager::add_subscription_i | ( | const RepoId & | , | |
LocalSubscription & | ||||
) | [virtual] |
Definition at line 369 of file StaticDiscovery.cpp.
References OpenDDS::DCPS::EndpointRegistry::Reader::best_effort_writers, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::participant_id_, OpenDDS::DCPS::EndpointRegistry::Writer::publisher_qos, OpenDDS::DCPS::WriterAssociation::pubQos, OpenDDS::DCPS::EndpointRegistry::Writer::qos, OpenDDS::DCPS::EndpointRegistry::reader_map, registry_, OpenDDS::DCPS::EndpointRegistry::Reader::reliable_writers, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EndpointRegistry::Writer::trans_info, OpenDDS::DCPS::EndpointRegistry::writer_map, OpenDDS::DCPS::WriterAssociation::writerId, OpenDDS::DCPS::WriterAssociation::writerQos, and OpenDDS::DCPS::WriterAssociation::writerTransInfo.
00371 { 00372 /* 00373 Find all matching remote writers. 00374 If we (the reader) is best effort, then associate immediately. 00375 If we (the reader) are reliable, then register with the transport to receive notification that the remote writer is up. 00376 */ 00377 EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(readerid); 00378 if (pos == registry_.reader_map.end()) { 00379 return DDS::RETCODE_ERROR; 00380 } 00381 const EndpointRegistry::Reader& reader = pos->second; 00382 00383 for (RepoIdSet::const_iterator pos = reader.best_effort_writers.begin(), limit = reader.best_effort_writers.end(); 00384 pos != limit; 00385 ++pos) { 00386 const RepoId& writerid = *pos; 00387 const EndpointRegistry::Writer& writer = registry_.writer_map.find(writerid)->second; 00388 00389 #ifdef __SUNPRO_CC 00390 WriterAssociation wa; 00391 wa.writerTransInfo = writer.trans_info; 00392 wa.writerId = writerid; 00393 wa.pubQos = writer.publisher_qos; 00394 wa.writerQos = writer.qos; 00395 #else 00396 const WriterAssociation wa = 00397 {writer.trans_info, writerid, writer.publisher_qos, writer.qos}; 00398 #endif 00399 sub.subscription_->add_association(readerid, wa, false); 00400 } 00401 00402 for (RepoIdSet::const_iterator pos = reader.reliable_writers.begin(), limit = reader.reliable_writers.end(); 00403 pos != limit; 00404 ++pos) { 00405 const RepoId& writerid = *pos; 00406 const EndpointRegistry::Writer& writer = registry_.writer_map.find(writerid)->second; 00407 sub.subscription_->register_for_writer(participant_id_, readerid, writerid, writer.trans_info, this); 00408 } 00409 00410 return DDS::RETCODE_OK; 00411 }
void OpenDDS::DCPS::StaticEndpointManager::assign_publication_key | ( | RepoId & | rid, | |
const RepoId & | topicId, | |||
const DDS::DataWriterQos & | qos | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.
Definition at line 169 of file StaticDiscovery.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::EntityId_t::entityKey, OpenDDS::DCPS::EntityId_t::entityKind, OpenDDS::DCPS::ENTITYKIND_USER_WRITER_WITH_KEY, registry_, DDS::DataWriterQos::user_data, and OpenDDS::DCPS::EndpointRegistry::writer_map.
00172 { 00173 if (qos.user_data.value.length() != 3) { 00174 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_publication_key: no user data to identify writer\n"))); 00175 return; 00176 } 00177 00178 rid.entityId.entityKey[0] = qos.user_data.value[0]; 00179 rid.entityId.entityKey[1] = qos.user_data.value[1]; 00180 rid.entityId.entityKey[2] = qos.user_data.value[2]; 00181 rid.entityId.entityKind = ENTITYKIND_USER_WRITER_WITH_KEY; 00182 00183 if (DCPS_debug_level > 8) { 00184 ACE_DEBUG((LM_INFO, "(%P|%t) looking up writer ID %s\n", 00185 LogGuid(rid).c_str())); 00186 } 00187 00188 EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(rid); 00189 if (pos == registry_.writer_map.end()) { 00190 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_publication_key: unknown writer: %s\n"), LogGuid(rid).c_str())); 00191 return; 00192 } 00193 00194 DDS::DataWriterQos qos2(qos); 00195 // Qos in registry will not have the user data so overwrite. 00196 qos2.user_data = pos->second.qos.user_data; 00197 00198 DDS::DataWriterQos qos3(pos->second.qos); 00199 00200 if (qos2 != qos3) { 00201 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_publication_key: dynamic and static QoS differ\n"))); 00202 } 00203 }
void OpenDDS::DCPS::StaticEndpointManager::assign_subscription_key | ( | RepoId & | rid, | |
const RepoId & | topicId, | |||
const DDS::DataReaderQos & | qos | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.
Definition at line 205 of file StaticDiscovery.cpp.
References OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::EntityId_t::entityKey, OpenDDS::DCPS::EntityId_t::entityKind, OpenDDS::DCPS::ENTITYKIND_USER_READER_WITH_KEY, OpenDDS::DCPS::EndpointRegistry::reader_map, registry_, and DDS::DataReaderQos::user_data.
00208 { 00209 if (qos.user_data.value.length() != 3) { 00210 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_subscription_key: no user data to identify reader\n"))); 00211 return; 00212 } 00213 00214 rid.entityId.entityKey[0] = qos.user_data.value[0]; 00215 rid.entityId.entityKey[1] = qos.user_data.value[1]; 00216 rid.entityId.entityKey[2] = qos.user_data.value[2]; 00217 rid.entityId.entityKind = ENTITYKIND_USER_READER_WITH_KEY; 00218 00219 EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(rid); 00220 if (pos == registry_.reader_map.end()) { 00221 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_subscription_key: unknown reader: %s\n"), LogGuid(rid).c_str())); 00222 return; 00223 } 00224 00225 DDS::DataReaderQos qos2(qos); 00226 // Qos in registry will not have the user data so overwrite. 00227 qos2.user_data = pos->second.qos.user_data; 00228 00229 if (qos2 != pos->second.qos) { 00230 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_subscription_key: dynamic and static QoS differ\n"))); 00231 } 00232 }
void OpenDDS::DCPS::StaticEndpointManager::association_complete | ( | const RepoId & | , | |
const RepoId & | ||||
) | [virtual] |
Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.
Definition at line 274 of file StaticDiscovery.cpp.
bool OpenDDS::DCPS::StaticEndpointManager::defer_reader | ( | const RepoId & | , | |
const RepoId & | ||||
) | [virtual] |
Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.
Definition at line 480 of file StaticDiscovery.cpp.
00482 { 00483 // TODO 00484 ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::defer_reader TODO\n"))); 00485 return false; 00486 }
bool OpenDDS::DCPS::StaticEndpointManager::defer_writer | ( | const RepoId & | , | |
const RepoId & | ||||
) | [virtual] |
Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.
Definition at line 471 of file StaticDiscovery.cpp.
00473 { 00474 ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::defer_writer TODO\n"))); 00475 // TODO 00476 return false; 00477 }
bool OpenDDS::DCPS::StaticEndpointManager::disassociate | ( | const StaticDiscoveredParticipantData & | ) | [virtual] |
Definition at line 281 of file StaticDiscovery.cpp.
00282 { 00283 ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::disassociate TODO\n"))); 00284 // TODO 00285 return false; 00286 }
void OpenDDS::DCPS::StaticEndpointManager::init_bit | ( | ) |
Definition at line 75 of file StaticDiscovery.cpp.
References DDS::SubscriptionBuiltinTopicData::deadline, DDS::PublicationBuiltinTopicData::deadline, DDS::SubscriptionBuiltinTopicData::destination_order, DDS::PublicationBuiltinTopicData::destination_order, DDS::SubscriptionBuiltinTopicData::durability, DDS::PublicationBuiltinTopicData::durability, DDS::PublicationBuiltinTopicData::durability_service, DDS::SubscriptionBuiltinTopicData::group_data, DDS::PublicationBuiltinTopicData::group_data, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::increment_key(), DDS::SubscriptionBuiltinTopicData::key, DDS::PublicationBuiltinTopicData::key, DDS::SubscriptionBuiltinTopicData::latency_budget, DDS::PublicationBuiltinTopicData::latency_budget, DDS::PublicationBuiltinTopicData::lifespan, DDS::SubscriptionBuiltinTopicData::liveliness, DDS::PublicationBuiltinTopicData::liveliness, DDS::NEW_VIEW_STATE, OPENDDS_STRING, DDS::SubscriptionBuiltinTopicData::ownership, DDS::PublicationBuiltinTopicData::ownership, DDS::PublicationBuiltinTopicData::ownership_strength, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::participant_id_, DDS::SubscriptionBuiltinTopicData::partition, DDS::PublicationBuiltinTopicData::partition, DDS::SubscriptionBuiltinTopicData::presentation, DDS::PublicationBuiltinTopicData::presentation, pub_bit(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::pub_bit_key_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::pub_key_to_id_, OpenDDS::DCPS::EndpointRegistry::reader_map, registry_, DDS::SubscriptionBuiltinTopicData::reliability, DDS::PublicationBuiltinTopicData::reliability, sub_bit(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::sub_bit_key_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::sub_key_to_id_, DDS::SubscriptionBuiltinTopicData::time_based_filter, OpenDDS::DCPS::EndpointRegistry::topic_map, DDS::SubscriptionBuiltinTopicData::topic_name, DDS::PublicationBuiltinTopicData::topic_name, 
DDS::SubscriptionBuiltinTopicData::type_name, OpenDDS::DCPS::EndpointRegistry::Topic::type_name, DDS::PublicationBuiltinTopicData::type_name, DDS::SubscriptionBuiltinTopicData::user_data, DDS::PublicationBuiltinTopicData::user_data, and OpenDDS::DCPS::EndpointRegistry::writer_map.
Referenced by OpenDDS::DCPS::StaticParticipant::init_bit().
00076 { 00077 // Discover all remote publications and subscriptions. 00078 00079 for (EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.begin(), 00080 limit = registry_.writer_map.end(); 00081 pos != limit; 00082 ++pos) { 00083 const RepoId& remoteid = pos->first; 00084 const EndpointRegistry::Writer& writer = pos->second; 00085 00086 if (!GuidPrefixEqual()(participant_id_.guidPrefix, remoteid.guidPrefix)) { 00087 increment_key(pub_bit_key_); 00088 pub_key_to_id_[pub_bit_key_] = remoteid; 00089 00090 // pos represents a remote. 00091 // Populate data. 00092 DDS::PublicationBuiltinTopicData data; 00093 00094 data.key = pub_bit_key_; 00095 OPENDDS_STRING topic_name = writer.topic_name; 00096 data.topic_name = topic_name.c_str(); 00097 const EndpointRegistry::Topic& topic = registry_.topic_map.find(topic_name)->second; 00098 data.type_name = topic.type_name.c_str(); 00099 data.durability = writer.qos.durability; 00100 data.durability_service = writer.qos.durability_service; 00101 data.deadline = writer.qos.deadline; 00102 data.latency_budget = writer.qos.latency_budget; 00103 data.liveliness = writer.qos.liveliness; 00104 data.reliability = writer.qos.reliability; 00105 data.lifespan = writer.qos.lifespan; 00106 data.user_data = writer.qos.user_data; 00107 data.ownership = writer.qos.ownership; 00108 data.ownership_strength = writer.qos.ownership_strength; 00109 data.destination_order = writer.qos.destination_order; 00110 data.presentation = writer.publisher_qos.presentation; 00111 data.partition = writer.publisher_qos.partition; 00112 // If the TopicQos becomes available, this can be populated. 
00113 //data.topic_data = topic_details.qos_.topic_data; 00114 data.group_data = writer.publisher_qos.group_data; 00115 #ifndef DDS_HAS_MINIMUM_BIT 00116 DDS::PublicationBuiltinTopicDataDataReaderImpl* bit = pub_bit(); 00117 if (bit) { // bit may be null if the DomainParticipant is shutting down 00118 bit->store_synthetic_data(data, DDS::NEW_VIEW_STATE); 00119 } 00120 #endif /* DDS_HAS_MINIMUM_BIT */ 00121 } 00122 } 00123 00124 for (EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.begin(), 00125 limit = registry_.reader_map.end(); 00126 pos != limit; 00127 ++pos) { 00128 const RepoId& remoteid = pos->first; 00129 const EndpointRegistry::Reader& reader = pos->second; 00130 00131 if (!GuidPrefixEqual()(participant_id_.guidPrefix, remoteid.guidPrefix)) { 00132 increment_key(sub_bit_key_); 00133 sub_key_to_id_[sub_bit_key_] = remoteid; 00134 00135 // pos represents a remote. 00136 // Populate data. 00137 DDS::SubscriptionBuiltinTopicData data; 00138 00139 data.key = sub_bit_key_; 00140 OPENDDS_STRING topic_name = reader.topic_name; 00141 data.topic_name = topic_name.c_str(); 00142 const EndpointRegistry::Topic& topic = registry_.topic_map.find(topic_name)->second; 00143 data.type_name = topic.type_name.c_str(); 00144 data.durability = reader.qos.durability; 00145 data.deadline = reader.qos.deadline; 00146 data.latency_budget = reader.qos.latency_budget; 00147 data.liveliness = reader.qos.liveliness; 00148 data.reliability = reader.qos.reliability; 00149 data.ownership = reader.qos.ownership; 00150 data.destination_order = reader.qos.destination_order; 00151 data.user_data = reader.qos.user_data; 00152 data.time_based_filter = reader.qos.time_based_filter; 00153 data.presentation = reader.subscriber_qos.presentation; 00154 data.partition = reader.subscriber_qos.partition; 00155 // // If the TopicQos becomes available, this can be populated. 
00156 //data.topic_data = topic_details.qos_.topic_data; 00157 data.group_data = reader.subscriber_qos.group_data; 00158 00159 #ifndef DDS_HAS_MINIMUM_BIT 00160 DDS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sub_bit(); 00161 if (bit) { // bit may be null if the DomainParticipant is shutting down 00162 bit->store_synthetic_data(data, DDS::NEW_VIEW_STATE); 00163 } 00164 #endif /* DDS_HAS_MINIMUM_BIT */ 00165 } 00166 } 00167 }
void OpenDDS::DCPS::StaticEndpointManager::populate_transport_locator_sequence | ( | TransportLocatorSeq *& | , | |
DiscoveredPublicationIter & | , | |||
const RepoId & | ||||
) | [virtual] |
Definition at line 462 of file StaticDiscovery.cpp.
00465 { 00466 ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::populate_transport_locator_sequence TODO\n"))); 00467 // TODO 00468 }
void OpenDDS::DCPS::StaticEndpointManager::populate_transport_locator_sequence | ( | TransportLocatorSeq *& | , | |
DiscoveredSubscriptionIter & | , | |||
const RepoId & | ||||
) | [virtual] |
Definition at line 453 of file StaticDiscovery.cpp.
00456 { 00457 ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::populate_transport_locator_sequence TODO\n"))); 00458 // TODO 00459 }
DDS::PublicationBuiltinTopicDataDataReaderImpl * OpenDDS::DCPS::StaticEndpointManager::pub_bit | ( | ) |
Definition at line 573 of file StaticDiscovery.cpp.
References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::bit_subscriber(), OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, and participant_.
Referenced by init_bit().
00574 { 00575 DDS::Subscriber_var sub = participant_.bit_subscriber(); 00576 if (!sub.in()) 00577 return 0; 00578 00579 DDS::DataReader_var d = sub->lookup_datareader(BUILT_IN_PUBLICATION_TOPIC); 00580 return dynamic_cast<DDS::PublicationBuiltinTopicDataDataReaderImpl*>(d.in()); 00581 }
void OpenDDS::DCPS::StaticEndpointManager::reader_does_not_exist | ( | const RepoId & | readerid, | |
const RepoId & | writerid | |||
) | [virtual] |
Implements OpenDDS::DCPS::DiscoveryListener.
Definition at line 517 of file StaticDiscovery.cpp.
References OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_publications_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_, OpenDDS::DCPS::EndpointRegistry::reader_map, and registry_.
00518 { 00519 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00520 LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid); 00521 EndpointRegistry::ReaderMapType::const_iterator reader_pos = registry_.reader_map.find(readerid); 00522 if (lp_pos != local_publications_.end() && 00523 reader_pos != registry_.reader_map.end()) { 00524 DataWriterCallbacks* dwr = lp_pos->second.publication_; 00525 ReaderIdSeq ids; 00526 ids.length(1); 00527 ids[0] = readerid; 00528 dwr->remove_associations(ids, true); 00529 } 00530 }
void OpenDDS::DCPS::StaticEndpointManager::reader_exists | ( | const RepoId & | readerid, | |
const RepoId & | writerid | |||
) | [virtual] |
Implements OpenDDS::DCPS::DiscoveryListener.
Definition at line 489 of file StaticDiscovery.cpp.
References OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_publications_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_, OpenDDS::DCPS::EndpointRegistry::reader_map, and registry_.
00490 { 00491 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00492 LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid); 00493 EndpointRegistry::ReaderMapType::const_iterator reader_pos = registry_.reader_map.find(readerid); 00494 if (lp_pos != local_publications_.end() && 00495 reader_pos != registry_.reader_map.end()) { 00496 DataWriterCallbacks* dwr = lp_pos->second.publication_; 00497 #ifdef __SUNPRO_CC 00498 ReaderAssociation ra; 00499 ra.readerTransInfo = reader_pos->second.trans_info; 00500 ra.readerId = readerid; 00501 ra.subQos = reader_pos->second.subscriber_qos; 00502 ra.readerQos = reader_pos->second.qos; 00503 ra.filterClassName = ""; 00504 ra.filterExpression = ""; 00505 ra.exprParams = 0; 00506 #else 00507 const ReaderAssociation ra = 00508 {reader_pos->second.trans_info, readerid, reader_pos->second.subscriber_qos, reader_pos->second.qos, "", "", 0}; 00509 00510 #endif 00511 dwr->add_association(writerid, ra, true); 00512 dwr->association_complete(readerid); 00513 } 00514 }
DDS::ReturnCode_t OpenDDS::DCPS::StaticEndpointManager::remove_publication_i | ( | const RepoId & | ) | [virtual] |
Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.
Definition at line 338 of file StaticDiscovery.cpp.
References OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_publications_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::participant_id_, registry_, OpenDDS::DCPS::EndpointRegistry::Writer::reliable_readers, DDS::RETCODE_ERROR, DDS::RETCODE_OK, and OpenDDS::DCPS::EndpointRegistry::writer_map.
00339 { 00340 LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid); 00341 if (lp_pos == local_publications_.end()) { 00342 return DDS::RETCODE_ERROR; 00343 } 00344 00345 const LocalPublication& pub = lp_pos->second; 00346 00347 EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(writerid); 00348 if (pos == registry_.writer_map.end()) { 00349 return DDS::RETCODE_ERROR; 00350 } 00351 00352 const EndpointRegistry::Writer& writer = pos->second; 00353 00354 ReaderIdSeq ids; 00355 ids.length((CORBA::ULong)writer.reliable_readers.size()); 00356 CORBA::ULong idx = 0; 00357 for (RepoIdSet::const_iterator pos = writer.reliable_readers.begin(), limit = writer.reliable_readers.end(); 00358 pos != limit; 00359 ++pos, ++idx) { 00360 const RepoId& readerid = *pos; 00361 ids[idx] = readerid; 00362 pub.publication_->unregister_for_reader(participant_id_, writerid, readerid); 00363 } 00364 00365 return DDS::RETCODE_OK; 00366 }
DDS::ReturnCode_t OpenDDS::DCPS::StaticEndpointManager::remove_subscription_i | ( | const RepoId & | ) | [virtual] |
Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.
Definition at line 414 of file StaticDiscovery.cpp.
References OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_subscriptions_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::participant_id_, OpenDDS::DCPS::EndpointRegistry::reader_map, registry_, OpenDDS::DCPS::EndpointRegistry::Reader::reliable_writers, DDS::RETCODE_ERROR, and DDS::RETCODE_OK.
00415 { 00416 LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid); 00417 if (ls_pos == local_subscriptions_.end()) { 00418 return DDS::RETCODE_ERROR; 00419 } 00420 00421 const LocalSubscription& sub = ls_pos->second; 00422 00423 EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(readerid); 00424 if (pos == registry_.reader_map.end()) { 00425 return DDS::RETCODE_ERROR; 00426 } 00427 00428 const EndpointRegistry::Reader& reader = pos->second; 00429 00430 WriterIdSeq ids; 00431 ids.length((CORBA::ULong)reader.reliable_writers.size()); 00432 CORBA::ULong idx = 0; 00433 for (RepoIdSet::const_iterator pos = reader.reliable_writers.begin(), limit = reader.reliable_writers.end(); 00434 pos != limit; 00435 ++pos, ++idx) { 00436 const RepoId& writerid = *pos; 00437 ids[idx] = writerid; 00438 sub.subscription_->unregister_for_writer(participant_id_, readerid, writerid); 00439 } 00440 00441 return DDS::RETCODE_OK; 00442 }
bool OpenDDS::DCPS::StaticEndpointManager::shutting_down | ( | ) | const [virtual] |
Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.
Definition at line 445 of file StaticDiscovery.cpp.
00446 { 00447 ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::shutting_down TODO\n"))); 00448 // TODO 00449 return false; 00450 }
DDS::SubscriptionBuiltinTopicDataDataReaderImpl * OpenDDS::DCPS::StaticEndpointManager::sub_bit | ( | ) |
Definition at line 584 of file StaticDiscovery.cpp.
References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::bit_subscriber(), OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, and participant_.
Referenced by init_bit().
00585 { 00586 DDS::Subscriber_var sub = participant_.bit_subscriber(); 00587 if (!sub.in()) 00588 return 0; 00589 00590 DDS::DataReader_var d = sub->lookup_datareader(BUILT_IN_SUBSCRIPTION_TOPIC); 00591 return dynamic_cast<DDS::SubscriptionBuiltinTopicDataDataReaderImpl*>(d.in()); 00592 }
bool OpenDDS::DCPS::StaticEndpointManager::update_publication_qos | ( | const RepoId & | , | |
const DDS::DataWriterQos & | , | |||
const DDS::PublisherQos & | ||||
) | [virtual] |
Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.
Definition at line 245 of file StaticDiscovery.cpp.
00248 { 00249 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_publication_qos - ") 00250 ACE_TEXT("Not allowed\n"))); 00251 return false; 00252 }
bool OpenDDS::DCPS::StaticEndpointManager::update_subscription_params | ( | const RepoId & | , | |
const DDS::StringSeq & | ||||
) | [virtual] |
Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.
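Definition at line 265 of file StaticDiscovery.cpp. Note: the error message in the listing below is prefixed with StaticEndpointManager::update_subscription_qos rather than update_subscription_params, so its log output is indistinguishable from that of update_subscription_qos.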
00267 { 00268 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_subscription_qos - ") 00269 ACE_TEXT("Not allowed\n"))); 00270 return false; 00271 }
bool OpenDDS::DCPS::StaticEndpointManager::update_subscription_qos | ( | const RepoId & | , | |
const DDS::DataReaderQos & | , | |||
const DDS::SubscriberQos & | ||||
) | [virtual] |
Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.
Definition at line 255 of file StaticDiscovery.cpp.
00258 { 00259 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_subscription_qos - ") 00260 ACE_TEXT("Not allowed\n"))); 00261 return false; 00262 }
bool OpenDDS::DCPS::StaticEndpointManager::update_topic_qos | ( | const RepoId & | , | |
const DDS::TopicQos & | , | |||
OPENDDS_STRING & | ||||
) | [virtual] |
Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.
Definition at line 235 of file StaticDiscovery.cpp.
00238 { 00239 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_topic_qos - ") 00240 ACE_TEXT("Not allowed\n"))); 00241 return false; 00242 }
void OpenDDS::DCPS::StaticEndpointManager::writer_does_not_exist | ( | const RepoId & | writerid, | |
const RepoId & | readerid | |||
) | [virtual] |
Implements OpenDDS::DCPS::DiscoveryListener.
Definition at line 556 of file StaticDiscovery.cpp.
References OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_subscriptions_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_, registry_, and OpenDDS::DCPS::EndpointRegistry::writer_map.
00557 { 00558 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00559 LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid); 00560 EndpointRegistry::WriterMapType::const_iterator writer_pos = registry_.writer_map.find(writerid); 00561 if (ls_pos != local_subscriptions_.end() && 00562 writer_pos != registry_.writer_map.end()) { 00563 DataReaderCallbacks* drr = ls_pos->second.subscription_; 00564 WriterIdSeq ids; 00565 ids.length(1); 00566 ids[0] = writerid; 00567 drr->remove_associations(ids, true); 00568 } 00569 }
void OpenDDS::DCPS::StaticEndpointManager::writer_exists | ( | const RepoId & | writerid, | |
const RepoId & | readerid | |||
) | [virtual] |
Implements OpenDDS::DCPS::DiscoveryListener.
Definition at line 533 of file StaticDiscovery.cpp.
References OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_subscriptions_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_, registry_, and OpenDDS::DCPS::EndpointRegistry::writer_map.
00534 { 00535 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00536 LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid); 00537 EndpointRegistry::WriterMapType::const_iterator writer_pos = registry_.writer_map.find(writerid); 00538 if (ls_pos != local_subscriptions_.end() && 00539 writer_pos != registry_.writer_map.end()) { 00540 DataReaderCallbacks* drr = ls_pos->second.subscription_; 00541 #ifdef __SUNPRO_CC 00542 WriterAssociation wa; 00543 wa.writerTransInfo = writer_pos->second.trans_info; 00544 wa.writerId = writerid; 00545 wa.pubQos = writer_pos->second.publisher_qos; 00546 wa.writerQos = writer_pos->second.qos; 00547 #else 00548 const WriterAssociation wa = 00549 {writer_pos->second.trans_info, writerid, writer_pos->second.publisher_qos, writer_pos->second.qos}; 00550 #endif 00551 drr->add_association(readerid, wa, false); 00552 } 00553 }
const EndpointRegistry& OpenDDS::DCPS::StaticEndpointManager::registry_ [private] |
Definition at line 206 of file StaticDiscovery.h.
Referenced by add_publication_i(), add_subscription_i(), assign_publication_key(), assign_subscription_key(), init_bit(), reader_does_not_exist(), reader_exists(), remove_publication_i(), remove_subscription_i(), writer_does_not_exist(), and writer_exists().