OpenDDS::RTPS::Sedp Class Reference

#include <Sedp.h>

Inheritance diagram for OpenDDS::RTPS::Sedp:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::RTPS::Sedp:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 Sedp (const DCPS::RepoId &participant_id, Spdp &owner, ACE_Thread_Mutex &lock)
DDS::ReturnCode_t init (const DCPS::RepoId &guid, const RtpsDiscovery &disco, DDS::DomainId_t domainId)
void acknowledge ()
 request for acknowledgement from all Sedp threads (Task)
void shutdown ()
void unicast_locators (OpenDDS::DCPS::LocatorSeq &locators) const
const ACE_INET_Addr & local_address () const
const ACE_INET_Addr & multicast_group () const
bool map_ipv4_to_ipv6 () const
void associate (const SPDPdiscoveredParticipantData &pdata)
bool disassociate (const SPDPdiscoveredParticipantData &pdata)
bool update_topic_qos (const DCPS::RepoId &topicId, const DDS::TopicQos &qos, OPENDDS_STRING &name)
bool update_publication_qos (const DCPS::RepoId &publicationId, const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos)
bool update_subscription_qos (const DCPS::RepoId &subscriptionId, const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos)
bool update_subscription_params (const DCPS::RepoId &subId, const DDS::StringSeq &params)
void association_complete (const DCPS::RepoId &localId, const DCPS::RepoId &remoteId)
void signal_liveliness (DDS::LivelinessQosPolicyKind kind)
template<typename Map>
void remove_entities_belonging_to (Map &m, RepoId participant)

Static Public Attributes

static const bool host_is_bigendian_

Private Types

typedef DCPS::RcHandle< Reader > Reader_rch
typedef LocalParticipantMessageMap::iterator LocalParticipantMessageIter
typedef LocalParticipantMessageMap::const_iterator LocalParticipantMessageCIter
typedef std::pair< DCPS::MessageId, OpenDDS::DCPS::DiscoveredWriterData > MsgIdWtrDataPair
typedef std::pair< DCPS::MessageId, OpenDDS::DCPS::DiscoveredReaderData > MsgIdRdrDataPair

Private Member Functions

DDS::TopicBuiltinTopicDataDataReaderImpl * topic_bit ()
DDS::PublicationBuiltinTopicDataDataReaderImpl * pub_bit ()
DDS::SubscriptionBuiltinTopicDataDataReaderImpl * sub_bit ()
void populate_discovered_writer_msg (OpenDDS::DCPS::DiscoveredWriterData &dwd, const DCPS::RepoId &publication_id, const LocalPublication &pub)
void populate_discovered_reader_msg (OpenDDS::DCPS::DiscoveredReaderData &drd, const DCPS::RepoId &subscription_id, const LocalSubscription &sub)
typedef OPENDDS_MAP_CMP (DCPS::RepoId, LocalParticipantMessage, DCPS::GUID_tKeyLessThan) LocalParticipantMessageMap
void data_received (DCPS::MessageId message_id, const OpenDDS::DCPS::DiscoveredWriterData &wdata)
void data_received (DCPS::MessageId message_id, const OpenDDS::DCPS::DiscoveredReaderData &rdata)
void data_received (DCPS::MessageId message_id, const ParticipantMessageData &data)
typedef OPENDDS_MAP_CMP (DCPS::RepoId, MsgIdWtrDataPair, DCPS::GUID_tKeyLessThan) DeferredPublicationMap
typedef OPENDDS_MAP_CMP (DCPS::RepoId, MsgIdRdrDataPair, DCPS::GUID_tKeyLessThan) DeferredSubscriptionMap
void assign_bit_key (DiscoveredPublication &pub)
void assign_bit_key (DiscoveredSubscription &sub)
template<typename Map>
void remove_entities_belonging_to (Map &m, DCPS::RepoId participant)
void remove_from_bit_i (const DiscoveredPublication &pub)
void remove_from_bit_i (const DiscoveredSubscription &sub)
virtual DDS::ReturnCode_t remove_publication_i (const DCPS::RepoId &publicationId)
virtual DDS::ReturnCode_t remove_subscription_i (const DCPS::RepoId &subscriptionId)
void inconsistent_topic (const DCPS::RepoIdSet &endpoints) const
virtual bool shutting_down () const
virtual void populate_transport_locator_sequence (DCPS::TransportLocatorSeq *&tls, DiscoveredSubscriptionIter &iter, const DCPS::RepoId &reader)
virtual void populate_transport_locator_sequence (DCPS::TransportLocatorSeq *&tls, DiscoveredPublicationIter &iter, const DCPS::RepoId &writer)
virtual bool defer_writer (const DCPS::RepoId &writer, const DCPS::RepoId &writer_participant)
virtual bool defer_reader (const DCPS::RepoId &reader, const DCPS::RepoId &reader_participant)
void write_durable_publication_data (const DCPS::RepoId &reader)
void write_durable_subscription_data (const DCPS::RepoId &reader)
void write_durable_participant_message_data (const DCPS::RepoId &reader)
DDS::ReturnCode_t write_publication_data (const DCPS::RepoId &rid, LocalPublication &pub, const DCPS::RepoId &reader=DCPS::GUID_UNKNOWN)
DDS::ReturnCode_t write_subscription_data (const DCPS::RepoId &rid, LocalSubscription &pub, const DCPS::RepoId &reader=DCPS::GUID_UNKNOWN)
DDS::ReturnCode_t write_participant_message_data (const DCPS::RepoId &rid, LocalParticipantMessage &part, const DCPS::RepoId &reader=DCPS::GUID_UNKNOWN)

Static Private Member Functions

static DCPS::RepoId make_id (const DCPS::RepoId &participant_id, const EntityId_t &entity)
static void set_inline_qos (DCPS::TransportLocatorSeq &locators)

Private Attributes

Spdp & spdp_
OpenDDS::RTPS::Sedp::Writer publications_writer_
OpenDDS::RTPS::Sedp::Writer subscriptions_writer_
OpenDDS::RTPS::Sedp::Writer participant_message_writer_
Reader_rch publications_reader_
Reader_rch subscriptions_reader_
Reader_rch participant_message_reader_
OpenDDS::RTPS::Sedp::Task task_
DCPS::TransportInst_rch transport_inst_
LocalParticipantMessageMap local_participant_messages_
DeferredPublicationMap deferred_publications_
DeferredSubscriptionMap deferred_subscriptions_
DCPS::RepoIdSet defer_match_endpoints_
DCPS::RepoIdSet associated_participants_
DCPS::SequenceNumber automatic_liveliness_seq_
DCPS::SequenceNumber manual_liveliness_seq_

Classes

class  Endpoint
struct  LocalParticipantMessage
struct  Msg
class  Reader
struct  Task
class  Writer

Detailed Description

Definition at line 54 of file Sedp.h.


Member Typedef Documentation

typedef LocalParticipantMessageMap::const_iterator OpenDDS::RTPS::Sedp::LocalParticipantMessageCIter [private]

Definition at line 301 of file Sedp.h.

typedef LocalParticipantMessageMap::iterator OpenDDS::RTPS::Sedp::LocalParticipantMessageIter [private]

Definition at line 300 of file Sedp.h.

typedef std::pair<DCPS::MessageId, OpenDDS::DCPS::DiscoveredReaderData> OpenDDS::RTPS::Sedp::MsgIdRdrDataPair [private]

Definition at line 316 of file Sedp.h.

typedef std::pair<DCPS::MessageId, OpenDDS::DCPS::DiscoveredWriterData> OpenDDS::RTPS::Sedp::MsgIdWtrDataPair [private]

Definition at line 311 of file Sedp.h.

typedef DCPS::RcHandle<Reader> OpenDDS::RTPS::Sedp::Reader_rch [private]

Definition at line 239 of file Sedp.h.


Constructor & Destructor Documentation

OpenDDS::RTPS::Sedp::Sedp ( const DCPS::RepoId &  participant_id,
Spdp &  owner,
ACE_Thread_Mutex &  lock 
)


Member Function Documentation

void OpenDDS::RTPS::Sedp::acknowledge (  ) 

request for acknowledgement from all Sedp threads (Task)

Definition at line 1962 of file Sedp.cpp.

References OpenDDS::RTPS::Sedp::Task::acknowledge(), and task_.

Referenced by OpenDDS::RTPS::Spdp::fini_bit().

01963 {
01964   task_.acknowledge();
01965 }

void OpenDDS::RTPS::Sedp::assign_bit_key ( DiscoveredSubscription &  sub  )  [private]

Definition at line 364 of file Sedp.cpp.

References OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::increment_key(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::sub_bit_key_, and OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::sub_key_to_id_.

00365 {
00366   increment_key(sub_bit_key_);
00367   sub_key_to_id_[sub_bit_key_] = sub.reader_data_.readerProxy.remoteReaderGuid;
00368   sub.reader_data_.ddsSubscriptionData.key = sub_bit_key_;
00369 }

void OpenDDS::RTPS::Sedp::assign_bit_key ( DiscoveredPublication &  pub  )  [private]

Definition at line 356 of file Sedp.cpp.

References OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::increment_key(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::pub_bit_key_, and OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::pub_key_to_id_.

Referenced by data_received().

00357 {
00358   increment_key(pub_bit_key_);
00359   pub_key_to_id_[pub_bit_key_] = pub.writer_data_.writerProxy.remoteWriterGuid;
00360   pub.writer_data_.ddsPublicationData.key = pub_bit_key_;
00361 }

void OpenDDS::RTPS::Sedp::associate ( const SPDPdiscoveredParticipantData &  pdata  ) 

Definition at line 405 of file Sedp.cpp.

References OpenDDS::RTPS::ParticipantProxy_t::availableBuiltinEndpoints, OpenDDS::RTPS::BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER, OpenDDS::RTPS::create_association_data_proto(), OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER, OpenDDS::RTPS::Sedp::Task::enqueue(), OpenDDS::DCPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER, participant_message_reader_, OpenDDS::RTPS::SPDPdiscoveredParticipantData::participantProxy, publications_reader_, OpenDDS::DCPS::AssociationData::remote_id_, subscriptions_reader_, and task_.

Referenced by OpenDDS::RTPS::Spdp::data_received().

00406 {
00407   // First create a 'prototypical' instance of AssociationData.  It will
00408   // be copied and modified for each of the (up to) four SEDP Endpoints.
00409   DCPS::AssociationData proto;
00410   create_association_data_proto(proto, pdata);
00411 
00412   const BuiltinEndpointSet_t& avail =
00413     pdata.participantProxy.availableBuiltinEndpoints;
00414 
00415   // See RTPS v2.1 section 8.5.5.1
00416   if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER) {
00417     DCPS::AssociationData peer = proto;
00418     peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER;
00419     publications_reader_->assoc(peer);
00420   }
00421   if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER) {
00422     DCPS::AssociationData peer = proto;
00423     peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER;
00424     subscriptions_reader_->assoc(peer);
00425   }
00426   if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER) {
00427     DCPS::AssociationData peer = proto;
00428     peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER;
00429     participant_message_reader_->assoc(peer);
00430   }
00431 
00432   SPDPdiscoveredParticipantData* dpd =
00433     new SPDPdiscoveredParticipantData(pdata);
00434   task_.enqueue(dpd);
00435 }

void OpenDDS::RTPS::Sedp::association_complete ( const DCPS::RepoId &  localId,
const DCPS::RepoId &  remoteId 
) [virtual]

Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

void OpenDDS::RTPS::Sedp::data_received ( DCPS::MessageId  message_id,
const ParticipantMessageData &  data 
) [private]

Definition at line 1284 of file Sedp.cpp.

References OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::RTPS::Spdp::has_discovered_participant(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::ignoring(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_subscriptions_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_, OpenDDS::RTPS::ParticipantMessageData::participantGuid, OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.

01286 {
01287   if (spdp_.shutting_down()) { return; }
01288 
01289   const RepoId& guid = data.participantGuid;
01290   RepoId guid_participant = guid;
01291   guid_participant.entityId = ENTITYID_PARTICIPANT;
01292   RepoId prefix = data.participantGuid;
01293   prefix.entityId = EntityId_t(); // Clear the entityId so lower bound will work.
01294 
01295   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01296 
01297   if (ignoring(guid)
01298       || ignoring(guid_participant)) {
01299     return;
01300   }
01301 
01302   if (!spdp_.has_discovered_participant (guid_participant)) {
01303     return;
01304   }
01305 
01306   for (LocalSubscriptionMap::const_iterator sub_pos = local_subscriptions_.begin(),
01307          sub_limit = local_subscriptions_.end();
01308        sub_pos != sub_limit; ++sub_pos) {
01309     const DCPS::RepoIdSet::const_iterator pos =
01310       sub_pos->second.matched_endpoints_.lower_bound(prefix);
01311     if (pos != sub_pos->second.matched_endpoints_.end() &&
01312         OpenDDS::DCPS::GuidPrefixEqual()(pos->guidPrefix, prefix.guidPrefix)) {
01313       sub_pos->second.subscription_->signal_liveliness(guid_participant);
01314     }
01315   }
01316 }

void OpenDDS::RTPS::Sedp::data_received ( DCPS::MessageId  message_id,
const OpenDDS::DCPS::DiscoveredReaderData &  rdata 
) [private]

Definition at line 1085 of file Sedp.cpp.

References assign_bit_key(), OpenDDS::DCPS::DiscoveredReaderData::contentFilterProperty, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DiscoveredReaderData::ddsSubscriptionData, deferred_subscriptions_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::discovered_subscriptions_, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::get_topic_name(), DDS::HANDLE_NIL, OpenDDS::RTPS::Spdp::has_discovered_participant(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::ignoring(), inconsistent_topic(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_publications_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::make_topic_guid(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::match_endpoints(), DDS::NEW_VIEW_STATE, DDS::NOT_NEW_VIEW_STATE, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP(), OPENDDS_STRING, paramsChanged(), qosChanged(), OpenDDS::DCPS::DiscoveredReaderData::readerProxy, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_from_bit(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::RTPS::Spdp::shutting_down(), spdp_, sub_bit(), DDS::SubscriptionBuiltinTopicData::topic_data, DDS::SubscriptionBuiltinTopicData::topic_name, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::topic_names_, DDS::SubscriptionBuiltinTopicData::type_name, and OpenDDS::DCPS::UNREGISTER_INSTANCE.

01087 {
01088   if (spdp_.shutting_down()) { return; }
01089 
01090   const RepoId& guid = rdata.readerProxy.remoteReaderGuid;
01091   RepoId guid_participant = guid;
01092   guid_participant.entityId = ENTITYID_PARTICIPANT;
01093 
01094   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01095 
01096   if (ignoring(guid)
01097       || ignoring(guid_participant)
01098       || ignoring(rdata.ddsSubscriptionData.topic_name)) {
01099     return;
01100   }
01101 
01102   if (!spdp_.has_discovered_participant (guid_participant)) {
01103     deferred_subscriptions_[guid] = std::make_pair (message_id, rdata);
01104     return;
01105   }
01106 
01107   OPENDDS_STRING topic_name;
01108   // Find the publication  - iterator valid only as long as we hold the lock
01109   DiscoveredSubscriptionIter iter = discovered_subscriptions_.find(guid);
01110 
01111   // Must unlock when calling into sub_bit() as it may call back into us
01112   ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
01113 
01114   if (message_id == DCPS::SAMPLE_DATA) {
01115     OpenDDS::DCPS::DiscoveredReaderData rdata_copy;
01116 
01117     if (iter == discovered_subscriptions_.end()) { // add new
01118       { // Reduce scope of sub and td
01119         DiscoveredSubscription& sub =
01120           discovered_subscriptions_[guid] = DiscoveredSubscription(rdata);
01121 
01122         topic_name = get_topic_name(sub);
01123         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01124           topics_.find(topic_name);
01125         if (top_it == topics_.end()) {
01126           top_it =
01127             topics_.insert(std::make_pair(topic_name, TopicDetails())).first;
01128           top_it->second.data_type_ = rdata.ddsSubscriptionData.type_name;
01129           top_it->second.qos_.topic_data = rdata.ddsSubscriptionData.topic_data;
01130           top_it->second.repo_id_ = make_topic_guid();
01131 
01132         } else if (top_it->second.data_type_ !=
01133                    rdata.ddsSubscriptionData.type_name.in()) {
01134           inconsistent_topic(top_it->second.endpoints_);
01135           if (DCPS::DCPS_debug_level) {
01136             ACE_DEBUG((LM_WARNING,
01137                        ACE_TEXT("(%P|%t) Sedp::data_received(drd) - WARNING ")
01138                        ACE_TEXT("topic %C discovered data type %C doesn't ")
01139                        ACE_TEXT("match known data type %C, ignoring ")
01140                        ACE_TEXT("discovered subcription.\n"),
01141                        topic_name.c_str(),
01142                        rdata.ddsSubscriptionData.type_name.in(),
01143                        top_it->second.data_type_.c_str()));
01144           }
01145           return;
01146         }
01147 
01148         TopicDetails& td = top_it->second;
01149         topic_names_[td.repo_id_] = topic_name;
01150         td.endpoints_.insert(guid);
01151 
01152         std::memcpy(sub.reader_data_.ddsSubscriptionData.participant_key.value,
01153                     guid.guidPrefix, sizeof(DDS::BuiltinTopicKey_t));
01154         assign_bit_key(sub);
01155         rdata_copy = sub.reader_data_;
01156       }
01157 
01158       // Iter no longer valid once lock released
01159       iter = discovered_subscriptions_.end();
01160 
01161       DDS::InstanceHandle_t instance_handle = DDS::HANDLE_NIL;
01162 #ifndef DDS_HAS_MINIMUM_BIT
01163       {
01164         // Release lock for call into sub_bit
01165         ACE_GUARD(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock);
01166         DDS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sub_bit();
01167         if (bit) { // bit may be null if the DomainParticipant is shutting down
01168           instance_handle =
01169             bit->store_synthetic_data(rdata_copy.ddsSubscriptionData,
01170                                       DDS::NEW_VIEW_STATE);
01171         }
01172       }
01173 #endif /* DDS_HAS_MINIMUM_BIT */
01174 
01175       if (spdp_.shutting_down()) { return; }
01176       // Subscription may have been removed while lock released
01177       iter = discovered_subscriptions_.find(guid);
01178       if (iter != discovered_subscriptions_.end()) {
01179         iter->second.bit_ih_ = instance_handle;
01180         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01181             topics_.find(topic_name);
01182         if (top_it != topics_.end()) {
01183           if (DCPS::DCPS_debug_level > 3) {
01184             ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(drd) - ")
01185                                  ACE_TEXT("calling match_endpoints new\n")));
01186           }
01187           match_endpoints(guid, top_it->second);
01188         }
01189       }
01190 
01191     } else { // update existing
01192       if (qosChanged(iter->second.reader_data_.ddsSubscriptionData,
01193                      rdata.ddsSubscriptionData)) {
01194 #ifndef DDS_HAS_MINIMUM_BIT
01195         DDS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sub_bit();
01196         if (bit) { // bit may be null if the DomainParticipant is shutting down
01197           bit->store_synthetic_data(
01198                 iter->second.reader_data_.ddsSubscriptionData,
01199                 DDS::NOT_NEW_VIEW_STATE);
01200         }
01201 #endif /* DDS_HAS_MINIMUM_BIT */
01202 
01203         // Match/unmatch local publication(s)
01204         topic_name = get_topic_name(iter->second);
01205         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01206             topics_.find(topic_name);
01207         if (top_it != topics_.end()) {
01208           if (DCPS::DCPS_debug_level > 3) {
01209             ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(drd) - ")
01210                                  ACE_TEXT("calling match_endpoints update\n")));
01211           }
01212           match_endpoints(guid, top_it->second);
01213         }
01214       }
01215 
01216       if (paramsChanged(iter->second.reader_data_.contentFilterProperty,
01217                         rdata.contentFilterProperty)) {
01218         // Let any associated local publications know about the change
01219         topic_name = get_topic_name(iter->second);
01220         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01221             topics_.find(topic_name);
01222         using DCPS::RepoIdSet;
01223         const RepoIdSet& assoc =
01224           (top_it == topics_.end()) ? RepoIdSet() : top_it->second.endpoints_;
01225         for (RepoIdSet::const_iterator i = assoc.begin(); i != assoc.end(); ++i) {
01226           if (i->entityId.entityKind & 4) continue; // subscription
01227           const LocalPublicationIter lpi = local_publications_.find(*i);
01228           if (lpi != local_publications_.end()) {
01229             lpi->second.publication_->update_subscription_params(guid,
01230               rdata.contentFilterProperty.expressionParameters);
01231           }
01232         }
01233       }
01234     }
01235     // For each associated opendds writer to this reader
01236     CORBA::ULong len = rdata.readerProxy.associatedWriters.length();
01237     for (CORBA::ULong writerIndex = 0; writerIndex < len; ++writerIndex)
01238     {
01239       GUID_t writerGuid = rdata.readerProxy.associatedWriters[writerIndex];
01240 
01241       // If the associated writer is in this participant
01242       LocalPublicationIter lp = local_publications_.find(writerGuid);
01243       if (lp != local_publications_.end()) {
01244         // If the local writer is not fully associated with the reader
01245         if (lp->second.remote_opendds_associations_.insert(guid).second) {
01246           // This is a new association
01247           lp->second.publication_->association_complete(guid);
01248         }
01249       }
01250     }
01251 
01252   } else if (message_id == DCPS::UNREGISTER_INSTANCE ||
01253              message_id == DCPS::DISPOSE_INSTANCE ||
01254              message_id == DCPS::DISPOSE_UNREGISTER_INSTANCE) {
01255     if (iter != discovered_subscriptions_.end()) {
01256       // Unmatch local publication(s)
01257       topic_name = get_topic_name(iter->second);
01258       OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01259           topics_.find(topic_name);
01260       if (top_it != topics_.end()) {
01261         top_it->second.endpoints_.erase(guid);
01262         if (DCPS::DCPS_debug_level > 3) {
01263           ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(drd) - ")
01264                                ACE_TEXT("calling match_endpoints disp/unreg\n")));
01265         }
01266         match_endpoints(guid, top_it->second, true /*remove*/);
01267         if (spdp_.shutting_down()) { return; }
01268       }
01269       remove_from_bit(iter->second);
01270       discovered_subscriptions_.erase(iter);
01271     }
01272   }
01273 }

void OpenDDS::RTPS::Sedp::data_received ( DCPS::MessageId  message_id,
const OpenDDS::DCPS::DiscoveredWriterData &  wdata 
) [private]

Definition at line 925 of file Sedp.cpp.

References assign_bit_key(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DiscoveredWriterData::ddsPublicationData, deferred_publications_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::discovered_publications_, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::get_topic_name(), DDS::HANDLE_NIL, OpenDDS::RTPS::Spdp::has_discovered_participant(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::ignoring(), inconsistent_topic(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::make_topic_guid(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::match_endpoints(), DDS::NEW_VIEW_STATE, DDS::NOT_NEW_VIEW_STATE, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP(), OPENDDS_STRING, pub_bit(), qosChanged(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_from_bit(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::RTPS::Spdp::shutting_down(), spdp_, DDS::PublicationBuiltinTopicData::topic_data, DDS::PublicationBuiltinTopicData::topic_name, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::topic_names_, DDS::PublicationBuiltinTopicData::type_name, OpenDDS::DCPS::UNREGISTER_INSTANCE, and OpenDDS::DCPS::DiscoveredWriterData::writerProxy.

Referenced by OpenDDS::RTPS::Sedp::Task::svc_i().

00927 {
00928   if (spdp_.shutting_down()) { return; }
00929 
00930   const RepoId& guid = wdata.writerProxy.remoteWriterGuid;
00931   RepoId guid_participant = guid;
00932   guid_participant.entityId = ENTITYID_PARTICIPANT;
00933 
00934   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00935   if (ignoring(guid)
00936       || ignoring(guid_participant)
00937       || ignoring(wdata.ddsPublicationData.topic_name)) {
00938     return;
00939   }
00940 
00941   if (!spdp_.has_discovered_participant (guid_participant)) {
00942     deferred_publications_[guid] = std::make_pair (message_id, wdata);
00943     return;
00944   }
00945 
00946   OPENDDS_STRING topic_name;
00947   // Find the publication  - iterator valid only as long as we hold the lock
00948   DiscoveredPublicationIter iter = discovered_publications_.find(guid);
00949 
00950   if (message_id == DCPS::SAMPLE_DATA) {
00951     OpenDDS::DCPS::DiscoveredWriterData wdata_copy;
00952 
00953     if (iter == discovered_publications_.end()) { // add new
00954       // Must unlock when calling into pub_bit() as it may call back into us
00955       ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00956 
00957       { // Reduce scope of pub and td
00958         DiscoveredPublication& pub =
00959             discovered_publications_[guid] = DiscoveredPublication(wdata);
00960 
00961         topic_name = get_topic_name(pub);
00962         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00963           topics_.find(topic_name);
00964         if (top_it == topics_.end()) {
00965           top_it =
00966             topics_.insert(std::make_pair(topic_name, TopicDetails())).first;
00967           top_it->second.data_type_ = wdata.ddsPublicationData.type_name;
00968           top_it->second.qos_.topic_data = wdata.ddsPublicationData.topic_data;
00969           top_it->second.repo_id_ = make_topic_guid();
00970 
00971         } else if (top_it->second.data_type_ !=
00972                    wdata.ddsPublicationData.type_name.in()) {
00973           inconsistent_topic(top_it->second.endpoints_);
00974           if (DCPS::DCPS_debug_level) {
00975             ACE_DEBUG((LM_WARNING,
00976                        ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - WARNING ")
00977                        ACE_TEXT("topic %C discovered data type %C doesn't ")
00978                        ACE_TEXT("match known data type %C, ignoring ")
00979                        ACE_TEXT("discovered publication.\n"),
00980                        topic_name.c_str(),
00981                        wdata.ddsPublicationData.type_name.in(),
00982                        top_it->second.data_type_.c_str()));
00983           }
00984           return;
00985         }
00986 
00987         TopicDetails& td = top_it->second;
00988         topic_names_[td.repo_id_] = topic_name;
00989         td.endpoints_.insert(guid);
00990 
00991         std::memcpy(pub.writer_data_.ddsPublicationData.participant_key.value,
00992                     guid.guidPrefix, sizeof(DDS::BuiltinTopicKey_t));
00993         assign_bit_key(pub);
00994         wdata_copy = pub.writer_data_;
00995       }
00996 
00997       // Iter no longer valid once lock released
00998       iter = discovered_publications_.end();
00999 
01000       DDS::InstanceHandle_t instance_handle = DDS::HANDLE_NIL;
01001 #ifndef DDS_HAS_MINIMUM_BIT
01002       {
01003         // Release lock for call into pub_bit
01004         ACE_GUARD(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock);
01005         DDS::PublicationBuiltinTopicDataDataReaderImpl* bit = pub_bit();
01006         if (bit) { // bit may be null if the DomainParticipant is shutting down
01007           instance_handle =
01008             bit->store_synthetic_data(wdata_copy.ddsPublicationData,
01009                                       DDS::NEW_VIEW_STATE);
01010         }
01011       }
01012 #endif /* DDS_HAS_MINIMUM_BIT */
01013 
01014       if (spdp_.shutting_down()) { return; }
01015       // Publication may have been removed while lock released
01016       iter = discovered_publications_.find(guid);
01017       if (iter != discovered_publications_.end()) {
01018         iter->second.bit_ih_ = instance_handle;
01019         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01020             topics_.find(topic_name);
01021         if (top_it != topics_.end()) {
01022           if (DCPS::DCPS_debug_level > 3) {
01023             ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - ")
01024                                  ACE_TEXT("calling match_endpoints new\n")));
01025           }
01026           match_endpoints(guid, top_it->second);
01027         }
01028       }
01029 
01030     } else if (qosChanged(iter->second.writer_data_.ddsPublicationData,
01031                           wdata.ddsPublicationData)) { // update existing
01032 #ifndef DDS_HAS_MINIMUM_BIT
01033       DDS::PublicationBuiltinTopicDataDataReaderImpl* bit = pub_bit();
01034       if (bit) { // bit may be null if the DomainParticipant is shutting down
01035         bit->store_synthetic_data(iter->second.writer_data_.ddsPublicationData,
01036                                   DDS::NOT_NEW_VIEW_STATE);
01037       }
01038 #endif /* DDS_HAS_MINIMUM_BIT */
01039 
01040       // Match/unmatch local subscription(s)
01041       topic_name = get_topic_name(iter->second);
01042       OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01043           topics_.find(topic_name);
01044       if (top_it != topics_.end()) {
01045         if (DCPS::DCPS_debug_level > 3) {
01046           ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - ")
01047                                ACE_TEXT("calling match_endpoints update\n")));
01048         }
01049         match_endpoints(guid, top_it->second);
01050       }
01051     }
01052 
01053   } else if (message_id == DCPS::UNREGISTER_INSTANCE ||
01054              message_id == DCPS::DISPOSE_INSTANCE ||
01055              message_id == DCPS::DISPOSE_UNREGISTER_INSTANCE) {
01056     if (iter != discovered_publications_.end()) {
01057       // Unmatch local subscription(s)
01058       topic_name = get_topic_name(iter->second);
01059       OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01060           topics_.find(topic_name);
01061       if (top_it != topics_.end()) {
01062         top_it->second.endpoints_.erase(guid);
01063         match_endpoints(guid, top_it->second, true /*remove*/);
01064         if (spdp_.shutting_down()) { return; }
01065       }
01066       remove_from_bit(iter->second);
01067       if (DCPS::DCPS_debug_level > 3) {
01068         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - ")
01069                              ACE_TEXT("calling match_endpoints disp/unreg\n")));
01070       }
01071       discovered_publications_.erase(iter);
01072     }
01073   }
01074 }

virtual bool OpenDDS::RTPS::Sedp::defer_reader ( const DCPS::RepoId &  reader,
const DCPS::RepoId &  reader_participant 
) [private, virtual]

Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

virtual bool OpenDDS::RTPS::Sedp::defer_writer ( const DCPS::RepoId &  writer,
const DCPS::RepoId &  writer_participant 
) [private, virtual]

Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

bool OpenDDS::RTPS::Sedp::disassociate ( const SPDPdiscoveredParticipantData &  pdata  ) 

Definition at line 550 of file Sedp.cpp.

References associated_participants_, OpenDDS::RTPS::ParticipantProxy_t::availableBuiltinEndpoints, OpenDDS::RTPS::BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER, OpenDDS::RTPS::BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER, OpenDDS::DCPS::TransportClient::disassociate(), OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::discovered_publications_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::discovered_subscriptions_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER, OpenDDS::DCPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER, OpenDDS::RTPS::ParticipantProxy_t::guidPrefix, OpenDDS::RTPS::Spdp::has_discovered_participant(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_, participant_message_reader_, participant_message_writer_, OpenDDS::RTPS::SPDPdiscoveredParticipantData::participantProxy, publications_reader_, publications_writer_, remove_entities_belonging_to(), spdp_, subscriptions_reader_, and subscriptions_writer_.

00551 {
00552   RepoId part;
00553   std::memcpy(part.guidPrefix, pdata.participantProxy.guidPrefix,
00554               sizeof(GuidPrefix_t));
00555   part.entityId = ENTITYID_PARTICIPANT;
00556   associated_participants_.erase(part);
00557   const BuiltinEndpointSet_t avail =
00558     pdata.participantProxy.availableBuiltinEndpoints;
00559 
00560   { // Release lock, so we can call into transport
00561     ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00562     ACE_GUARD_RETURN(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock, false);
00563 
00564     // See RTPS v2.1 section 8.5.5.2
00565     if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR) {
00566       RepoId id = part;
00567       id.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER;
00568       publications_writer_.disassociate(id);
00569     }
00570     if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER) {
00571       RepoId id = part;
00572       id.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER;
00573       publications_reader_->disassociate(id);
00574     }
00575     if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR) {
00576       RepoId id = part;
00577       id.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER;
00578       subscriptions_writer_.disassociate(id);
00579     }
00580     if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER) {
00581       RepoId id = part;
00582       id.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER;
00583       subscriptions_reader_->disassociate(id);
00584     }
00585     if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER) {
00586       RepoId id = part;
00587       id.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER;
00588       participant_message_writer_.disassociate(id);
00589     }
00590     if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER) {
00591       RepoId id = part;
00592       id.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER;
00593       participant_message_reader_->disassociate(id);
00594     }
00595     //FUTURE: if/when topic propagation is supported, add it here
00596   }
00597   if (spdp_.has_discovered_participant(part)) {
00598     remove_entities_belonging_to(discovered_publications_, part);
00599     remove_entities_belonging_to(discovered_subscriptions_, part);
00600     return true;
00601   } else {
00602     return false;
00603   }
00604 }

void OpenDDS::RTPS::Sedp::inconsistent_topic ( const DCPS::RepoIdSet &  endpoints  )  const [private]

Definition at line 769 of file Sedp.cpp.

References OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_publications_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_subscriptions_, and OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::participant_id_.

Referenced by data_received().

00770 {
00771   using DCPS::RepoIdSet;
00772   for (RepoIdSet::const_iterator iter(eps.begin()); iter != eps.end(); ++iter) {
00773     if (0 == std::memcmp(participant_id_.guidPrefix, iter->guidPrefix,
00774                          sizeof(GuidPrefix_t))) {
00775       const bool reader = iter->entityId.entityKind & 4;
00776       if (reader) {
00777         const LocalSubscriptionCIter lsi = local_subscriptions_.find(*iter);
00778         if (lsi != local_subscriptions_.end()) {
00779           lsi->second.subscription_->inconsistent_topic();
00780           // Only make one callback per inconsistent topic, even if we have
00781           // more than one reader/writer on the topic -- it's the Topic object
00782           // that will actually see the InconsistentTopicStatus change.
00783           return;
00784         }
00785       } else {
00786         const LocalPublicationCIter lpi = local_publications_.find(*iter);
00787         if (lpi != local_publications_.end()) {
00788           lpi->second.publication_->inconsistent_topic();
00789           return; // see comment above
00790         }
00791       }
00792     }
00793   }
00794 }

DDS::ReturnCode_t OpenDDS::RTPS::Sedp::init ( const DCPS::RepoId &  guid,
const RtpsDiscovery &  disco,
DDS::DomainId_t  domainId 
)

const ACE_INET_Addr & OpenDDS::RTPS::Sedp::local_address (  )  const

Definition at line 331 of file Sedp.cpp.

References transport_inst_.

Referenced by map_ipv4_to_ipv6().

00332 {
00333   DCPS::RtpsUdpInst_rch rtps_inst =
00334       DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_);
00335   return rtps_inst->local_address_;
00336 }

static DCPS::RepoId OpenDDS::RTPS::Sedp::make_id ( const DCPS::RepoId participant_id,
const EntityId_t entity 
) [static, private]

bool OpenDDS::RTPS::Sedp::map_ipv4_to_ipv6 (  )  const

Definition at line 346 of file Sedp.cpp.

References local_address().

00347 {
00348   bool map = false;
00349   if (local_address().get_type() != AF_INET) {
00350     map = true;
00351   }
00352   return map;
00353 }

const ACE_INET_Addr & OpenDDS::RTPS::Sedp::multicast_group (  )  const

Definition at line 339 of file Sedp.cpp.

References transport_inst_.

00340 {
00341   DCPS::RtpsUdpInst_rch rtps_inst =
00342       DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_);
00343   return rtps_inst->multicast_group_address_;
00344 }

typedef OpenDDS::RTPS::Sedp::OPENDDS_MAP_CMP ( DCPS::RepoId  ,
MsgIdRdrDataPair  ,
DCPS::GUID_tKeyLessThan   
) [private]

typedef OpenDDS::RTPS::Sedp::OPENDDS_MAP_CMP ( DCPS::RepoId  ,
MsgIdWtrDataPair  ,
DCPS::GUID_tKeyLessThan   
) [private]

typedef OpenDDS::RTPS::Sedp::OPENDDS_MAP_CMP ( DCPS::RepoId  ,
LocalParticipantMessage  ,
DCPS::GUID_tKeyLessThan   
) [private]

void OpenDDS::RTPS::Sedp::populate_discovered_reader_msg ( OpenDDS::DCPS::DiscoveredReaderData drd,
const DCPS::RepoId subscription_id,
const LocalSubscription &  sub 
) [private]

void OpenDDS::RTPS::Sedp::populate_discovered_writer_msg ( OpenDDS::DCPS::DiscoveredWriterData dwd,
const DCPS::RepoId publication_id,
const LocalPublication &  pub 
) [private]

void OpenDDS::RTPS::Sedp::populate_transport_locator_sequence ( DCPS::TransportLocatorSeq *&  tls,
DiscoveredPublicationIter iter,
const DCPS::RepoId writer 
) [private, virtual]

Definition at line 2114 of file Sedp.cpp.

References OpenDDS::DCPS::TransportLocator::data, defer_match_endpoints_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::gen_find_size(), OpenDDS::RTPS::Spdp::get_default_locators(), OpenDDS::RTPS::message_block_to_sequence(), spdp_, and OpenDDS::DCPS::TransportLocator::transport_type.

02117 {
02118   OpenDDS::DCPS::LocatorSeq locs;
02119   bool participantExpectsInlineQos = false;
02120   RepoId remote_participant = writer;
02121   remote_participant.entityId = ENTITYID_PARTICIPANT;
02122   const bool participant_found =
02123     spdp_.get_default_locators(remote_participant, locs,
02124                                participantExpectsInlineQos);
02125   if (!wTls->length()) {     // if no locators provided, add the default
02126     if (!participant_found) {
02127       defer_match_endpoints_.insert(writer);
02128       return;
02129     } else if (locs.length()) {
02130       size_t size = 0, padding = 0;
02131       DCPS::gen_find_size(locs, size, padding);
02132 
02133       ACE_Message_Block mb_locator(size + 1);   // Add space for boolean
02134       using DCPS::Serializer;
02135       Serializer ser_loc(&mb_locator,
02136                          ACE_CDR_BYTE_ORDER,
02137                          Serializer::ALIGN_CDR);
02138       ser_loc << locs;
02139       ser_loc << ACE_OutputCDR::from_boolean(participantExpectsInlineQos);
02140 
02141       DCPS::TransportLocator tl;
02142       tl.transport_type = "rtps_udp";
02143       message_block_to_sequence (mb_locator, tl.data);
02144       wTls->length(1);
02145       (*wTls)[0] = tl;
02146     } else {
02147       ACE_DEBUG((LM_WARNING,
02148                  ACE_TEXT("(%P|%t) Sedp::match - ")
02149                  ACE_TEXT("remote writer found with no locators ")
02150                  ACE_TEXT("and no default locators\n")));
02151     }
02152   }
02153 }

void OpenDDS::RTPS::Sedp::populate_transport_locator_sequence ( DCPS::TransportLocatorSeq *&  tls,
DiscoveredSubscriptionIter iter,
const DCPS::RepoId reader 
) [private, virtual]

Definition at line 2069 of file Sedp.cpp.

References OpenDDS::DCPS::TransportLocator::data, defer_match_endpoints_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::gen_find_size(), OpenDDS::RTPS::Spdp::get_default_locators(), OpenDDS::RTPS::message_block_to_sequence(), spdp_, and OpenDDS::DCPS::TransportLocator::transport_type.

02072 {
02073   OpenDDS::DCPS::LocatorSeq locs;
02074   bool participantExpectsInlineQos = false;
02075   RepoId remote_participant = reader;
02076   remote_participant.entityId = ENTITYID_PARTICIPANT;
02077   const bool participant_found =
02078     spdp_.get_default_locators(remote_participant, locs,
02079                                participantExpectsInlineQos);
02080   if (!rTls->length()) {     // if no locators provided, add the default
02081     if (!participant_found) {
02082       defer_match_endpoints_.insert(reader);
02083       return;
02084     } else if (locs.length()) {
02085       size_t size = 0, padding = 0;
02086       DCPS::gen_find_size(locs, size, padding);
02087 
02088       ACE_Message_Block mb_locator(size + 1);   // Add space for boolean
02089       using DCPS::Serializer;
02090       Serializer ser_loc(&mb_locator,
02091                          ACE_CDR_BYTE_ORDER,
02092                          Serializer::ALIGN_CDR);
02093       ser_loc << locs;
02094       const bool readerExpectsInlineQos =
02095         dsi->second.reader_data_.readerProxy.expectsInlineQos;
02096       ser_loc << ACE_OutputCDR::from_boolean(participantExpectsInlineQos
02097                                              || readerExpectsInlineQos);
02098 
02099       DCPS::TransportLocator tl;
02100       tl.transport_type = "rtps_udp";
02101       message_block_to_sequence (mb_locator, tl.data);
02102       rTls->length(1);
02103       (*rTls)[0] = tl;
02104     } else {
02105       ACE_DEBUG((LM_WARNING,
02106                  ACE_TEXT("(%P|%t) Sedp::match - ")
02107                  ACE_TEXT("remote reader found with no locators ")
02108                  ACE_TEXT("and no default locators\n")));
02109     }
02110   }
02111 }

DDS::PublicationBuiltinTopicDataDataReaderImpl * OpenDDS::RTPS::Sedp::pub_bit (  )  [private]

Definition at line 702 of file Sedp.cpp.

References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::bit_subscriber(), OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, and spdp_.

Referenced by data_received(), and OpenDDS::RTPS::Sedp::Task::svc_i().

00703 {
00704   DDS::Subscriber_var sub = spdp_.bit_subscriber();
00705   if (!sub.in())
00706     return 0;
00707 
00708   DDS::DataReader_var d =
00709     sub->lookup_datareader(DCPS::BUILT_IN_PUBLICATION_TOPIC);
00710   return dynamic_cast<DDS::PublicationBuiltinTopicDataDataReaderImpl*>(d.in());
00711 }

template<typename Map>
void OpenDDS::RTPS::Sedp::remove_entities_belonging_to ( Map &  m,
RepoId  participant 
)

Definition at line 608 of file Sedp.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::get_topic_name(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::match_endpoints(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP(), OPENDDS_STRING, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_from_bit(), OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.

00609 {
00610   participant.entityId.entityKey[0] = 0;
00611   participant.entityId.entityKey[1] = 0;
00612   participant.entityId.entityKey[2] = 0;
00613   participant.entityId.entityKind = 0;
00614   for (typename Map::iterator i = m.lower_bound(participant);
00615        i != m.end() && 0 == std::memcmp(i->first.guidPrefix,
00616                                         participant.guidPrefix,
00617                                         sizeof(GuidPrefix_t));) {
00618     OPENDDS_STRING topic_name = get_topic_name(i->second);
00619     OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00620       topics_.find(topic_name);
00621     if (top_it != topics_.end()) {
00622       top_it->second.endpoints_.erase(i->first);
00623       if (DCPS::DCPS_debug_level > 3) {
00624         ACE_DEBUG((LM_DEBUG,
00625                    ACE_TEXT("(%P|%t) Sedp::remove_entities_belonging_to - ")
00626                    ACE_TEXT("calling match_endpoints remove\n")));
00627       }
00628       match_endpoints(i->first, top_it->second, true /*remove*/);
00629       if (spdp_.shutting_down()) { return; }
00630     }
00631     remove_from_bit(i->second);
00632     m.erase(i++);
00633   }
00634 }

template<typename Map>
void OpenDDS::RTPS::Sedp::remove_entities_belonging_to ( Map &  m,
DCPS::RepoId  participant 
) [private]

Referenced by disassociate().

void OpenDDS::RTPS::Sedp::remove_from_bit_i ( const DiscoveredSubscription &  sub  )  [private]

Definition at line 647 of file Sedp.cpp.

References OpenDDS::RTPS::Sedp::Task::enqueue(), OpenDDS::RTPS::Sedp::Msg::MSG_REMOVE_FROM_SUB_BIT, and task_.

00648 {
00649 #ifndef DDS_HAS_MINIMUM_BIT
00650   task_.enqueue(Msg::MSG_REMOVE_FROM_SUB_BIT, sub.bit_ih_);
00651 #else
00652   ACE_UNUSED_ARG(sub);
00653 #endif /* DDS_HAS_MINIMUM_BIT */
00654 }

void OpenDDS::RTPS::Sedp::remove_from_bit_i ( const DiscoveredPublication &  pub  )  [private]

Definition at line 637 of file Sedp.cpp.

References OpenDDS::RTPS::Sedp::Task::enqueue(), OpenDDS::RTPS::Sedp::Msg::MSG_REMOVE_FROM_PUB_BIT, and task_.

00638 {
00639 #ifndef DDS_HAS_MINIMUM_BIT
00640   task_.enqueue(Msg::MSG_REMOVE_FROM_PUB_BIT, pub.bit_ih_);
00641 #else
00642   ACE_UNUSED_ARG(pub);
00643 #endif /* DDS_HAS_MINIMUM_BIT */
00644 }

virtual DDS::ReturnCode_t OpenDDS::RTPS::Sedp::remove_publication_i ( const DCPS::RepoId publicationId  )  [private, virtual]

Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

virtual DDS::ReturnCode_t OpenDDS::RTPS::Sedp::remove_subscription_i ( const DCPS::RepoId subscriptionId  )  [private, virtual]

Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

void OpenDDS::RTPS::Sedp::set_inline_qos ( DCPS::TransportLocatorSeq &  locators  )  [static, private]

Definition at line 1949 of file Sedp.cpp.

References OPENDDS_STRING.

Referenced by OpenDDS::RTPS::Sedp::Reader::data_received().

01950 {
01951   const OPENDDS_STRING rtps_udp = "rtps_udp";
01952   for (CORBA::ULong i = 0; i < locators.length(); ++i) {
01953     if (locators[i].transport_type.in() == rtps_udp) {
01954       const CORBA::ULong len = locators[i].data.length();
01955       locators[i].data.length(len + 1);
01956       locators[i].data[len] = CORBA::Octet(1);
01957     }
01958   }
01959 }

void OpenDDS::RTPS::Sedp::shutdown (  ) 

Definition at line 891 of file Sedp.cpp.

References participant_message_reader_, publications_reader_, OpenDDS::RTPS::Sedp::Task::shutdown(), subscriptions_reader_, and task_.

Referenced by OpenDDS::RTPS::Spdp::~Spdp().

00892 {
00893   task_.shutdown();
00894   publications_reader_->shutting_down_ = 1;
00895   subscriptions_reader_->shutting_down_ = 1;
00896   participant_message_reader_->shutting_down_ = 1;
00897 }

bool OpenDDS::RTPS::Sedp::shutting_down (  )  const [private, virtual]

Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

Definition at line 2063 of file Sedp.cpp.

References OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.

02064 {
02065   return spdp_.shutting_down();
02066 }

void OpenDDS::RTPS::Sedp::signal_liveliness ( DDS::LivelinessQosPolicyKind  kind  ) 

Definition at line 1341 of file Sedp.cpp.

References DDS::AUTOMATIC_LIVELINESS_QOS, automatic_liveliness_seq_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::GUID_UNKNOWN, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS, manual_liveliness_seq_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::participant_id_, OpenDDS::RTPS::PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE, OpenDDS::RTPS::PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE, participant_message_writer_, OpenDDS::RTPS::ParticipantMessageData::participantGuid, and OpenDDS::RTPS::Sedp::Writer::write_sample().

Referenced by OpenDDS::RTPS::Spdp::signal_liveliness().

01342 {
01343   ParticipantMessageData pmd;
01344   pmd.participantGuid = participant_id_;
01345   switch (kind) {
01346   case DDS::AUTOMATIC_LIVELINESS_QOS:
01347     pmd.participantGuid.entityId = OpenDDS::DCPS::EntityIdConverter(PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE);
01348     participant_message_writer_.write_sample(pmd, GUID_UNKNOWN, automatic_liveliness_seq_);
01349     break;
01350   case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
01351     pmd.participantGuid.entityId = OpenDDS::DCPS::EntityIdConverter(PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE);
01352     participant_message_writer_.write_sample(pmd, GUID_UNKNOWN, manual_liveliness_seq_);
01353     break;
01354   case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
01355     // Do nothing.
01356     break;
01357   }
01358 }

DDS::SubscriptionBuiltinTopicDataDataReaderImpl * OpenDDS::RTPS::Sedp::sub_bit (  )  [private]

Definition at line 714 of file Sedp.cpp.

References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::bit_subscriber(), OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, and spdp_.

Referenced by data_received(), and OpenDDS::RTPS::Sedp::Task::svc_i().

00715 {
00716   DDS::Subscriber_var sub = spdp_.bit_subscriber();
00717   if (!sub.in())
00718     return 0;
00719 
00720   DDS::DataReader_var d =
00721     sub->lookup_datareader(DCPS::BUILT_IN_SUBSCRIPTION_TOPIC);
00722   return dynamic_cast<DDS::SubscriptionBuiltinTopicDataDataReaderImpl*>(d.in());
00723 }

DDS::TopicBuiltinTopicDataDataReaderImpl * OpenDDS::RTPS::Sedp::topic_bit (  )  [private]

Definition at line 690 of file Sedp.cpp.

References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::bit_subscriber(), OpenDDS::DCPS::BUILT_IN_TOPIC_TOPIC, and spdp_.

00691 {
00692   DDS::Subscriber_var sub = spdp_.bit_subscriber();
00693   if (!sub.in())
00694     return 0;
00695 
00696   DDS::DataReader_var d =
00697     sub->lookup_datareader(DCPS::BUILT_IN_TOPIC_TOPIC);
00698   return dynamic_cast<DDS::TopicBuiltinTopicDataDataReaderImpl*>(d.in());
00699 }

void OpenDDS::RTPS::Sedp::unicast_locators ( OpenDDS::DCPS::LocatorSeq &  locators  )  const

Definition at line 283 of file Sedp.cpp.

References OpenDDS::RTPS::address_to_bytes(), OpenDDS::RTPS::address_to_kind(), OpenDDS::DCPS::get_interface_addrs(), OPENDDS_VECTOR, TheServiceParticipant, and transport_inst_.

00284 {
00285   DCPS::RtpsUdpInst_rch rtps_inst =
00286     DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_);
00287   using namespace OpenDDS::RTPS;
00288 
00289   CORBA::ULong idx = 0;
00290 
00291   // multicast first so it's preferred by remote peers
00292   if (rtps_inst->use_multicast_ && rtps_inst->multicast_group_address_ != ACE_INET_Addr()) {
00293     idx = locators.length();
00294     locators.length(idx + 1);
00295     locators[idx].kind = address_to_kind(rtps_inst->multicast_group_address_);
00296     locators[idx].port = rtps_inst->multicast_group_address_.get_port_number();
00297     RTPS::address_to_bytes(locators[idx].address,
00298       rtps_inst->multicast_group_address_);
00299   }
00300 
00301   //if local_address_string is empty, or only the port has been set
00302   //need to get interface addresses to populate into the locator
00303   if (rtps_inst->local_address_config_str_.empty() ||
00304       rtps_inst->local_address_config_str_.rfind(':') == 0) {
00305     typedef OPENDDS_VECTOR(ACE_INET_Addr) AddrVector;
00306     AddrVector addrs;
00307     if (TheServiceParticipant->default_address ().empty ()) {
00308       OpenDDS::DCPS::get_interface_addrs(addrs);
00309     } else {
00310       addrs.push_back (ACE_INET_Addr (static_cast<u_short> (0), TheServiceParticipant->default_address ().c_str ()));
00311     }
00312     for (AddrVector::iterator adr_it = addrs.begin(); adr_it != addrs.end(); ++adr_it) {
00313       idx = locators.length();
00314       locators.length(idx + 1);
00315       locators[idx].kind = address_to_kind(*adr_it);
00316       locators[idx].port = rtps_inst->local_address_.get_port_number();
00317       RTPS::address_to_bytes(locators[idx].address,
00318         *adr_it);
00319     }
00320   } else {
00321     idx = locators.length();
00322     locators.length(idx + 1);
00323     locators[idx].kind = address_to_kind(rtps_inst->local_address_);
00324     locators[idx].port = rtps_inst->local_address_.get_port_number();
00325     RTPS::address_to_bytes(locators[idx].address,
00326       rtps_inst->local_address_);
00327   }
00328 }

bool OpenDDS::RTPS::Sedp::update_publication_qos ( const DCPS::RepoId &  publicationId,
const DDS::DataWriterQos &  qos,
const DDS::PublisherQos &  publisherQos 
) [virtual]

Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

bool OpenDDS::RTPS::Sedp::update_subscription_params ( const DCPS::RepoId &  subId,
const DDS::StringSeq &  params 
) [virtual]

Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

bool OpenDDS::RTPS::Sedp::update_subscription_qos ( const DCPS::RepoId &  subscriptionId,
const DDS::DataReaderQos &  qos,
const DDS::SubscriberQos &  subscriberQos 
) [virtual]

Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

bool OpenDDS::RTPS::Sedp::update_topic_qos ( const DCPS::RepoId &  topicId,
const DDS::TopicQos &  qos,
OPENDDS_STRING &  name 
) [virtual]

Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.

void OpenDDS::RTPS::Sedp::write_durable_participant_message_data ( const DCPS::RepoId reader  )  [private]

Definition at line 1858 of file Sedp.cpp.

References OpenDDS::RTPS::Sedp::Writer::end_historic_samples(), local_participant_messages_, participant_message_writer_, and write_participant_message_data().

Referenced by OpenDDS::RTPS::Sedp::Task::svc_i().

01859 {
01860   LocalParticipantMessageIter part, end = local_participant_messages_.end();
01861   for (part = local_participant_messages_.begin(); part != end; ++part) {
01862     write_participant_message_data(part->first, part->second, reader);
01863   }
01864   participant_message_writer_.end_historic_samples(reader);
01865 }

void OpenDDS::RTPS::Sedp::write_durable_publication_data ( const DCPS::RepoId reader  )  [private]

Definition at line 1838 of file Sedp.cpp.

References OpenDDS::RTPS::Sedp::Writer::end_historic_samples(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_publications_, publications_writer_, and write_publication_data().

Referenced by OpenDDS::RTPS::Sedp::Task::svc_i().

01839 {
01840   LocalPublicationIter pub, end = local_publications_.end();
01841   for (pub = local_publications_.begin(); pub != end; ++pub) {
01842     write_publication_data(pub->first, pub->second, reader);
01843   }
01844   publications_writer_.end_historic_samples(reader);
01845 }

void OpenDDS::RTPS::Sedp::write_durable_subscription_data ( const DCPS::RepoId reader  )  [private]

Definition at line 1848 of file Sedp.cpp.

References OpenDDS::RTPS::Sedp::Writer::end_historic_samples(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_subscriptions_, subscriptions_writer_, and write_subscription_data().

Referenced by OpenDDS::RTPS::Sedp::Task::svc_i().

01849 {
01850   LocalSubscriptionIter sub, end = local_subscriptions_.end();
01851   for (sub = local_subscriptions_.begin(); sub != end; ++sub) {
01852     write_subscription_data(sub->first, sub->second, reader);
01853   }
01854   subscriptions_writer_.end_historic_samples(reader);
01855 }

DDS::ReturnCode_t OpenDDS::RTPS::Sedp::write_participant_message_data ( const DCPS::RepoId rid,
LocalParticipantMessage part,
const DCPS::RepoId reader = DCPS::GUID_UNKNOWN 
) [private]

Referenced by write_durable_participant_message_data().

DDS::ReturnCode_t OpenDDS::RTPS::Sedp::write_publication_data ( const DCPS::RepoId rid,
LocalPublication &  pub,
const DCPS::RepoId reader = DCPS::GUID_UNKNOWN 
) [private]

Referenced by write_durable_publication_data().

DDS::ReturnCode_t OpenDDS::RTPS::Sedp::write_subscription_data ( const DCPS::RepoId rid,
LocalSubscription &  pub,
const DCPS::RepoId reader = DCPS::GUID_UNKNOWN 
) [private]

Referenced by write_durable_subscription_data().


Member Data Documentation

DCPS::RepoIdSet OpenDDS::RTPS::Sedp::associated_participants_ [private]

Definition at line 335 of file Sedp.h.

Referenced by disassociate(), and OpenDDS::RTPS::Sedp::Task::svc_i().

DCPS::SequenceNumber OpenDDS::RTPS::Sedp::automatic_liveliness_seq_ [private]

Definition at line 374 of file Sedp.h.

Referenced by signal_liveliness().

DCPS::RepoIdSet OpenDDS::RTPS::Sedp::defer_match_endpoints_ [private]

Definition at line 335 of file Sedp.h.

Referenced by populate_transport_locator_sequence(), and OpenDDS::RTPS::Sedp::Task::svc_i().

DeferredPublicationMap OpenDDS::RTPS::Sedp::deferred_publications_ [private]

Definition at line 314 of file Sedp.h.

Referenced by data_received(), and OpenDDS::RTPS::Sedp::Task::svc_i().

DeferredSubscriptionMap OpenDDS::RTPS::Sedp::deferred_subscriptions_ [private]

Definition at line 319 of file Sedp.h.

Referenced by data_received(), and OpenDDS::RTPS::Sedp::Task::svc_i().

const bool OpenDDS::RTPS::Sedp::host_is_bigendian_ [static]

Definition at line 101 of file Sedp.h.

Referenced by OpenDDS::RTPS::Sedp::Writer::write_sample().

LocalParticipantMessageMap OpenDDS::RTPS::Sedp::local_participant_messages_ [private]

Definition at line 302 of file Sedp.h.

Referenced by write_durable_participant_message_data().

DCPS::SequenceNumber OpenDDS::RTPS::Sedp::manual_liveliness_seq_ [private]

Definition at line 375 of file Sedp.h.

Referenced by signal_liveliness().

Reader_rch OpenDDS::RTPS::Sedp::participant_message_reader_ [private]

Definition at line 241 of file Sedp.h.

Referenced by associate(), disassociate(), and shutdown().

OpenDDS::RTPS::Sedp::Writer OpenDDS::RTPS::Sedp::participant_message_writer_ [private]

Referenced by disassociate(), signal_liveliness(), OpenDDS::RTPS::Sedp::Task::svc_i(), and write_durable_participant_message_data().

Reader_rch OpenDDS::RTPS::Sedp::publications_reader_ [private]

Definition at line 241 of file Sedp.h.

Referenced by associate(), disassociate(), and shutdown().

OpenDDS::RTPS::Sedp::Writer OpenDDS::RTPS::Sedp::publications_writer_ [private]

Referenced by disassociate(), OpenDDS::RTPS::Sedp::Task::svc_i(), and write_durable_publication_data().

Spdp& OpenDDS::RTPS::Sedp::spdp_ [private]

Definition at line 103 of file Sedp.h.

Referenced by data_received(), disassociate(), populate_transport_locator_sequence(), pub_bit(), remove_entities_belonging_to(), shutting_down(), sub_bit(), and topic_bit().

Reader_rch OpenDDS::RTPS::Sedp::subscriptions_reader_ [private]

Definition at line 241 of file Sedp.h.

Referenced by associate(), disassociate(), and shutdown().

OpenDDS::RTPS::Sedp::Writer OpenDDS::RTPS::Sedp::subscriptions_writer_ [private]

Referenced by disassociate(), OpenDDS::RTPS::Sedp::Task::svc_i(), and write_durable_subscription_data().

OpenDDS::RTPS::Sedp::Task OpenDDS::RTPS::Sedp::task_ [private]

Referenced by acknowledge(), associate(), OpenDDS::RTPS::Sedp::Reader::data_received(), remove_from_bit_i(), and shutdown().

DCPS::TransportInst_rch OpenDDS::RTPS::Sedp::transport_inst_ [private]

Definition at line 278 of file Sedp.h.

Referenced by local_address(), multicast_group(), and unicast_locators().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:50 2016 for OpenDDS by  doxygen 1.4.7