OpenDDS::Federator::ManagerImpl Class Reference

#include <FederatorManagerImpl.h>

Inheritance diagram for OpenDDS::Federator::ManagerImpl:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::Federator::ManagerImpl:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 ManagerImpl (Config &config)
 Default constructor.
virtual ~ManagerImpl ()
 Virtual destructor.
virtual CORBA::Boolean discover_federation (const char *ior)
virtual Manager_ptr join_federation (Manager_ptr peer, FederationDomain federation)
virtual void leave_federation (RepoKey id)
virtual RepoKey federation_id ()
virtual OpenDDS::DCPS::DCPSInfo_ptr repository ()
virtual void initializeOwner (const OpenDDS::Federator::OwnerUpdate &data)
virtual void initializeTopic (const OpenDDS::Federator::TopicUpdate &data)
virtual void initializeParticipant (const OpenDDS::Federator::ParticipantUpdate &data)
virtual void initializePublication (const OpenDDS::Federator::PublicationUpdate &data)
virtual void initializeSubscription (const OpenDDS::Federator::SubscriptionUpdate &data)
virtual void leave_and_shutdown ()
virtual void shutdown ()
void initialize ()
 Establish the update publications and subscriptions.
void finalize ()
 Release resources gracefully.
TAO_DDS_DCPSInfo_i *& info ()
 Accessors for the DCPSInfo reference.
TAO_DDS_DCPSInfo_iinfo () const
void localRepo (::OpenDDS::DCPS::DCPSInfo_ptr repo)
 Capture a remote callable reference to the DCPSInfo.
const TAO_DDS_DCPSFederationIdid () const
 Accessors for the federation Id value.
CORBA::ORB_ptr orb ()
 Accessors for the ORB.
void orb (CORBA::ORB_ptr value)
void pushState (Manager_ptr peer)
 Push our current state to a remote repository.
void processDeferred ()
 Handle any deferred updates that might have become processable.
virtual void unregisterCallback ()
virtual void requestImage ()
virtual void create (const Update::UTopic &topic)
virtual void create (const Update::UParticipant &participant)
virtual void create (const Update::URActor &reader)
virtual void create (const Update::UWActor &writer)
virtual void create (const Update::OwnershipData &data)
virtual void update (const Update::IdPath &id, const DDS::DomainParticipantQos &qos)
virtual void update (const Update::IdPath &id, const DDS::TopicQos &qos)
virtual void update (const Update::IdPath &id, const DDS::DataWriterQos &qos)
virtual void update (const Update::IdPath &id, const DDS::PublisherQos &qos)
virtual void update (const Update::IdPath &id, const DDS::DataReaderQos &qos)
virtual void update (const Update::IdPath &id, const DDS::SubscriberQos &qos)
virtual void update (const Update::IdPath &id, const DDS::StringSeq &exprParams)
virtual void destroy (const Update::IdPath &id, Update::ItemType type, Update::ActorType actor)
void processCreate (const OwnerUpdate *sample, const DDS::SampleInfo *info)
 Null implementation for OwnerUpdate samples.
void processCreate (const PublicationUpdate *sample, const DDS::SampleInfo *info)
 Create a proxy for a new publication.
void processCreate (const SubscriptionUpdate *sample, const DDS::SampleInfo *info)
 Create a proxy for a new subscription.
void processCreate (const ParticipantUpdate *sample, const DDS::SampleInfo *info)
 Create a proxy for a new participant.
void processCreate (const TopicUpdate *sample, const DDS::SampleInfo *info)
 Create a proxy for a new topic.
void processUpdateQos1 (const OwnerUpdate *sample, const DDS::SampleInfo *info)
 Process ownership changes.
void processUpdateQos1 (const PublicationUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy DataWriterQos for a publication.
void processUpdateQos2 (const PublicationUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy PublisherQos for a publication.
void processUpdateQos1 (const SubscriptionUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy DataReaderQos for a subscription.
void processUpdateQos2 (const SubscriptionUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy SubscriberQos for a subscription.
void processUpdateFilterExpressionParams (const SubscriptionUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy filter expression params for a subscription.
void processUpdateQos1 (const ParticipantUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy ParticipantQos for a participant.
void processUpdateQos1 (const TopicUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy TopicQos for a topic.
void processDelete (const OwnerUpdate *sample, const DDS::SampleInfo *info)
 Null implementation for OwnerUpdate samples.
void processDelete (const PublicationUpdate *sample, const DDS::SampleInfo *info)
 Delete a proxy for a publication.
void processDelete (const SubscriptionUpdate *sample, const DDS::SampleInfo *info)
 Delete a proxy for a subscription.
void processDelete (const ParticipantUpdate *sample, const DDS::SampleInfo *info)
 Delete a proxy for a participant.
void processDelete (const TopicUpdate *sample, const DDS::SampleInfo *info)
 Delete a proxy for a topic.

Private Types

typedef std::map< RepoKey,
Manager_var > 
IdToManagerMap
 Map type to hold references to federated repository Managers.

Private Attributes

ACE_SYNCH_MUTEX lock_
 Critical section MUTEX.
ACE_Condition< ACE_SYNCH_MUTEX > joining_
 Condition used to gate joining activities.
RepoKey joiner_
 Simple recursion avoidance during the join operations.
RepoKey joinRepo_
 Repository to which we joined.
bool federated_
IdToManagerMap peers_
 The peer with which we have federated.
OpenDDS::DCPS::SequenceNumber sequence_
 The packet sequence number for data that we publish.
Configconfig_
 The configuration information for this manager.
TAO_DDS_DCPSInfo_iinfo_
 The Info object reference to update.
OpenDDS::DCPS::DCPSInfo_var localRepo_
 Remotely callable reference to the local repository.
CORBA::ORB_var orb_
 The ORB in which we are activated.
InfoRepoMulticastResponder multicastResponder_
 Multicast responder.
DDS::DomainParticipant_var federationParticipant_
 local DomainParticipant
UpdateListener< OwnerUpdate,
OwnerUpdateDataReader > 
ownerListener_
 TopicUpdate listener.
UpdateListener< TopicUpdate,
TopicUpdateDataReader > 
topicListener_
 TopicUpdate listener.
UpdateListener< ParticipantUpdate,
ParticipantUpdateDataReader > 
participantListener_
 ParticipantUpdate listener.
UpdateListener< PublicationUpdate,
PublicationUpdateDataReader > 
publicationListener_
 PublicationUpdate listener.
UpdateListener< SubscriptionUpdate,
SubscriptionUpdateDataReader > 
subscriptionListener_
 SubscriptionUpdate listener.
OwnerUpdateDataWriter_var ownerWriter_
 TopicUpdate writer.
TopicUpdateDataWriter_var topicWriter_
 TopicUpdate writer.
ParticipantUpdateDataWriter_var participantWriter_
 ParticipantUpdate writer.
PublicationUpdateDataWriter_var publicationWriter_
 PublicationUpdate writer.
SubscriptionUpdateDataWriter_var subscriptionWriter_
 SubscriptionUpdate writer.
std::list< OwnerUpdatedeferredOwnerships_
 Deferred ownership updates.
std::list< TopicUpdatedeferredTopics_
 Deferred topic updates.
std::list< PublicationUpdatedeferredPublications_
 Deferred publication updates.
std::list< SubscriptionUpdatedeferredSubscriptions_
 Deferred subscription updates.
bool multicastEnabled_
 Is multicast enabled?
ACE_Thread_Mutex deferred_lock_
 Protect deferred updates.

Detailed Description

Definition at line 34 of file FederatorManagerImpl.h.


Member Typedef Documentation

typedef std::map<RepoKey, Manager_var> OpenDDS::Federator::ManagerImpl::IdToManagerMap [private]

Map type to hold references to federated repository Managers.

Definition at line 222 of file FederatorManagerImpl.h.


Constructor & Destructor Documentation

OpenDDS::Federator::ManagerImpl::ManagerImpl ( Config config  ) 

Default constructor.

Definition at line 37 of file FederatorManagerImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level, and multicastEnabled_.

00038   : joining_(this->lock_),
00039     joiner_(NIL_REPOSITORY),
00040     joinRepo_(NIL_REPOSITORY),
00041     federated_(false),
00042     config_(config),
00043     info_(0),
00044     ownerListener_(*this),
00045     topicListener_(*this),
00046     participantListener_(*this),
00047     publicationListener_(*this),
00048     subscriptionListener_(*this),
00049     multicastEnabled_(false)
00050 {
00051   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00052     ACE_DEBUG((LM_DEBUG,
00053                ACE_TEXT("(%P|%t) Federator::ManagerImpl::ManagerImpl()\n")));
00054   }
00055 
00056   char* mdec = ACE_OS::getenv("MulticastDiscoveryEnabled");
00057 
00058   if (mdec != 0) {
00059     std::string mde(ACE_OS::getenv("MulticastDiscoveryEnabled"));
00060 
00061     if (mde != "0") {
00062       multicastEnabled_ = true;
00063     }
00064   }
00065 }

OpenDDS::Federator::ManagerImpl::~ManagerImpl (  )  [virtual]

Virtual destructor.

Definition at line 67 of file FederatorManagerImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level.

00068 {
00069   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00070     ACE_DEBUG((LM_DEBUG,
00071                ACE_TEXT("(%P|%t) Federator::ManagerImpl::~ManagerImpl()\n")));
00072   }
00073 }


Member Function Documentation

void OpenDDS::Federator::ManagerImpl::create ( const Update::OwnershipData data  )  [virtual]

Implements Update::Updater.

Definition at line 177 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::Federator::OwnerUpdate::action, OpenDDS::Federator::CreateEntity, OpenDDS::DCPS::DCPS_debug_level, Update::OwnershipData::domain, OpenDDS::Federator::OwnerUpdate::domain, DDS::HANDLE_NIL, TAO_DDS_DCPSFederationId::id(), id(), Update::OwnershipData::owner, OpenDDS::Federator::OwnerUpdate::owner, ownerWriter_, Update::OwnershipData::participant, OpenDDS::Federator::OwnerUpdate::participant, and OpenDDS::Federator::OwnerUpdate::sender.

00178 {
00179   if (CORBA::is_nil(this->ownerWriter_.in())) {
00180     // Decline to publish data until we can.
00181     return;
00182   }
00183 
00184   OwnerUpdate sample;
00185   sample.sender      = this->id().id();
00186   sample.action      = CreateEntity;
00187 
00188   sample.domain      = data.domain;
00189   sample.participant = data.participant;
00190   sample.owner       = data.owner;
00191 
00192   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00193     OpenDDS::DCPS::RepoIdConverter converter(sample.participant);
00194     ACE_DEBUG((LM_DEBUG,
00195                ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( OwnerUpdate): ")
00196                ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00197                this->id().id(),
00198                sample.domain,
00199                std::string(converter).c_str(),
00200                sample.sender,
00201                sample.owner));
00202   }
00203 
00204   this->ownerWriter_->write(sample, DDS::HANDLE_NIL);
00205 }

virtual void OpenDDS::Federator::ManagerImpl::create ( const Update::UWActor writer  )  [virtual]

Implements Update::Updater.

void OpenDDS::Federator::ManagerImpl::create ( const Update::URActor reader  )  [virtual]

Implements Update::Updater.

Definition at line 102 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::Federator::SubscriptionUpdate::action, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::actorId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::callback, OpenDDS::Federator::SubscriptionUpdate::callback, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::contentSubscriptionProfile, OpenDDS::Federator::CreateEntity, OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::domainId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::drdwQos, OpenDDS::Federator::SubscriptionUpdate::expression_params, OpenDDS::Federator::SubscriptionUpdate::filter_class_name, OpenDDS::Federator::SubscriptionUpdate::filter_expression, DDS::HANDLE_NIL, OpenDDS::Federator::SubscriptionUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::SubscriptionUpdate::participant, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::participantId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::pubsubQos, OpenDDS::Federator::SubscriptionUpdate::sender, OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, subscriptionWriter_, OpenDDS::Federator::SubscriptionUpdate::topic, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::topicId, OpenDDS::Federator::SubscriptionUpdate::transport_info, and Update::ActorStrt< PSQ, RWQ, C, T, CSP >::transportInterfaceInfo.

00103 {
00104   if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00105     // Decline to publish data until we can.
00106     return;
00107   }
00108 
00109   SubscriptionUpdate sample;
00110   sample.sender         = this->id().id();
00111   sample.action         = CreateEntity;
00112 
00113   sample.domain         = reader.domainId;
00114   sample.participant    = reader.participantId;
00115   sample.topic          = reader.topicId;
00116   sample.id             = reader.actorId;
00117   sample.callback       = reader.callback.c_str();
00118   sample.datareader_qos = reader.drdwQos;
00119   sample.subscriber_qos = reader.pubsubQos;
00120   sample.transport_info = reader.transportInterfaceInfo;
00121   sample.filter_class_name = reader.contentSubscriptionProfile.filterClassName;
00122   sample.filter_expression = reader.contentSubscriptionProfile.filterExpr;
00123   sample.expression_params = reader.contentSubscriptionProfile.exprParams;
00124 
00125   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00126     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00127     OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00128     ACE_DEBUG((LM_DEBUG,
00129                ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( SubscriptionUpdate): ")
00130                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00131                this->id().id(),
00132                sample.domain,
00133                std::string(part_converter).c_str(),
00134                std::string(sub_converter).c_str()));
00135   }
00136 
00137   this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00138 }

void OpenDDS::Federator::ManagerImpl::create ( const Update::UParticipant participant  )  [virtual]

Implements Update::Updater.

Definition at line 72 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::Federator::ParticipantUpdate::action, OpenDDS::Federator::CreateEntity, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, Update::ParticipantStrt< Q >::domainId, DDS::HANDLE_NIL, OpenDDS::Federator::ParticipantUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), Update::ParticipantStrt< Q >::owner, OpenDDS::Federator::ParticipantUpdate::owner, Update::ParticipantStrt< Q >::participantId, Update::ParticipantStrt< Q >::participantQos, participantWriter_, OpenDDS::Federator::ParticipantUpdate::qos, and OpenDDS::Federator::ParticipantUpdate::sender.

00073 {
00074   if (CORBA::is_nil(this->participantWriter_.in())) {
00075     // Decline to publish data until we can.
00076     return;
00077   }
00078 
00079   ParticipantUpdate sample;
00080   sample.sender = this->id().id();
00081   sample.action = CreateEntity;
00082 
00083   sample.owner  = participant.owner;
00084   sample.domain = participant.domainId;
00085   sample.id     = participant.participantId;
00086   sample.qos    = participant.participantQos;
00087 
00088   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00089     OpenDDS::DCPS::RepoIdConverter converter(sample.id);
00090     ACE_DEBUG((LM_DEBUG,
00091                ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( ParticipantUpdate): ")
00092                ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
00093                this->id().id(),
00094                sample.domain,
00095                std::string(converter).c_str()));
00096   }
00097 
00098   this->participantWriter_->write(sample, DDS::HANDLE_NIL);
00099 }

void OpenDDS::Federator::ManagerImpl::create ( const Update::UTopic topic  )  [virtual]

Implements Update::Updater.

Definition at line 38 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::Federator::TopicUpdate::action, OpenDDS::Federator::CreateEntity, Update::TopicStrt< Q, S >::dataType, OpenDDS::Federator::TopicUpdate::datatype, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::TopicUpdate::domain, Update::TopicStrt< Q, S >::domainId, DDS::HANDLE_NIL, OpenDDS::Federator::TopicUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), Update::TopicStrt< Q, S >::name, OpenDDS::Federator::TopicUpdate::participant, Update::TopicStrt< Q, S >::participantId, OpenDDS::Federator::TopicUpdate::qos, OpenDDS::Federator::TopicUpdate::sender, OpenDDS::Federator::TopicUpdate::topic, Update::TopicStrt< Q, S >::topicId, Update::TopicStrt< Q, S >::topicQos, and topicWriter_.

00039 {
00040   if (CORBA::is_nil(this->topicWriter_.in())) {
00041     // Decline to publish data until we can.
00042     return;
00043   }
00044 
00045   TopicUpdate sample;
00046   sample.sender      = this->id().id();
00047   sample.action      = CreateEntity;
00048 
00049   sample.id          = topic.topicId;
00050   sample.domain      = topic.domainId;
00051   sample.participant = topic.participantId;
00052   sample.topic       = topic.name.c_str();
00053   sample.datatype    = topic.dataType.c_str();
00054   sample.qos         = topic.topicQos;
00055 
00056   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00057     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00058     OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
00059     ACE_DEBUG((LM_DEBUG,
00060                ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( TopicUpdate): ")
00061                ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00062                this->id().id(),
00063                sample.domain,
00064                std::string(part_converter).c_str(),
00065                std::string(topic_converter).c_str()));
00066   }
00067 
00068   this->topicWriter_->write(sample, DDS::HANDLE_NIL);
00069 }

void OpenDDS::Federator::ManagerImpl::destroy ( const Update::IdPath id,
Update::ItemType  type,
Update::ActorType  actor 
) [virtual]

Implements Update::Updater.

Definition at line 208 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::Federator::PublicationUpdate::action, OpenDDS::Federator::ParticipantUpdate::action, OpenDDS::Federator::TopicUpdate::action, Update::Actor, Update::DataReader, Update::DataWriter, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::DestroyEntity, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::TopicUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::SubscriptionUpdate::id, OpenDDS::Federator::PublicationUpdate::id, OpenDDS::Federator::ParticipantUpdate::id, OpenDDS::Federator::TopicUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::PublicationUpdate::participant, Update::Participant, OpenDDS::Federator::TopicUpdate::participant, participantWriter_, publicationWriter_, OpenDDS::Federator::SubscriptionUpdate::sender, OpenDDS::Federator::PublicationUpdate::sender, OpenDDS::Federator::ParticipantUpdate::sender, OpenDDS::Federator::TopicUpdate::sender, subscriptionWriter_, Update::Topic, and topicWriter_.

00212 {
00213   //
00214   // Do not propagate any destroy() messages within the FederationDomain.
00215   // This domain will be managed separately.
00216   //
00217   if (id.domain == this->config_.federationDomain()) {
00218     return;
00219   }
00220 
00221   switch (type) {
00222   case Update::Topic: {
00223     if (CORBA::is_nil(this->topicWriter_.in())) {
00224       // Decline to publish data until we can.
00225       return;
00226     }
00227 
00228     TopicUpdate sample;
00229     sample.sender      = this->id().id();
00230     sample.action      = DestroyEntity;
00231 
00232     sample.id          = id.id;
00233     sample.domain      = id.domain;
00234     sample.participant = id.participant;
00235 
00236     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00237       OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00238       OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
00239       ACE_DEBUG((LM_DEBUG,
00240                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( TopicUpdate): ")
00241                  ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00242                  this->id().id(),
00243                  sample.domain,
00244                  std::string(part_converter).c_str(),
00245                  std::string(topic_converter).c_str()));
00246     }
00247 
00248     this->topicWriter_->write(sample, DDS::HANDLE_NIL);
00249   }
00250   break;
00251 
00252   case Update::Participant: {
00253     if (CORBA::is_nil(this->participantWriter_.in())) {
00254       // Decline to publish data until we can.
00255       return;
00256     }
00257 
00258     ParticipantUpdate sample;
00259     sample.sender = this->id().id();
00260     sample.action = DestroyEntity;
00261 
00262     sample.domain = id.domain;
00263     sample.id     = id.id;
00264 
00265     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00266       OpenDDS::DCPS::RepoIdConverter converter(sample.id);
00267       ACE_DEBUG((LM_DEBUG,
00268                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( ParticipantUpdate): ")
00269                  ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
00270                  this->id().id(),
00271                  sample.domain,
00272                  std::string(converter).c_str()));
00273     }
00274 
00275     this->participantWriter_->write(sample, DDS::HANDLE_NIL);
00276   }
00277   break;
00278 
00279   case Update::Actor:
00280 
00281     // This is VERY annoying.
00282     switch (actor) {
00283     case Update::DataWriter: {
00284       if (CORBA::is_nil(this->publicationWriter_.in())) {
00285         // Decline to publish data until we can.
00286         return;
00287       }
00288 
00289       PublicationUpdate sample;
00290       sample.sender         = this->id().id();
00291       sample.action         = DestroyEntity;
00292 
00293       sample.domain         = id.domain;
00294       sample.participant    = id.participant;
00295       sample.id             = id.id;
00296 
00297       if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00298         OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00299         OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00300         ACE_DEBUG((LM_DEBUG,
00301                    ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( PublicationUpdate): ")
00302                    ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00303                    this->id().id(),
00304                    sample.domain,
00305                    std::string(part_converter).c_str(),
00306                    std::string(pub_converter).c_str()));
00307       }
00308 
00309       this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00310     }
00311     break;
00312 
00313     case Update::DataReader: {
00314       if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00315         // Decline to publish data until we can.
00316         return;
00317       }
00318 
00319       SubscriptionUpdate sample;
00320       sample.sender         = this->id().id();
00321       sample.action         = DestroyEntity;
00322 
00323       sample.domain         = id.domain;
00324       sample.participant    = id.participant;
00325       sample.id             = id.id;
00326 
00327       if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00328         OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00329         OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00330         ACE_DEBUG((LM_DEBUG,
00331                    ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( SubscriptionUpdate): ")
00332                    ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00333                    this->id().id(),
00334                    sample.domain,
00335                    std::string(part_converter).c_str(),
00336                    std::string(sub_converter).c_str()));
00337       }
00338 
00339       this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00340     }
00341     break;
00342     }
00343 
00344     break;
00345   }
00346 }

CORBA::Boolean OpenDDS::Federator::ManagerImpl::discover_federation ( const char *  ior  )  [virtual]

Definition at line 902 of file FederatorManagerImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level.

00903 {
00904   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00905     ACE_DEBUG((LM_DEBUG,
00906                ACE_TEXT("(%P|%t) ManagerImpl::discover_federation( %C)\n"),
00907                ior));
00908   }
00909 
00910   ///@TODO: Implement this.
00911   return false;
00912 }

RepoKey OpenDDS::Federator::ManagerImpl::federation_id (  )  [virtual]

Definition at line 865 of file FederatorManagerImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level, TAO_DDS_DCPSFederationId::id(), and id().

00866 {
00867   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00868     ACE_DEBUG((LM_DEBUG,
00869                ACE_TEXT("(%P|%t) ManagerImpl::federation_id()\n")));
00870   }
00871 
00872   return this->id().id();
00873 }

void OpenDDS::Federator::ManagerImpl::finalize (  ) 

Release resources gracefully.

Definition at line 787 of file FederatorManagerImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level, federated_, orb_, ownerListener_, participantListener_, peers_, publicationListener_, DDS::RETCODE_PRECONDITION_NOT_MET, subscriptionListener_, TheParticipantFactory, and topicListener_.

Referenced by InfoRepo::finalize(), and InfoRepo::handle_exception().

00788 {
00789   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00790     ACE_DEBUG((LM_DEBUG,
00791                ACE_TEXT("(%P|%t) Federator::ManagerImpl::finalize()\n")));
00792   }
00793 
00794   ownerListener_.stop();
00795   topicListener_.stop();
00796   participantListener_.stop();
00797   publicationListener_.stop();
00798   subscriptionListener_.stop();
00799   ownerListener_.join();
00800   topicListener_.join();
00801   participantListener_.join();
00802   publicationListener_.join();
00803   subscriptionListener_.join();
00804 
00805   if (this->federated_) {
00806     try {
00807       IdToManagerMap::iterator where = this->peers_.find(this->joinRepo_);
00808 
00809       if (where == this->peers_.end()) {
00810         ACE_DEBUG((LM_DEBUG,
00811                    ACE_TEXT("(%P|%t) Federator::Manager::finalize: ")
00812                    ACE_TEXT("repository %d - all attachment to federation left.\n"),
00813                    this->id().id()));
00814 
00815       } else {
00816         if (CORBA::is_nil(where->second.in())) {
00817           ACE_ERROR((LM_ERROR,
00818                      ACE_TEXT("(%P|%t) ERROR: Federator::Manager::finalize: ")
00819                      ACE_TEXT("repository %d not currently attached to a federation.\n"),
00820                      this->id().id()));
00821 
00822         } else {
00823           where->second->leave_federation(this->id().id());
00824           this->federated_ = false;
00825         }
00826       }
00827 
00828     } catch (const CORBA::Exception& ex) {
00829       ex._tao_print_exception(
00830         ACE_TEXT("ERROR: Federator::ManagerImpl::finalize() - ")
00831         ACE_TEXT("unable to leave remote federation "));
00832       throw Incomplete();
00833     }
00834   }
00835 
00836   if (!CORBA::is_nil(this->orb_.in()) && (0 != this->orb_->orb_core())) {
00837     this->orb_->orb_core()->reactor()->remove_handler(
00838       &this->multicastResponder_,
00839       ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
00840   }
00841 
00842   // Remove our local participant and contained entities.
00843   if (0 == CORBA::is_nil(this->federationParticipant_.in())) {
00844     DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00845     if (DDS::RETCODE_PRECONDITION_NOT_MET
00846         == this->federationParticipant_->delete_contained_entities()) {
00847       ACE_ERROR((LM_ERROR,
00848                  ACE_TEXT("(%P|%t) ERROR: Federator::Manager ")
00849                  ACE_TEXT("unable to release resources for repository %d.\n"),
00850                  this->id().id()));
00851 
00852     } else if (DDS::RETCODE_PRECONDITION_NOT_MET
00853                == dpf->delete_participant(this->federationParticipant_.in())) {
00854       ACE_ERROR((LM_ERROR,
00855                  ACE_TEXT("(%P|%t) ERROR: Federator::Manager ")
00856                  ACE_TEXT("unable to release the participant for repository %d.\n"),
00857                  this->id().id()));
00858     }
00859   }
00860 }

ACE_INLINE const TAO_DDS_DCPSFederationId & OpenDDS::Federator::ManagerImpl::id (  )  const

Accessors for the federation Id value.

Definition at line 19 of file FederatorManagerImpl.inl.

References config_, and OpenDDS::Federator::Config::federationId().

Referenced by create(), destroy(), federation_id(), InfoRepo::init(), pushState(), and update().

00020 {
00021   return this->config_.federationId();
00022 }

ACE_INLINE TAO_DDS_DCPSInfo_i * OpenDDS::Federator::ManagerImpl::info (  )  const

Definition at line 33 of file FederatorManagerImpl.inl.

References info_.

00034 {
00035   return this->info_;
00036 }

ACE_INLINE TAO_DDS_DCPSInfo_i *& OpenDDS::Federator::ManagerImpl::info (  ) 

Accessors for the DCPSInfo reference.

Definition at line 26 of file FederatorManagerImpl.inl.

References info_.

Referenced by InfoRepo::init().

00027 {
00028   return this->info_;
00029 }

void OpenDDS::Federator::ManagerImpl::initialize (  ) 

Establish the update publications and subscriptions.

Definition at line 76 of file FederatorManagerImpl.cpp.

References config_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX, OpenDDS::DCPS::DEFAULT_STATUS_MASK, OpenDDS::Federator::Defaults::DiscoveryRequestPort, DDS::DataWriterQos::durability, DDS::DataReaderQos::durability, OpenDDS::Federator::Config::federationDomain(), federationParticipant_, OpenDDS::DCPS::DataWriterImpl::get_publication_id(), OpenDDS::DCPS::DataReaderImpl::get_subscription_id(), DDS::DataWriterQos::history, DDS::DataReaderQos::history, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::TransportRegistry::instance(), DDS::KEEP_LAST_HISTORY_QOS, orb_, ownerListener_, OpenDDS::Federator::OWNERUPDATETOPICNAME, OpenDDS::Federator::OWNERUPDATETYPENAME, ownerWriter_, PARTICIPANT_QOS_DEFAULT, participantListener_, OpenDDS::Federator::PARTICIPANTUPDATETOPICNAME, OpenDDS::Federator::PARTICIPANTUPDATETYPENAME, participantWriter_, publicationListener_, OpenDDS::Federator::PUBLICATIONUPDATETOPICNAME, OpenDDS::Federator::PUBLICATIONUPDATETYPENAME, publicationWriter_, PUBLISHER_QOS_DEFAULT, DDS::DataWriterQos::reliability, DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_OK, SUBSCRIBER_QOS_DEFAULT, subscriptionListener_, OpenDDS::Federator::SUBSCRIPTIONUPDATETOPICNAME, OpenDDS::Federator::SUBSCRIPTIONUPDATETYPENAME, subscriptionWriter_, TheParticipantFactory, TOPIC_QOS_DEFAULT, topicListener_, OpenDDS::Federator::TOPICUPDATETOPICNAME, OpenDDS::Federator::TOPICUPDATETYPENAME, topicWriter_, and DDS::TRANSIENT_LOCAL_DURABILITY_QOS.

Referenced by join_federation().

00077 {
00078   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00079     ACE_DEBUG((LM_DEBUG,
00080                ACE_TEXT("(%P|%t) Federation::ManagerImpl::initialize()\n")));
00081   }
00082 
00083   // Let the listeners know which repository we are to filter samples at
00084   // the earliest opportunity.
00085   this->ownerListener_.federationId(this->id());
00086   this->topicListener_.federationId(this->id());
00087   this->participantListener_.federationId(this->id());
00088   this->publicationListener_.federationId(this->id());
00089   this->subscriptionListener_.federationId(this->id());
00090 
00091   // Add participant for Federation domain
00092   DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00093   this->federationParticipant_
00094   = dpf->create_participant(
00095       this->config_.federationDomain(),
00096       PARTICIPANT_QOS_DEFAULT,
00097       DDS::DomainParticipantListener::_nil(),
00098       OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00099 
00100   if (CORBA::is_nil(this->federationParticipant_.in())) {
00101     ACE_ERROR((LM_ERROR,
00102                ACE_TEXT("(%P|%t) ERROR: create_participant failed for ")
00103                ACE_TEXT("repository %d in federation domain %d.\n"),
00104                this->id().id(),
00105                this->config_.federationDomain()));
00106     throw Incomplete();
00107   }
00108   //
00109   // Add type support for update topics
00110   //
00111 
00112   OwnerUpdateTypeSupportImpl* ownerUpdate = new OwnerUpdateTypeSupportImpl();
00113 
00114   if (DDS::RETCODE_OK != ownerUpdate->register_type(
00115         this->federationParticipant_.in(),
00116         OWNERUPDATETYPENAME)) {
00117     ACE_ERROR((LM_ERROR,
00118                ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00119                ACE_TEXT("OwnerUpdate type support for repository %d.\n"),
00120                this->id().id()));
00121     throw Incomplete();
00122   }
00123 
00124   ParticipantUpdateTypeSupportImpl* participantUpdate = new ParticipantUpdateTypeSupportImpl();
00125 
00126   if (DDS::RETCODE_OK != participantUpdate->register_type(
00127         this->federationParticipant_.in(),
00128         PARTICIPANTUPDATETYPENAME)) {
00129     ACE_ERROR((LM_ERROR,
00130                ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00131                ACE_TEXT("ParticipantUpdate type support for repository %d.\n"),
00132                this->id().id()));
00133     throw Incomplete();
00134   }
00135 
00136   TopicUpdateTypeSupportImpl* topicUpdate = new TopicUpdateTypeSupportImpl();
00137 
00138   if (DDS::RETCODE_OK != topicUpdate->register_type(
00139         this->federationParticipant_.in(),
00140         TOPICUPDATETYPENAME)) {
00141     ACE_ERROR((LM_ERROR,
00142                ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00143                ACE_TEXT("TopicUpdate type support for repository %d.\n"),
00144                this->id().id()));
00145     throw Incomplete();
00146   }
00147 
00148   PublicationUpdateTypeSupportImpl* publicationUpdate = new PublicationUpdateTypeSupportImpl();
00149 
00150   if (DDS::RETCODE_OK != publicationUpdate->register_type(
00151         this->federationParticipant_.in(),
00152         PUBLICATIONUPDATETYPENAME)) {
00153     ACE_ERROR((LM_ERROR,
00154                ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00155                ACE_TEXT("PublicationUpdate type support for repository %d.\n"),
00156                this->id().id()));
00157     throw Incomplete();
00158   }
00159 
00160   SubscriptionUpdateTypeSupportImpl* subscriptionUpdate = new SubscriptionUpdateTypeSupportImpl();
00161 
00162   if (DDS::RETCODE_OK != subscriptionUpdate->register_type(
00163         this->federationParticipant_.in(),
00164         SUBSCRIPTIONUPDATETYPENAME)) {
00165     ACE_ERROR((LM_ERROR,
00166                ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00167                ACE_TEXT("SubscriptionUpdate type support for repository %d.\n"),
00168                this->id().id()));
00169     throw Incomplete();
00170   }
00171 
00172   //
00173   // Create a transport config for use with federation entities.
00174   //
00175   std::string config_name =
00176     OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
00177     + std::string("FederationBITTransportConfig");
00178   OpenDDS::DCPS::TransportConfig_rch config =
00179     OpenDDS::DCPS::TransportRegistry::instance()->create_config(config_name);
00180 
00181   std::string inst_name = OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
00182     + std::string("FederationBITTCPTransportInst");
00183   OpenDDS::DCPS::TransportInst_rch inst =
00184     OpenDDS::DCPS::TransportRegistry::instance()->create_inst(inst_name,
00185                                                               "tcp");
00186   config->instances_.push_back(inst);
00187 
00188   //
00189   // Create the subscriber for the update topics.
00190   //
00191 
00192   DDS::Subscriber_var subscriber
00193   = this->federationParticipant_->create_subscriber(
00194       SUBSCRIBER_QOS_DEFAULT,
00195       DDS::SubscriberListener::_nil(),
00196       OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00197 
00198   if (CORBA::is_nil(subscriber.in())) {
00199     ACE_ERROR((LM_ERROR,
00200                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00201                ACE_TEXT("failed to create subscriber for repository %d\n"),
00202                this->id().id()));
00203     throw Incomplete();
00204 
00205   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00206     ACE_DEBUG((LM_DEBUG,
00207                ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00208                ACE_TEXT("created federation subscriber for repository %d\n"),
00209                this->id().id()));
00210 
00211   }
00212 
00213   // Attach the transport to it.
00214 
00215   try {
00216     OpenDDS::DCPS::TransportRegistry::instance()->bind_config(config,
00217                                                               subscriber.in());
00218   } catch (const OpenDDS::DCPS::Transport::Exception&) {
00219     ACE_ERROR((LM_ERROR,
00220                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00221                ACE_TEXT("failed to bind transport config to federation subscriber.\n")));
00222     throw Incomplete();
00223   }
00224 
00225   //
00226   // Create the publisher for the update topics.
00227   //
00228 
00229   DDS::Publisher_var publisher
00230   = this->federationParticipant_->create_publisher(
00231       PUBLISHER_QOS_DEFAULT,
00232       DDS::PublisherListener::_nil(),
00233       OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00234 
00235   if (CORBA::is_nil(publisher.in())) {
00236     ACE_ERROR((LM_ERROR,
00237                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00238                ACE_TEXT("failed to create publisher for repository %d\n"),
00239                this->id().id()));
00240     throw Incomplete();
00241 
00242   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00243     ACE_DEBUG((LM_DEBUG,
00244                ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00245                ACE_TEXT("created federation publisher for repository %d\n"),
00246                this->id().id()));
00247 
00248   }
00249 
00250   // Attach the transport to it.
00251 
00252   try {
00253     OpenDDS::DCPS::TransportRegistry::instance()->bind_config(config,
00254                                                               publisher.in());
00255   } catch (const OpenDDS::DCPS::Transport::Exception&) {
00256     ACE_ERROR((LM_ERROR,
00257                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00258                ACE_TEXT("failed to bind transport config to federation publisher.\n")));
00259     throw Incomplete();
00260   }
00261 
00262   //
00263   // Some useful items for adding the subscriptions.
00264   //
00265   DDS::Topic_var            topic;
00266   DDS::TopicDescription_var description;
00267   DDS::DataReader_var       dataReader;
00268   DDS::DataWriter_var       dataWriter;
00269 
00270   DDS::DataReaderQos readerQos;
00271   subscriber->get_default_datareader_qos(readerQos);
00272   readerQos.durability.kind                          = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00273   readerQos.history.kind                             = DDS::KEEP_LAST_HISTORY_QOS;
00274   readerQos.history.depth                            = 50;
00275   readerQos.reliability.kind                         = DDS::RELIABLE_RELIABILITY_QOS;
00276   readerQos.reliability.max_blocking_time.sec        = 0;
00277   readerQos.reliability.max_blocking_time.nanosec    = 0;
00278 
00279   DDS::DataWriterQos writerQos;
00280   publisher->get_default_datawriter_qos(writerQos);
00281   writerQos.durability.kind                          = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00282   writerQos.history.kind                             = DDS::KEEP_LAST_HISTORY_QOS;
00283   writerQos.history.depth                            = 50;
00284   writerQos.reliability.kind                         = DDS::RELIABLE_RELIABILITY_QOS;
00285   writerQos.reliability.max_blocking_time.sec        = 0;
00286   writerQos.reliability.max_blocking_time.nanosec    = 0;
00287 
00288   //
00289   // Add update subscriptions
00290   //
00291   // NOTE: Its ok to lose the references to the objects here since they
00292   //       are not needed after this point.  The only thing we will do
00293   //       with them is to destroy them, and that will be done via a
00294   //       cascade delete from the participant.  The listeners will
00295   //       survive and can be used within other participants as well,
00296   //       since the only state they retain is the manager, which is the
00297   //       same for all.
00298   //
00299 
00300   topic = this->federationParticipant_->create_topic(
00301             OWNERUPDATETOPICNAME,
00302             OWNERUPDATETYPENAME,
00303             TOPIC_QOS_DEFAULT,
00304             DDS::TopicListener::_nil(),
00305             OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00306 
00307   dataWriter = publisher->create_datawriter(
00308                  topic.in(),
00309                  writerQos,
00310                  DDS::DataWriterListener::_nil(),
00311                  OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00312 
00313   if (CORBA::is_nil(dataWriter.in())) {
00314     ACE_ERROR((LM_ERROR,
00315                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00316                ACE_TEXT("failed to create OwnerUpdate writer for repository %d\n"),
00317                this->id().id()));
00318     throw Incomplete();
00319   }
00320 
00321   this->ownerWriter_ = OwnerUpdateDataWriter::_narrow(dataWriter.in());
00322 
00323   if (::CORBA::is_nil(this->ownerWriter_.in())) {
00324     ACE_ERROR((LM_ERROR,
00325                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00326                ACE_TEXT("failed to extract typed OwnerUpdate writer.\n")));
00327     throw Incomplete();
00328 
00329   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00330     OpenDDS::DCPS::DataWriterImpl* servant
00331     = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00332 
00333     if (0 == servant) {
00334       ACE_DEBUG((LM_WARNING,
00335                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00336                  ACE_TEXT("unable to extract typed OwnerUpdate writer.\n")));
00337 
00338     } else {
00339       OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00340       ACE_DEBUG((LM_DEBUG,
00341                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00342                  ACE_TEXT("created federation OwnerUpdate writer %C for repository %d\n"),
00343                  std::string(converter).c_str(),
00344                  this->id().id()));
00345     }
00346   }
00347 
00348   description = this->federationParticipant_->lookup_topicdescription(OWNERUPDATETOPICNAME);
00349   dataReader  = subscriber->create_datareader(
00350                   description.in(),
00351                   readerQos,
00352                   &this->ownerListener_,
00353                   OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00354 
00355   if (CORBA::is_nil(dataReader.in())) {
00356     ACE_ERROR((LM_ERROR,
00357                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00358                ACE_TEXT("failed to create OwnerUpdate reader for repository %d\n"),
00359                this->id().id()));
00360     throw Incomplete();
00361 
00362   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00363     OpenDDS::DCPS::DataReaderImpl* servant
00364     = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00365 
00366     if (0 == servant) {
00367       ACE_DEBUG((LM_WARNING,
00368                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00369                  ACE_TEXT("unable to extract typed OwnerUpdate reader.\n")));
00370 
00371     } else {
00372       OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00373       ACE_DEBUG((LM_DEBUG,
00374                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00375                  ACE_TEXT("created federation OwnerUpdate reader %C for repository %d\n"),
00376                  std::string(converter).c_str(),
00377                  this->id().id()));
00378     }
00379   }
00380 
00381   topic = this->federationParticipant_->create_topic(
00382             TOPICUPDATETOPICNAME,
00383             TOPICUPDATETYPENAME,
00384             TOPIC_QOS_DEFAULT,
00385             DDS::TopicListener::_nil(),
00386             OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00387   dataWriter = publisher->create_datawriter(
00388                  topic.in(),
00389                  writerQos,
00390                  DDS::DataWriterListener::_nil(),
00391                  OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00392 
00393   if (CORBA::is_nil(dataWriter.in())) {
00394     ACE_ERROR((LM_ERROR,
00395                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00396                ACE_TEXT("failed to create TopicUpdate writer for repository %d\n"),
00397                this->id().id()));
00398     throw Incomplete();
00399   }
00400 
00401   this->topicWriter_
00402   = TopicUpdateDataWriter::_narrow(dataWriter.in());
00403 
00404   if (::CORBA::is_nil(this->topicWriter_.in())) {
00405     ACE_ERROR((LM_ERROR,
00406                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00407                ACE_TEXT("failed to extract typed TopicUpdate writer.\n")));
00408     throw Incomplete();
00409 
00410   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00411     OpenDDS::DCPS::DataWriterImpl* servant
00412     = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00413 
00414     if (0 == servant) {
00415       ACE_DEBUG((LM_WARNING,
00416                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00417                  ACE_TEXT("unable to extract typed TopicUpdate writer.\n")));
00418 
00419     } else {
00420       OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00421       ACE_DEBUG((LM_DEBUG,
00422                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00423                  ACE_TEXT("created federation TopicUpdate writer %C for repository %d\n"),
00424                  std::string(converter).c_str(),
00425                  this->id().id()));
00426     }
00427   }
00428 
00429   description = this->federationParticipant_->lookup_topicdescription(TOPICUPDATETOPICNAME);
00430   dataReader  = subscriber->create_datareader(
00431                   description.in(),
00432                   readerQos,
00433                   &this->topicListener_,
00434                   OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00435 
00436   if (CORBA::is_nil(dataReader.in())) {
00437     ACE_ERROR((LM_ERROR,
00438                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00439                ACE_TEXT("failed to create TopicUpdate reader for repository %d\n"),
00440                this->id().id()));
00441     throw Incomplete();
00442 
00443   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00444     OpenDDS::DCPS::DataReaderImpl* servant
00445     = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00446 
00447     if (0 == servant) {
00448       ACE_DEBUG((LM_WARNING,
00449                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00450                  ACE_TEXT("unable to extract typed TopicUpdate reader.\n")));
00451 
00452     } else {
00453       OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00454       ACE_DEBUG((LM_DEBUG,
00455                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00456                  ACE_TEXT("created federation TopicUpdate reader %C for repository %d\n"),
00457                  std::string(converter).c_str(),
00458                  this->id().id()));
00459     }
00460   }
00461 
00462   topic = this->federationParticipant_->create_topic(
00463             PARTICIPANTUPDATETOPICNAME,
00464             PARTICIPANTUPDATETYPENAME,
00465             TOPIC_QOS_DEFAULT,
00466             DDS::TopicListener::_nil(),
00467             OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00468   dataWriter = publisher->create_datawriter(
00469                  topic.in(),
00470                  writerQos,
00471                  DDS::DataWriterListener::_nil(),
00472                  OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00473 
00474   if (CORBA::is_nil(dataWriter.in())) {
00475     ACE_ERROR((LM_ERROR,
00476                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00477                ACE_TEXT("failed to create ParticipantUpdate writer for repository %d\n"),
00478                this->id().id()));
00479     throw Incomplete();
00480   }
00481 
00482   this->participantWriter_
00483   = ParticipantUpdateDataWriter::_narrow(dataWriter.in());
00484 
00485   if (::CORBA::is_nil(this->participantWriter_.in())) {
00486     ACE_ERROR((LM_ERROR,
00487                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00488                ACE_TEXT("failed to extract typed ParticipantUpdate writer.\n")));
00489     throw Incomplete();
00490 
00491   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00492     OpenDDS::DCPS::DataWriterImpl* servant
00493     = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00494 
00495     if (0 == servant) {
00496       ACE_DEBUG((LM_WARNING,
00497                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00498                  ACE_TEXT("unable to extract typed ParticipantUpdate writer.\n")));
00499 
00500     } else {
00501       OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00502       ACE_DEBUG((LM_DEBUG,
00503                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00504                  ACE_TEXT("created federation ParticipantUpdate writer %C for repository %d\n"),
00505                  std::string(converter).c_str(),
00506                  this->id().id()));
00507     }
00508   }
00509 
00510   description = this->federationParticipant_->lookup_topicdescription(PARTICIPANTUPDATETOPICNAME);
00511   dataReader  = subscriber->create_datareader(
00512                   description.in(),
00513                   readerQos,
00514                   &this->participantListener_,
00515                   OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00516 
00517   if (CORBA::is_nil(dataReader.in())) {
00518     ACE_ERROR((LM_ERROR,
00519                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00520                ACE_TEXT("failed to create ParticipantUpdate reader for repository %d\n"),
00521                this->id().id()));
00522     throw Incomplete();
00523 
00524   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00525     OpenDDS::DCPS::DataReaderImpl* servant
00526     = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00527 
00528     if (0 == servant) {
00529       ACE_DEBUG((LM_WARNING,
00530                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00531                  ACE_TEXT("unable to extract typed ParticipantUpdate reader.\n")));
00532 
00533     } else {
00534       OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00535       ACE_DEBUG((LM_DEBUG,
00536                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00537                  ACE_TEXT("created federation ParticipantUpdate reader %C for repository %d\n"),
00538                  std::string(converter).c_str(),
00539                  this->id().id()));
00540     }
00541   }
00542 
00543   topic = this->federationParticipant_->create_topic(
00544             PUBLICATIONUPDATETOPICNAME,
00545             PUBLICATIONUPDATETYPENAME,
00546             TOPIC_QOS_DEFAULT,
00547             DDS::TopicListener::_nil(),
00548             OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00549   dataWriter = publisher->create_datawriter(
00550                  topic.in(),
00551                  writerQos,
00552                  DDS::DataWriterListener::_nil(),
00553                  OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00554 
00555   if (CORBA::is_nil(dataWriter.in())) {
00556     ACE_ERROR((LM_ERROR,
00557                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00558                ACE_TEXT("failed to create PublicationUpdate writer for repository %d\n"),
00559                this->id().id()));
00560     throw Incomplete();
00561   }
00562 
00563   this->publicationWriter_
00564   = PublicationUpdateDataWriter::_narrow(dataWriter.in());
00565 
00566   if (::CORBA::is_nil(this->publicationWriter_.in())) {
00567     ACE_ERROR((LM_ERROR,
00568                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00569                ACE_TEXT("failed to extract typed PublicationUpdate writer.\n")));
00570     throw Incomplete();
00571 
00572   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00573     OpenDDS::DCPS::DataWriterImpl* servant
00574     = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00575 
00576     if (0 == servant) {
00577       ACE_DEBUG((LM_WARNING,
00578                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00579                  ACE_TEXT("unable to extract typed PublicationUpdate writer.\n")));
00580 
00581     } else {
00582       OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00583       ACE_DEBUG((LM_DEBUG,
00584                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00585                  ACE_TEXT("created federation PublicationUpdate writer %C for repository %d\n"),
00586                  std::string(converter).c_str(),
00587                  this->id().id()));
00588     }
00589   }
00590 
00591   description = this->federationParticipant_->lookup_topicdescription(PUBLICATIONUPDATETOPICNAME);
00592   dataReader  = subscriber->create_datareader(
00593                   description.in(),
00594                   readerQos,
00595                   &this->publicationListener_,
00596                   OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00597 
00598   if (CORBA::is_nil(dataReader.in())) {
00599     ACE_ERROR((LM_ERROR,
00600                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00601                ACE_TEXT("failed to create PublicationUpdate reader for repository %d\n"),
00602                this->id().id()));
00603     throw Incomplete();
00604 
00605   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00606     OpenDDS::DCPS::DataReaderImpl* servant
00607     = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00608 
00609     if (0 == servant) {
00610       ACE_DEBUG((LM_WARNING,
00611                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00612                  ACE_TEXT("unable to extract typed PublicationUpdate reader.\n")));
00613 
00614     } else {
00615       OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00616       ACE_DEBUG((LM_DEBUG,
00617                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00618                  ACE_TEXT("created federation PublicationUpdate reader %C for repository %d\n"),
00619                  std::string(converter).c_str(),
00620                  this->id().id()));
00621     }
00622   }
00623 
00624   topic = this->federationParticipant_->create_topic(
00625             SUBSCRIPTIONUPDATETOPICNAME,
00626             SUBSCRIPTIONUPDATETYPENAME,
00627             TOPIC_QOS_DEFAULT,
00628             DDS::TopicListener::_nil(),
00629             OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00630   dataWriter = publisher->create_datawriter(
00631                  topic.in(),
00632                  writerQos,
00633                  DDS::DataWriterListener::_nil(),
00634                  OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00635 
00636   if (CORBA::is_nil(dataWriter.in())) {
00637     ACE_ERROR((LM_ERROR,
00638                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00639                ACE_TEXT("failed to create SubscriptionUpdate writer for repository %d\n"),
00640                this->id().id()));
00641     throw Incomplete();
00642   }
00643 
00644   this->subscriptionWriter_
00645   = SubscriptionUpdateDataWriter::_narrow(dataWriter.in());
00646 
00647   if (::CORBA::is_nil(this->subscriptionWriter_.in())) {
00648     ACE_ERROR((LM_ERROR,
00649                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00650                ACE_TEXT("failed to extract typed SubscriptionUpdate writer.\n")));
00651     throw Incomplete();
00652 
00653   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00654     OpenDDS::DCPS::DataWriterImpl* servant
00655     = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00656 
00657     if (0 == servant) {
00658       ACE_DEBUG((LM_WARNING,
00659                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00660                  ACE_TEXT("unable to extract typed SubscriptionUpdate writer.\n")));
00661 
00662     } else {
00663       OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00664       ACE_DEBUG((LM_DEBUG,
00665                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00666                  ACE_TEXT("created federation SubscriptionUpdate writer %C for repository %d\n"),
00667                  std::string(converter).c_str(),
00668                  this->id().id()));
00669     }
00670   }
00671 
00672   description = this->federationParticipant_->lookup_topicdescription(SUBSCRIPTIONUPDATETOPICNAME);
00673   dataReader  = subscriber->create_datareader(
00674                   description.in(),
00675                   readerQos,
00676                   &this->subscriptionListener_,
00677                   OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00678 
00679   if (CORBA::is_nil(dataReader.in())) {
00680     ACE_ERROR((LM_ERROR,
00681                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00682                ACE_TEXT("failed to create SubscriptionUpdate reader for repository %d\n"),
00683                this->id().id()));
00684     throw Incomplete();
00685 
00686   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00687     OpenDDS::DCPS::DataReaderImpl* servant
00688     = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00689 
00690     if (0 == servant) {
00691       ACE_DEBUG((LM_WARNING,
00692                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00693                  ACE_TEXT("unable to extract typed SubscriptionUpdate reader.\n")));
00694 
00695     } else {
00696       OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00697       ACE_DEBUG((LM_DEBUG,
00698                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00699                  ACE_TEXT("created federation SubscriptionUpdate reader %C for repository %d\n"),
00700                  std::string(converter).c_str(),
00701                  this->id().id()));
00702     }
00703   }
00704 
00705   // JSP
00706 #if defined (ACE_HAS_IP_MULTICAST)
00707 
00708   if (this->multicastEnabled_) {
00709     //
00710     // Install ior multicast handler.
00711     //
00712     // Get reactor instance from TAO.
00713     ACE_Reactor *reactor = this->orb_->orb_core()->reactor();
00714 
00715     // See if the -ORBMulticastDiscoveryEndpoint option was specified.
00716     ACE_CString mde(this->orb_->orb_core()->orb_params()->mcast_discovery_endpoint());
00717 
00718     // First, see if the user has given us a multicast port number
00719     // on the command-line;
00720     u_short port = 0;
00721 
00722     // Check environment var. for multicast port.
00723     const char *port_number = ACE_OS::getenv("OpenDDSFederationPort");
00724 
00725     if (port_number != 0) {
00726       port = static_cast<u_short>(ACE_OS::atoi(port_number));
00727     }
00728 
00729     // Port wasn't specified on the command-line -
00730     // use the default.
00731     if (port == 0)
00732       port = OpenDDS::Federator::Defaults::DiscoveryRequestPort;
00733 
00734     // Initialize the handler
00735     if (mde.length() != 0) {
00736       if (this->multicastResponder_.init(
00737             this->orb_.in(),
00738             mde.c_str()) == -1) {
00739         ACE_ERROR((LM_ERROR,
00740                    ACE_TEXT("(%P|%t) ERROR: Unable to initialize ")
00741                    ACE_TEXT("the multicast responder for repository %d.\n"),
00742                    this->id().id()));
00743         throw Incomplete();
00744       }
00745 
00746     } else {
00747       if (this->multicastResponder_.init(
00748             this->orb_.in(),
00749             port,
00750 #if defined (ACE_HAS_IPV6)
00751             ACE_DEFAULT_MULTICASTV6_ADDR
00752 #else
00753             ACE_DEFAULT_MULTICAST_ADDR
00754 #endif /* ACE_HAS_IPV6 */
00755           )) {
00756         ACE_ERROR((LM_ERROR,
00757                    ACE_TEXT("(%P|%t) ERROR: Unable to initialize ")
00758                    ACE_TEXT("the multicast responder for repository %d.\n"),
00759                    this->id().id()));
00760         throw Incomplete();
00761       }
00762     }
00763 
00764     // Register event handler for the ior multicast.
00765     if (reactor->register_handler(&this->multicastResponder_,
00766                                   ACE_Event_Handler::READ_MASK) == -1) {
00767       ACE_ERROR((LM_ERROR,
00768                  ACE_TEXT("(%P|%t) ERROR: Unable to register event handler ")
00769                  ACE_TEXT("for repository %d.\n"),
00770                  this->id().id()));
00771       throw Incomplete();
00772     }
00773 
00774     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00775       ACE_DEBUG((LM_DEBUG,
00776                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00777                  ACE_TEXT("multicast server setup is complete.\n")));
00778     }
00779   }
00780 
00781 #else
00782   ACE_UNUSED_ARG(this->multicastEnabled_);
00783 #endif /* ACE_HAS_IP_MULTICAST */
00784 }

void OpenDDS::Federator::ManagerImpl::initializeOwner ( const OpenDDS::Federator::OwnerUpdate data  )  [virtual]

Definition at line 1127 of file FederatorManagerImpl.cpp.

References processCreate().

01129 {
01130   this->processCreate(&data, 0);
01131 }

void OpenDDS::Federator::ManagerImpl::initializeParticipant ( const OpenDDS::Federator::ParticipantUpdate data  )  [virtual]

Definition at line 1141 of file FederatorManagerImpl.cpp.

References processCreate().

01143 {
01144   this->processCreate(&data, 0);
01145 }

void OpenDDS::Federator::ManagerImpl::initializePublication ( const OpenDDS::Federator::PublicationUpdate data  )  [virtual]

Definition at line 1148 of file FederatorManagerImpl.cpp.

References processCreate().

01150 {
01151   this->processCreate(&data, 0);
01152 }

void OpenDDS::Federator::ManagerImpl::initializeSubscription ( const OpenDDS::Federator::SubscriptionUpdate data  )  [virtual]

Definition at line 1155 of file FederatorManagerImpl.cpp.

References processCreate().

01157 {
01158   this->processCreate(&data, 0);
01159 }

void OpenDDS::Federator::ManagerImpl::initializeTopic ( const OpenDDS::Federator::TopicUpdate data  )  [virtual]

Definition at line 1134 of file FederatorManagerImpl.cpp.

References processCreate().

01136 {
01137   this->processCreate(&data, 0);
01138 }

Manager_ptr OpenDDS::Federator::ManagerImpl::join_federation ( Manager_ptr  peer,
FederationDomain  federation 
) [virtual]

Definition at line 915 of file FederatorManagerImpl.cpp.

References config_, OpenDDS::DCPS::DCPS_debug_level, federated_, OpenDDS::Federator::Config::federationDomain(), initialize(), joiner_, joining_, joinRepo_, OpenDDS::Federator::NIL_REPOSITORY, orb(), peers_, pushState(), and TheServiceParticipant.

00919 {
00920   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00921     ACE_DEBUG((LM_DEBUG,
00922                ACE_TEXT("(%P|%t) ManagerImpl::join_federation( peer, %d)\n"),
00923                federation));
00924   }
00925 
00926   RepoKey remote = NIL_REPOSITORY;
00927 
00928   try {
00929     // Obtain the remote repository federator Id value.
00930     remote = peer->federation_id();
00931 
00932     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00933       ACE_DEBUG((LM_DEBUG,
00934                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
00935                  ACE_TEXT("repo id %d entered from repository with id %d.\n"),
00936                  this->id().id(),
00937                  remote));
00938     }
00939 
00940   } catch (const CORBA::Exception& ex) {
00941     ex._tao_print_exception(
00942       ACE_TEXT("ERROR: Federator::ManagerImpl::join_federation() - ")
00943       ACE_TEXT("unable to obtain remote federation Id value: "));
00944     throw Incomplete();
00945   }
00946 
00947   // If we are recursing, then we are done.
00948   if (this->joiner_ == remote) {
00949     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00950       ACE_DEBUG((LM_DEBUG,
00951                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
00952                  ACE_TEXT("repo id %d leaving after reentry from repository with id %d.\n"),
00953                  this->id().id(),
00954                  remote));
00955     }
00956 
00957     return this->_this();
00958 
00959   } else {
00960     // Block while any different repository is joining.
00961     ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
00962 
00963     while (this->joiner_ != NIL_REPOSITORY) {
00964       // This releases the lock while we block.
00965       this->joining_.wait();
00966 
00967       // We are now recursing - curses!
00968       if (this->joiner_ == remote) {
00969         return this->_this();
00970       }
00971     }
00972 
00973     // Note that we are joining the remote repository now.
00974     this->joiner_ = remote;
00975   }
00976 
00977   //
00978   // We only reach this point if:
00979   //   1) No other repository is processing past this point;
00980   //   2) We are not recursing.
00981   //
00982 
00983   // Check if we already have Federation repository.
00984   // Check if we are already federated.
00985   if (this->federated_ == false) {
00986     // Go ahead and add the joining repository as our Federation
00987     // repository.
00988     try {
00989       // Mark this repository as the point to which we are joined to
00990       // the federation.
00991       this->joinRepo_ = remote;
00992 
00993       // Obtain a reference to the remote repository.
00994       OpenDDS::DCPS::DCPSInfo_var remoteRepo = peer->repository();
00995 
00996       CORBA::ORB_var orb = remoteRepo->_get_orb();
00997       CORBA::String_var remoteRepoIor = orb->object_to_string(remoteRepo.in());
00998       if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00999         ACE_DEBUG((LM_DEBUG,
01000                    ACE_TEXT("(%P|%t) FederatorManagerImpl::join_federation() - ")
01001                    ACE_TEXT("id %d obtained reference to id %d:\n")
01002                    ACE_TEXT("\t%C\n"),
01003                    this->id().id(),
01004                    remote,
01005                    remoteRepoIor.in()));
01006       }
01007 
01008       // Add remote repository to Service_Participant in the Federation domain
01009       std::ostringstream oss;
01010       oss << remote;
01011       std::string key_string = oss.str();
01012       TheServiceParticipant->set_repo_ior(remoteRepoIor.in(), key_string);
01013       TheServiceParticipant->set_repo_domain(this->config_.federationDomain(), key_string);
01014 
01015     } catch (const CORBA::Exception& ex) {
01016       ex._tao_print_exception(
01017         "ERROR: Federator::ManagerImpl::join_federation() - Unable to join with remote: ");
01018       throw Incomplete();
01019     }
01020   }
01021 
01022   // Symmetrical joining behavior.
01023   try {
01024     Manager_var remoteManager
01025     = peer->join_federation(this->_this(), this->config_.federationDomain());
01026 
01027     if (this->joinRepo_ == remote) {
01028       this->peers_[ this->joinRepo_]
01029       = OpenDDS::Federator::Manager::_duplicate(remoteManager.in());
01030     }
01031 
01032     //
01033     // Push our initial state out to the joining repository *after* we call
01034     // him back to join.  This reduces the amount of duplicate data pushed
01035     // when a new (empty) repository is joining an existing federation.
01036     //
01037     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01038       ACE_DEBUG((LM_DEBUG,
01039                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
01040                  ACE_TEXT("repo id %d pushing state to repository with id %d.\n"),
01041                  this->id().id(),
01042                  remote));
01043     }
01044 
01045     this->pushState(peer);
01046 
01047   } catch (const CORBA::Exception& ex) {
01048     ex._tao_print_exception(
01049       "ERROR: Federator::ManagerImpl::join_federation() - unsuccessful call to remote->join: ");
01050     throw Incomplete();
01051   }
01052 
01053   if (CORBA::is_nil(this->participantWriter_.in())) {
01054     //
01055     // Establish our update publications and subscriptions *after* we
01056     // have exchanged internal state with the first joining repository.
01057     //
01058     this->initialize();
01059   }
01060 
01061   // Adjust our joining state and give others the opportunity to proceed.
01062   if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01063     ACE_DEBUG((LM_DEBUG,
01064                ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
01065                ACE_TEXT("repo id %d joined to repository with id %d.\n"),
01066                this->id().id(),
01067                remote));
01068   }
01069 
01070   this->federated_ = true;
01071   this->joiner_    = NIL_REPOSITORY;
01072   this->joining_.signal();
01073   return this->_this();
01074 }

void OpenDDS::Federator::ManagerImpl::leave_and_shutdown (  )  [virtual]

Definition at line 1107 of file FederatorManagerImpl.cpp.

References info_, and TAO_DDS_DCPSInfo_i::shutdown().

01109 {
01110   // Shutdown the process via the repository object.
01111   this->info_->shutdown();
01112 }

void OpenDDS::Federator::ManagerImpl::leave_federation ( RepoKey  id  )  [virtual]

Definition at line 1077 of file FederatorManagerImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level, and peers_.

01079 {
01080   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01081     ACE_DEBUG((LM_DEBUG,
01082                ACE_TEXT("(%P|%t) ManagerImpl::leave_federation( %d)\n"),
01083                this->id().id()));
01084   }
01085 
01086   // Remove the leaving repository from our outbound mappings.
01087   IdToManagerMap::iterator where = this->peers_.find(id);
01088 
01089   if (where != this->peers_.end()) {
01090     this->peers_.erase(where);
01091   }
01092 
01093   // Remove all the internal Entities owned by the leaving repository.
01094   if (false
01095       == this->info_->remove_by_owner(this->config_.federationDomain(), id)) {
01096     throw Incomplete();
01097   }
01098 
01099   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01100     ACE_DEBUG((LM_DEBUG,
01101                ACE_TEXT("(%P|%t) ManagerImpl::leave_federation( %d) complete.\n"),
01102                this->id().id()));
01103   }
01104 }

ACE_INLINE void OpenDDS::Federator::ManagerImpl::localRepo ( ::OpenDDS::DCPS::DCPSInfo_ptr  repo  ) 

Capture a remote callable reference to the DCPSInfo.

Definition at line 40 of file FederatorManagerImpl.inl.

References localRepo_.

Referenced by InfoRepo::init().

00041 {
00042   this->localRepo_ = OpenDDS::DCPS::DCPSInfo::_duplicate(repo);
00043 }

ACE_INLINE void OpenDDS::Federator::ManagerImpl::orb ( CORBA::ORB_ptr  value  ) 

Definition at line 54 of file FederatorManagerImpl.inl.

References orb_.

00055 {
00056   this->orb_ = CORBA::ORB::_duplicate(value);
00057 }

ACE_INLINE CORBA::ORB_ptr OpenDDS::Federator::ManagerImpl::orb (  ) 

Accessors for the ORB.

Definition at line 47 of file FederatorManagerImpl.inl.

References orb_.

Referenced by InfoRepo::init(), join_federation(), and pushState().

00048 {
00049   return this->orb_.ptr();
00050 }

void OpenDDS::Federator::ManagerImpl::processCreate ( const TopicUpdate sample,
const DDS::SampleInfo info 
)

Create a proxy for a new topic.

Definition at line 729 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::Federator::TopicUpdate::datatype, OpenDDS::DCPS::DCPS_debug_level, deferredTopics_, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::TopicUpdate::id, OpenDDS::Federator::TopicUpdate::participant, processDeferred(), OpenDDS::Federator::TopicUpdate::qos, and OpenDDS::Federator::TopicUpdate::topic.

00730 {
00731   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00732     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00733     OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
00734     ACE_DEBUG((LM_DEBUG,
00735                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( TopicUpdate): ")
00736                ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00737                this->id().id(),
00738                sample->domain,
00739                std::string(part_converter).c_str(),
00740                std::string(topic_converter).c_str()));
00741   }
00742 
00743   if (false == this->info_->add_topic(sample->id,
00744                                       sample->domain,
00745                                       sample->participant,
00746                                       sample->topic,
00747                                       sample->datatype,
00748                                       sample->qos)) {
00749     {
00750       ACE_GUARD(ACE_Thread_Mutex,
00751                 guard,
00752                 this->deferred_lock_);
00753       this->deferredTopics_.push_back(*sample);
00754     }
00755 
00756     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00757       ACE_DEBUG((LM_DEBUG,
00758                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( TopicUpdate): ")
00759                  ACE_TEXT("deferred update.\n")));
00760     }
00761   }
00762 
00763   this->processDeferred();
00764 }

void OpenDDS::Federator::ManagerImpl::processCreate ( const ParticipantUpdate sample,
const DDS::SampleInfo info 
)

Create a proxy for a new participant.

Definition at line 697 of file FederatorManagerImpl_updates.cpp.

References TAO_DDS_DCPSInfo_i::add_domain_participant(), TAO_DDS_DCPSInfo_i::changeOwnership(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::ParticipantUpdate::id, info_, OpenDDS::Federator::ParticipantUpdate::owner, processDeferred(), OpenDDS::Federator::ParticipantUpdate::qos, and OpenDDS::Federator::ParticipantUpdate::sender.

00698 {
00699   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00700     OpenDDS::DCPS::RepoIdConverter converter(sample->id);
00701     ACE_DEBUG((LM_DEBUG,
00702                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( ParticipantUpdate): ")
00703                ACE_TEXT("repo %d - [ domain %d/ participant %C/ owner %d ]\n"),
00704                this->id().id(),
00705                sample->domain,
00706                std::string(converter).c_str(),
00707                sample->owner));
00708   }
00709 
00710   this->info_->add_domain_participant(
00711     sample->domain,
00712     sample->id,
00713     sample->qos);
00714   bool ownershipChanged = this->info_->changeOwnership(
00715     sample->domain,
00716     sample->id,
00717     sample->sender,
00718     sample->owner);
00719   if (!ownershipChanged) {
00720     ACE_ERROR((LM_ERROR,
00721                ACE_TEXT("(%P|%t) ERROR: ")
00722                ACE_TEXT("OpenDDS::Federator::ManagerImpl::processCreate(), ")
00723                ACE_TEXT("Could not change ownership\n")));
00724   }
00725   this->processDeferred();
00726 }

void OpenDDS::Federator::ManagerImpl::processCreate ( const SubscriptionUpdate sample,
const DDS::SampleInfo info 
)

Create a proxy for a new subscription.

Definition at line 653 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::Federator::SubscriptionUpdate::callback, OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::DCPS::DCPS_debug_level, deferredSubscriptions_, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::expression_params, OpenDDS::Federator::SubscriptionUpdate::filter_class_name, OpenDDS::Federator::SubscriptionUpdate::filter_expression, OpenDDS::Federator::SubscriptionUpdate::id, OpenDDS::Federator::SubscriptionUpdate::participant, processDeferred(), OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, OpenDDS::Federator::SubscriptionUpdate::topic, and OpenDDS::Federator::SubscriptionUpdate::transport_info.

00654 {
00655   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00656     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00657     OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
00658     ACE_DEBUG((LM_DEBUG,
00659                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( SubscriptionUpdate): ")
00660                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00661                this->id().id(),
00662                sample->domain,
00663                std::string(part_converter).c_str(),
00664                std::string(sub_converter).c_str()));
00665   }
00666 
00667   if (false == this->info_->add_subscription(sample->domain,
00668                                              sample->participant,
00669                                              sample->topic,
00670                                              sample->id,
00671                                              sample->callback,
00672                                              sample->datareader_qos,
00673                                              sample->transport_info,
00674                                              sample->subscriber_qos,
00675                                              sample->filter_class_name,
00676                                              sample->filter_expression,
00677                                              sample->expression_params,
00678                                              true)) {
00679     {
00680       ACE_GUARD(ACE_Thread_Mutex,
00681                 guard,
00682                 this->deferred_lock_);
00683       this->deferredSubscriptions_.push_back(*sample);
00684     }
00685 
00686     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00687       ACE_DEBUG((LM_DEBUG,
00688                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( SubscriptionUpdate): ")
00689                  ACE_TEXT("deferred update.\n")));
00690     }
00691   }
00692 
00693   this->processDeferred();
00694 }

void OpenDDS::Federator::ManagerImpl::processCreate ( const PublicationUpdate sample,
const DDS::SampleInfo info 
)

Create a proxy for a new publication.

Definition at line 612 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::Federator::PublicationUpdate::callback, OpenDDS::Federator::PublicationUpdate::datawriter_qos, OpenDDS::DCPS::DCPS_debug_level, deferredPublications_, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::PublicationUpdate::id, OpenDDS::Federator::PublicationUpdate::participant, processDeferred(), OpenDDS::Federator::PublicationUpdate::publisher_qos, OpenDDS::Federator::PublicationUpdate::topic, and OpenDDS::Federator::PublicationUpdate::transport_info.

00613 {
00614   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00615     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00616     OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
00617     ACE_DEBUG((LM_DEBUG,
00618                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( PublicationUpdate): ")
00619                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00620                this->id().id(),
00621                sample->domain,
00622                std::string(part_converter).c_str(),
00623                std::string(pub_converter).c_str()));
00624   }
00625 
00626   if (false == this->info_->add_publication(sample->domain,
00627                                             sample->participant,
00628                                             sample->topic,
00629                                             sample->id,
00630                                             sample->callback,
00631                                             sample->datawriter_qos,
00632                                             sample->transport_info,
00633                                             sample->publisher_qos,
00634                                             true)) {
00635     {
00636       ACE_GUARD(ACE_Thread_Mutex,
00637                 guard,
00638                 this->deferred_lock_);
00639       this->deferredPublications_.push_back(*sample);
00640     }
00641 
00642     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00643       ACE_DEBUG((LM_DEBUG,
00644                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( PublicationUpdate): ")
00645                  ACE_TEXT("deferred update.\n")));
00646     }
00647   }
00648 
00649   this->processDeferred();
00650 }

void OpenDDS::Federator::ManagerImpl::processCreate ( const OwnerUpdate sample,
const DDS::SampleInfo info 
)

Null implementation for OwnerUpdate samples.

Definition at line 575 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::DCPS::DCPS_debug_level, deferredOwnerships_, OpenDDS::Federator::OwnerUpdate::domain, OpenDDS::Federator::OwnerUpdate::owner, OpenDDS::Federator::OwnerUpdate::participant, processDeferred(), and OpenDDS::Federator::OwnerUpdate::sender.

Referenced by initializeOwner(), initializeParticipant(), initializePublication(), initializeSubscription(), and initializeTopic().

00576 {
00577   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00578     OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
00579     ACE_DEBUG((LM_DEBUG,
00580                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( OwnerUpdate): ")
00581                ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00582                this->id().id(),
00583                sample->domain,
00584                std::string(converter).c_str(),
00585                sample->sender,
00586                sample->owner));
00587   }
00588 
00589   // We could generate an error message here.  Instead we let action be irrelevant.
00590   if (false == this->info_->changeOwnership(sample->domain,
00591                                             sample->participant,
00592                                             sample->sender,
00593                                             sample->owner)) {
00594     {
00595       ACE_GUARD(ACE_Thread_Mutex,
00596                 guard,
00597                 this->deferred_lock_);
00598       this->deferredOwnerships_.push_back(*sample);
00599     }
00600 
00601     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00602       ACE_DEBUG((LM_DEBUG,
00603                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( OwnerUpdate): ")
00604                  ACE_TEXT("deferred update.\n")));
00605     }
00606   }
00607 
00608   this->processDeferred();
00609 }

void OpenDDS::Federator::ManagerImpl::processDeferred (  ) 

Handle any deferred updates that might have become processable.

Definition at line 767 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::DCPS::DCPS_debug_level, deferredOwnerships_, deferredPublications_, deferredSubscriptions_, and deferredTopics_.

Referenced by processCreate().

00768 {
00769   ACE_GUARD(ACE_Thread_Mutex,
00770             guard,
00771             this->deferred_lock_);
00772 
00773   {
00774     std::list<OwnerUpdate>::iterator current = this->deferredOwnerships_.begin();
00775 
00776     while (current != this->deferredOwnerships_.end()) {
00777       if (this->info_->changeOwnership(current->domain,
00778                                        current->participant,
00779                                        current->sender,
00780                                        current->owner)) {
00781         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00782           OpenDDS::DCPS::RepoIdConverter converter(current->participant);
00783           ACE_DEBUG((LM_DEBUG,
00784                      ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( OwnerUpdate): ")
00785                      ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00786                      this->id().id(),
00787                      current->domain,
00788                      std::string(converter).c_str(),
00789                      current->sender,
00790                      current->owner));
00791         }
00792 
00793         current = this->deferredOwnerships_.erase(current);
00794 
00795       } else {
00796         ++ current;
00797       }
00798     }
00799   }
00800 
00801   {
00802     std::list<TopicUpdate>::iterator current = this->deferredTopics_.begin();
00803 
00804     while (current != this->deferredTopics_.end()) {
00805       if (true == this->info_->add_topic(current->id,
00806                                          current->domain,
00807                                          current->participant,
00808                                          current->topic,
00809                                          current->datatype,
00810                                          current->qos)) {
00811         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00812           OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
00813           OpenDDS::DCPS::RepoIdConverter topic_converter(current->id);
00814           ACE_DEBUG((LM_DEBUG,
00815                      ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( TopicUpdate): ")
00816                      ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00817                      this->id().id(),
00818                      current->domain,
00819                      std::string(part_converter).c_str(),
00820                      std::string(topic_converter).c_str()));
00821         }
00822 
00823         current = this->deferredTopics_.erase(current);
00824 
00825       } else {
00826         ++ current;
00827       }
00828     }
00829   }
00830 
00831   {
00832     std::list<PublicationUpdate>::iterator current = this->deferredPublications_.begin();
00833 
00834     while (current != this->deferredPublications_.end()) {
00835 
00836       if (true == this->info_->add_publication(current->domain,
00837                                                current->participant,
00838                                                current->topic,
00839                                                current->id,
00840                                                current->callback,
00841                                                current->datawriter_qos,
00842                                                current->transport_info,
00843                                                current->publisher_qos,
00844                                                true)) {
00845         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00846           OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
00847           OpenDDS::DCPS::RepoIdConverter pub_converter(current->id);
00848           ACE_DEBUG((LM_DEBUG,
00849                      ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( PublicationUpdate): ")
00850                      ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00851                      this->id().id(),
00852                      current->domain,
00853                      std::string(part_converter).c_str(),
00854                      std::string(pub_converter).c_str()));
00855         }
00856 
00857         current = this->deferredPublications_.erase(current);
00858 
00859       } else {
00860         ++ current;
00861       }
00862     }
00863   }
00864 
00865   {
00866     std::list<SubscriptionUpdate>::iterator current = this->deferredSubscriptions_.begin();
00867 
00868     while (current != this->deferredSubscriptions_.end()) {
00869 
00870       if (true == this->info_->add_subscription(current->domain,
00871                                                 current->participant,
00872                                                 current->topic,
00873                                                 current->id,
00874                                                 current->callback,
00875                                                 current->datareader_qos,
00876                                                 current->transport_info,
00877                                                 current->subscriber_qos,
00878                                                 current->filter_class_name,
00879                                                 current->filter_expression,
00880                                                 current->expression_params,
00881                                                 true)) {
00882         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00883           OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
00884           OpenDDS::DCPS::RepoIdConverter sub_converter(current->id);
00885           ACE_DEBUG((LM_DEBUG,
00886                      ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( SubscriptionUpdate): ")
00887                      ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00888                      this->id().id(),
00889                      current->domain,
00890                      std::string(part_converter).c_str(),
00891                      std::string(sub_converter).c_str()));
00892         }
00893 
00894         current = this->deferredSubscriptions_.erase(current);
00895 
00896       } else {
00897         ++ current;
00898       }
00899     }
00900   }
00901 
00902 }

void OpenDDS::Federator::ManagerImpl::processDelete ( const TopicUpdate sample,
const DDS::SampleInfo info 
)

Delete a proxy for a topic.

Definition at line 1206 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::TopicUpdate::id, info_, OpenDDS::Federator::TopicUpdate::participant, and TAO_DDS_DCPSInfo_i::remove_topic().

01207 {
01208   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01209     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01210     OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
01211     ACE_DEBUG((LM_DEBUG,
01212                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
01213                ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
01214                this->id().id(),
01215                sample->domain,
01216                std::string(part_converter).c_str(),
01217                std::string(topic_converter).c_str()));
01218   }
01219 
01220   try {
01221     this->info_->remove_topic(
01222       sample->domain,
01223       sample->participant,
01224       sample->id);
01225 
01226   } catch (OpenDDS::DCPS::Invalid_Participant&) {
01227     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01228       ACE_DEBUG((LM_DEBUG,
01229                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
01230                  ACE_TEXT("the participant was already removed.\n")));
01231     }
01232   }
01233 }

void OpenDDS::Federator::ManagerImpl::processDelete ( const ParticipantUpdate sample,
const DDS::SampleInfo info 
)

Delete a proxy for a participant.

Definition at line 1181 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::ParticipantUpdate::id, info_, and TAO_DDS_DCPSInfo_i::remove_domain_participant().

01182 {
01183   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01184     OpenDDS::DCPS::RepoIdConverter converter(sample->id);
01185     ACE_DEBUG((LM_DEBUG,
01186                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( ParticipantUpdate): ")
01187                ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
01188                this->id().id(),
01189                sample->domain,
01190                std::string(converter).c_str()));
01191   }
01192   try {
01193     this->info_->remove_domain_participant(
01194       sample->domain,
01195       sample->id);
01196   } catch (OpenDDS::DCPS::Invalid_Participant&) {
01197     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01198       ACE_DEBUG((LM_DEBUG,
01199                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( ParticipantUpdate): ")
01200                  ACE_TEXT("the participant was already removed.\n")));
01201     }
01202   }
01203 }

void OpenDDS::Federator::ManagerImpl::processDelete ( const SubscriptionUpdate sample,
const DDS::SampleInfo info 
)

Delete a proxy for a subscription.

Definition at line 1151 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::id, info_, OpenDDS::Federator::SubscriptionUpdate::participant, and TAO_DDS_DCPSInfo_i::remove_subscription().

01152 {
01153   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01154     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01155     OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
01156     ACE_DEBUG((LM_DEBUG,
01157                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( SubscriptionUpdate): ")
01158                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
01159                this->id().id(),
01160                sample->domain,
01161                std::string(part_converter).c_str(),
01162                std::string(sub_converter).c_str()));
01163   }
01164 
01165   try {
01166     this->info_->remove_subscription(
01167       sample->domain,
01168       sample->participant,
01169       sample->id);
01170 
01171   } catch (OpenDDS::DCPS::Invalid_Participant&) {
01172     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01173       ACE_DEBUG((LM_DEBUG,
01174                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( SubscriptionUpdate): ")
01175                  ACE_TEXT("the participant was already removed.\n")));
01176     }
01177   }
01178 }

void OpenDDS::Federator::ManagerImpl::processDelete ( const PublicationUpdate sample,
const DDS::SampleInfo info 
)

Delete a proxy for a publication.

Definition at line 1121 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::PublicationUpdate::id, info_, OpenDDS::Federator::PublicationUpdate::participant, and TAO_DDS_DCPSInfo_i::remove_publication().

01122 {
01123   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01124     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01125     OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
01126     ACE_DEBUG((LM_DEBUG,
01127                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( PublicationUpdate): ")
01128                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
01129                this->id().id(),
01130                sample->domain,
01131                std::string(part_converter).c_str(),
01132                std::string(pub_converter).c_str()));
01133   }
01134 
01135   try {
01136     this->info_->remove_publication(
01137       sample->domain,
01138       sample->participant,
01139       sample->id);
01140 
01141   } catch (OpenDDS::DCPS::Invalid_Participant&) {
01142     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01143       ACE_DEBUG((LM_DEBUG,
01144                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( PublicationUpdate): ")
01145                  ACE_TEXT("the participant was already removed.\n")));
01146     }
01147   }
01148 }

void OpenDDS::Federator::ManagerImpl::processDelete ( const OwnerUpdate sample,
const DDS::SampleInfo info 
)

Null implementation for OwnerUpdate samples.

Definition at line 1089 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::DCPS::DCPS_debug_level, deferredOwnerships_, OpenDDS::Federator::OwnerUpdate::domain, OpenDDS::Federator::OwnerUpdate::owner, OpenDDS::Federator::OwnerUpdate::participant, and OpenDDS::Federator::OwnerUpdate::sender.

01090 {
01091   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01092     OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
01093     ACE_DEBUG((LM_DEBUG,
01094                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( OwnerUpdate): ")
01095                ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
01096                this->id().id(),
01097                sample->domain,
01098                std::string(converter).c_str(),
01099                sample->sender,
01100                sample->owner));
01101   }
01102 
01103   // We could generate an error message here.  Instead we let action be irrelevant.
01104   if (false == this->info_->changeOwnership(sample->domain,
01105                                             sample->participant,
01106                                             sample->sender,
01107                                             sample->owner)) {
01108     {
01109       ACE_GUARD(ACE_Thread_Mutex,
01110                 guard,
01111                 this->deferred_lock_);
01112       this->deferredOwnerships_.push_back(*sample);
01113     }
01114     ACE_DEBUG((LM_DEBUG,
01115                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( OwnerUpdate): ")
01116                ACE_TEXT("deferred update.\n")));
01117   }
01118 }

void OpenDDS::Federator::ManagerImpl::processUpdateFilterExpressionParams ( const SubscriptionUpdate sample,
const DDS::SampleInfo info 
)

Update the proxy filter expression params for a subscription.

Definition at line 1025 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::expression_params, OpenDDS::Federator::SubscriptionUpdate::id, info_, OpenDDS::Federator::SubscriptionUpdate::participant, and TAO_DDS_DCPSInfo_i::update_subscription_params().

01027 {
01028   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01029     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01030     OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
01031     ACE_DEBUG((LM_DEBUG,
01032                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateFilterExpressionParams(SubscriptionUpdate): ")
01033                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
01034                this->id().id(),
01035                sample->domain,
01036                std::string(part_converter).c_str(),
01037                std::string(sub_converter).c_str()));
01038   }
01039 
01040   this->info_->update_subscription_params(
01041     sample->domain,
01042     sample->participant,
01043     sample->id,
01044     sample->expression_params);
01045 }

void OpenDDS::Federator::ManagerImpl::processUpdateQos1 ( const TopicUpdate sample,
const DDS::SampleInfo info 
)

Update the proxy TopicQos for a topic.

Definition at line 1067 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::TopicUpdate::id, info_, OpenDDS::Federator::TopicUpdate::participant, OpenDDS::Federator::TopicUpdate::qos, and TAO_DDS_DCPSInfo_i::update_topic_qos().

01068 {
01069   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01070     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01071     OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
01072     ACE_DEBUG((LM_DEBUG,
01073                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( TopicUpdate): ")
01074                ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
01075                this->id().id(),
01076                sample->domain,
01077                std::string(part_converter).c_str(),
01078                std::string(topic_converter).c_str()));
01079   }
01080 
01081   this->info_->update_topic_qos(
01082     sample->id,
01083     sample->domain,
01084     sample->participant,
01085     sample->qos);
01086 }

void OpenDDS::Federator::ManagerImpl::processUpdateQos1 ( const ParticipantUpdate sample,
const DDS::SampleInfo info 
)

Update the proxy ParticipantQos for a participant.

Definition at line 1048 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::ParticipantUpdate::id, info_, OpenDDS::Federator::ParticipantUpdate::qos, and TAO_DDS_DCPSInfo_i::update_domain_participant_qos().

01049 {
01050   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01051     OpenDDS::DCPS::RepoIdConverter converter(sample->id);
01052     ACE_DEBUG((LM_DEBUG,
01053                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( ParticipantUpdate): ")
01054                ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
01055                this->id().id(),
01056                sample->domain,
01057                std::string(converter).c_str()));
01058   }
01059 
01060   this->info_->update_domain_participant_qos(
01061     sample->domain,
01062     sample->id,
01063     sample->qos);
01064 }

void OpenDDS::Federator::ManagerImpl::processUpdateQos1 ( const SubscriptionUpdate sample,
const DDS::SampleInfo info 
)

Update the proxy DataReaderQos for a subscription.

Definition at line 981 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::id, info_, OpenDDS::Federator::SubscriptionUpdate::participant, and TAO_DDS_DCPSInfo_i::update_subscription_qos().

00982 {
00983   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00984     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00985     OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
00986     ACE_DEBUG((LM_DEBUG,
00987                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( SubscriptionUpdate): ")
00988                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00989                this->id().id(),
00990                sample->domain,
00991                std::string(part_converter).c_str(),
00992                std::string(sub_converter).c_str()));
00993   }
00994 
00995   this->info_->update_subscription_qos(
00996     sample->domain,
00997     sample->participant,
00998     sample->id,
00999     sample->datareader_qos);
01000 }

void OpenDDS::Federator::ManagerImpl::processUpdateQos1 ( const PublicationUpdate sample,
const DDS::SampleInfo info 
)

Update the proxy DataWriterQos for a publication.

Definition at line 937 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::Federator::PublicationUpdate::datawriter_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::PublicationUpdate::id, info_, OpenDDS::Federator::PublicationUpdate::participant, and TAO_DDS_DCPSInfo_i::update_publication_qos().

00938 {
00939   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00940     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00941     OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
00942     ACE_DEBUG((LM_DEBUG,
00943                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( PublicationUpdate): ")
00944                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00945                this->id().id(),
00946                sample->domain,
00947                std::string(part_converter).c_str(),
00948                std::string(pub_converter).c_str()));
00949   }
00950 
00951   this->info_->update_publication_qos(
00952     sample->domain,
00953     sample->participant,
00954     sample->id,
00955     sample->datawriter_qos);
00956 }

void OpenDDS::Federator::ManagerImpl::processUpdateQos1 ( const OwnerUpdate sample,
const DDS::SampleInfo info 
)

Process ownership changes.

Definition at line 905 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::DCPS::DCPS_debug_level, deferredOwnerships_, OpenDDS::Federator::OwnerUpdate::domain, OpenDDS::Federator::OwnerUpdate::owner, OpenDDS::Federator::OwnerUpdate::participant, and OpenDDS::Federator::OwnerUpdate::sender.

00906 {
00907   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00908     OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
00909     ACE_DEBUG((LM_DEBUG,
00910                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( OwnerUpdate): ")
00911                ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00912                this->id().id(),
00913                sample->domain,
00914                std::string(converter).c_str(),
00915                sample->sender,
00916                sample->owner));
00917   }
00918 
00919   if (false == this->info_->changeOwnership(sample->domain,
00920                                             sample->participant,
00921                                             sample->sender,
00922                                             sample->owner)) {
00923     {
00924       ACE_GUARD(ACE_Thread_Mutex,
00925                 guard,
00926                 this->deferred_lock_);
00927 
00928       this->deferredOwnerships_.push_back(*sample);
00929     }
00930     ACE_DEBUG((LM_DEBUG,
00931                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( OwnerUpdate): ")
00932                ACE_TEXT("deferred update.\n")));
00933   }
00934 }

void OpenDDS::Federator::ManagerImpl::processUpdateQos2 ( const SubscriptionUpdate sample,
const DDS::SampleInfo info 
)

Update the proxy SubscriberQos for a subscription.

Definition at line 1003 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::id, info_, OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, and TAO_DDS_DCPSInfo_i::update_subscription_qos().

01004 {
01005   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01006     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01007     OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
01008     ACE_DEBUG((LM_DEBUG,
01009                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos2( SubscriptionUpdate): ")
01010                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
01011                this->id().id(),
01012                sample->domain,
01013                std::string(part_converter).c_str(),
01014                std::string(sub_converter).c_str()));
01015   }
01016 
01017   this->info_->update_subscription_qos(
01018     sample->domain,
01019     sample->participant,
01020     sample->id,
01021     sample->subscriber_qos);
01022 }

void OpenDDS::Federator::ManagerImpl::processUpdateQos2 ( const PublicationUpdate sample,
const DDS::SampleInfo info 
)

Update the proxy PublisherQos for a publication.

Definition at line 959 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::PublicationUpdate::id, info_, OpenDDS::Federator::PublicationUpdate::participant, OpenDDS::Federator::PublicationUpdate::publisher_qos, and TAO_DDS_DCPSInfo_i::update_publication_qos().

00960 {
00961   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00962     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00963     OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
00964     ACE_DEBUG((LM_DEBUG,
00965                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos2( PublicationUpdate): ")
00966                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00967                this->id().id(),
00968                sample->domain,
00969                std::string(part_converter).c_str(),
00970                std::string(pub_converter).c_str()));
00971   }
00972 
00973   this->info_->update_publication_qos(
00974     sample->domain,
00975     sample->participant,
00976     sample->id,
00977     sample->publisher_qos);
00978 }

void OpenDDS::Federator::ManagerImpl::pushState ( Manager_ptr  peer  ) 

Push our current state to a remote repository.

Definition at line 1236 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::Federator::OwnerUpdate::action, OpenDDS::Federator::ParticipantUpdate::action, config_, OpenDDS::Federator::CreateEntity, OpenDDS::Federator::OwnerUpdate::domain, OpenDDS::Federator::ParticipantUpdate::domain, TAO_DDS_DCPSInfo_i::domains(), OpenDDS::Federator::Config::federationDomain(), DCPS_IR_Subscription::get_datareader_qos(), DCPS_IR_Publication::get_datawriter_qos(), DCPS_IR_Subscription::get_expr_params(), DCPS_IR_Subscription::get_filter_expression(), DCPS_IR_Subscription::get_id(), DCPS_IR_Publication::get_id(), DCPS_IR_Subscription::get_participant_id(), DCPS_IR_Publication::get_participant_id(), DCPS_IR_Publication::get_publisher_qos(), DCPS_IR_Subscription::get_subscriber_qos(), DCPS_IR_Subscription::get_topic_id(), DCPS_IR_Publication::get_topic_id(), DCPS_IR_Subscription::get_transportLocatorSeq(), DCPS_IR_Publication::get_transportLocatorSeq(), OpenDDS::Federator::ParticipantUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), info_, TAO_DDS_DCPSInfo_i::orb(), orb(), OpenDDS::Federator::OwnerUpdate::owner, OpenDDS::Federator::ParticipantUpdate::owner, OpenDDS::Federator::OwnerUpdate::participant, OpenDDS::Federator::ParticipantUpdate::qos, DCPS_IR_Subscription::reader(), OpenDDS::Federator::OwnerUpdate::sender, OpenDDS::Federator::ParticipantUpdate::sender, and DCPS_IR_Publication::writer().

Referenced by join_federation().

01237 {
01238   // foreach DCPS_IR_Domain
01239   //   foreach DCPS_IR_Participant
01240   //     peer->initializeParticipant(...)
01241   //     peer->initializeOwner(...)
01242   //   foreach DCPS_IR_Participant
01243   //     foreach DCPS_IR_Topic
01244   //       peer->initializeTopic(...)
01245   //     foreach DCPS_IR_Publication
01246   //       peer->initializePublication(...)
01247   //     foreach DCPS_IR_Subscription
01248   //       peer->initializeSubscription(...)
01249 
01250   // Process each domain within the repository.
01251   for (DCPS_IR_Domain_Map::const_iterator currentDomain
01252        = this->info_->domains().begin();
01253        currentDomain != this->info_->domains().end();
01254        ++currentDomain) {
01255 
01256     if (currentDomain->second->get_id() == this->config_.federationDomain()) {
01257       // Do not push the Federation domain publications.
01258       //continue;
01259     }
01260 
01261     // Process each participant within the current domain.
01262     for (DCPS_IR_Participant_Map::const_iterator currentParticipant
01263          = currentDomain->second->participants().begin();
01264          currentParticipant != currentDomain->second->participants().end();
01265          ++currentParticipant) {
01266 
01267       if (currentParticipant->second->isBitPublisher() == true) {
01268         // Do not push the built-in topic publications.
01269         continue;
01270       }
01271 
01272       // Initialize the participant on the peer.
01273       ParticipantUpdate participantSample;
01274       participantSample.sender = this->id().id();
01275       participantSample.action = CreateEntity;
01276 
01277       participantSample.owner  =  currentParticipant->second->owner();
01278       participantSample.domain =  currentDomain->second->get_id();
01279       participantSample.id     =  currentParticipant->second->get_id();
01280       participantSample.qos    = *currentParticipant->second->get_qos();
01281 
01282       peer->initializeParticipant(participantSample);
01283 
01284       // Initialize the ownership of the participant on the peer.
01285       OwnerUpdate ownerSample;
01286       ownerSample.sender      = this->id().id();
01287       ownerSample.action      = CreateEntity;
01288 
01289       ownerSample.domain      = currentDomain->second->get_id();
01290       ownerSample.participant = currentParticipant->second->get_id();
01291       ownerSample.owner       = currentParticipant->second->owner();
01292 
01293       peer->initializeOwner(ownerSample);
01294     }
01295 
01296     // Process each participant within the current domain.
01297     for (DCPS_IR_Participant_Map::const_iterator currentParticipant
01298          = currentDomain->second->participants().begin();
01299          currentParticipant != currentDomain->second->participants().end();
01300          ++currentParticipant) {
01301 
01302       if (currentParticipant->second->isBitPublisher() == true) {
01303         // Do not push the built-in topic publications.
01304         continue;
01305       }
01306 
01307       // Process each topic within the current particpant.
01308       for (DCPS_IR_Topic_Map::const_iterator currentTopic
01309            = currentParticipant->second->topics().begin();
01310            currentTopic != currentParticipant->second->topics().end();
01311            ++currentTopic) {
01312         TopicUpdate topicSample;
01313         topicSample.sender      = this->id().id();
01314         topicSample.action      = CreateEntity;
01315 
01316         topicSample.id          = currentTopic->second->get_id();
01317         topicSample.domain      = currentDomain->second->get_id();
01318         topicSample.participant = currentTopic->second->get_participant_id();
01319         topicSample.topic       = currentTopic->second->get_topic_description()->get_name();
01320         topicSample.datatype    = currentTopic->second->get_topic_description()->get_dataTypeName();
01321         topicSample.qos         = *currentTopic->second->get_topic_qos();
01322 
01323         peer->initializeTopic(topicSample);
01324       }
01325 
01326       // Process each publication within the current particpant.
01327       for (DCPS_IR_Publication_Map::const_iterator currentPublication
01328            = currentParticipant->second->publications().begin();
01329            currentPublication != currentParticipant->second->publications().end();
01330            ++currentPublication) {
01331         PublicationUpdate publicationSample;
01332         publicationSample.sender         = this->id().id();
01333         publicationSample.action         = CreateEntity;
01334 
01335         DCPS_IR_Publication* p = currentPublication->second;
01336         CORBA::ORB_var orb = this->info_->orb();
01337         CORBA::String_var callback = orb->object_to_string(p->writer());
01338 
01339         publicationSample.domain         = currentDomain->second->get_id();
01340         publicationSample.participant    = p->get_participant_id();
01341         publicationSample.topic          = p->get_topic_id();
01342         publicationSample.id             = p->get_id();
01343         publicationSample.callback       = callback.in();
01344         publicationSample.datawriter_qos = *p->get_datawriter_qos();
01345         publicationSample.publisher_qos  = *p->get_publisher_qos();
01346         publicationSample.transport_info = p->get_transportLocatorSeq();
01347 
01348         peer->initializePublication(publicationSample);
01349       }
01350 
01351       // Process each subscription within the current particpant.
01352       for (DCPS_IR_Subscription_Map::const_iterator currentSubscription
01353            = currentParticipant->second->subscriptions().begin();
01354            currentSubscription != currentParticipant->second->subscriptions().end();
01355            ++currentSubscription) {
01356         SubscriptionUpdate subscriptionSample;
01357         subscriptionSample.sender         = this->id().id();
01358         subscriptionSample.action         = CreateEntity;
01359 
01360         DCPS_IR_Subscription* s = currentSubscription->second;
01361         CORBA::ORB_var orb = this->info_->orb();
01362         CORBA::String_var callback = orb->object_to_string(s->reader());
01363 
01364         subscriptionSample.domain         = currentDomain->second->get_id();
01365         subscriptionSample.participant    = s->get_participant_id();
01366         subscriptionSample.topic          = s->get_topic_id();
01367         subscriptionSample.id             = s->get_id();
01368         subscriptionSample.callback       = callback.in();
01369         subscriptionSample.datareader_qos = *s->get_datareader_qos();
01370         subscriptionSample.subscriber_qos = *s->get_subscriber_qos();
01371         subscriptionSample.transport_info = s->get_transportLocatorSeq();
01372         subscriptionSample.filter_expression = s->get_filter_expression().c_str();
01373         subscriptionSample.expression_params = s->get_expr_params();
01374 
01375         peer->initializeSubscription(subscriptionSample);
01376       }
01377     }
01378   }
01379 }

OpenDDS::DCPS::DCPSInfo_ptr OpenDDS::Federator::ManagerImpl::repository (  )  [virtual]

Definition at line 876 of file FederatorManagerImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::is_nil(), and TheServiceParticipant.

00877 {
00878   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00879     ACE_DEBUG((LM_DEBUG,
00880                ACE_TEXT("(%P|%t) ManagerImpl::repository()\n")));
00881   }
00882 
00883   OpenDDS::DCPS::Discovery_rch disco
00884   = TheServiceParticipant->get_discovery(
00885       this->config_.federationDomain());
00886   OpenDDS::DCPS::DCPSInfo_var repo;
00887   if (!disco.is_nil()) {
00888     OpenDDS::DCPS::InfoRepoDiscovery_rch irDisco =
00889       DCPS::static_rchandle_cast<DCPS::InfoRepoDiscovery>(disco);
00890     repo = irDisco->get_dcps_info();
00891   }
00892 
00893   if (CORBA::is_nil(repo.in())) {
00894     return OpenDDS::DCPS::DCPSInfo::_duplicate(this->localRepo_.in());
00895 
00896   } else {
00897     return OpenDDS::DCPS::DCPSInfo::_duplicate(repo.in());
00898   }
00899 }

void OpenDDS::Federator::ManagerImpl::requestImage (  )  [virtual]

Implements Update::Updater.

Definition at line 26 of file FederatorManagerImpl_updates.cpp.

00027 {
00028   /* This method intentionally left unimplemented. */
00029 }

void OpenDDS::Federator::ManagerImpl::shutdown (  )  [virtual]

Definition at line 1115 of file FederatorManagerImpl.cpp.

References federated_, info_, and TAO_DDS_DCPSInfo_i::shutdown().

01117 {
01118   // Prevent the removal of this repository from the federation during
01119   // shutdown processing.
01120   this->federated_ = false;
01121 
01122   // Shutdown the process via the repository object.
01123   this->info_->shutdown();
01124 }

void OpenDDS::Federator::ManagerImpl::unregisterCallback (  )  [virtual]

Definition at line 20 of file FederatorManagerImpl_updates.cpp.

00021 {
00022   /* This method intentionally left unimplemented. */
00023 }

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::StringSeq exprParams 
) [virtual]

Implements Update::Updater.

Definition at line 506 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::expression_params, DDS::HANDLE_NIL, OpenDDS::Federator::SubscriptionUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::sender, subscriptionWriter_, and OpenDDS::Federator::UpdateFilterExpressionParams.

00507 {
00508   if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00509     // Decline to publish data until we can.
00510     return;
00511   }
00512 
00513   SubscriptionUpdate sample;
00514   sample.sender            = this->id().id();
00515   sample.action            = UpdateFilterExpressionParams;
00516   sample.domain            = id.domain;
00517   sample.participant       = id.participant;
00518   sample.id                = id.id;
00519   sample.expression_params = params;
00520 
00521   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00522     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00523     OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00524     ACE_DEBUG((LM_DEBUG,
00525                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update(FilterParams): ")
00526                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00527                this->id().id(),
00528                sample.domain,
00529                std::string(part_converter).c_str(),
00530                std::string(sub_converter).c_str()));
00531   }
00532 
00533   this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00534 }

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::SubscriberQos qos 
) [virtual]

Implements Update::Updater.

Definition at line 537 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::SubscriptionUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::sender, OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, subscriptionWriter_, and OpenDDS::Federator::UpdateQosValue2.

00538 {
00539   if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00540     // Decline to publish data until we can.
00541     return;
00542   }
00543 
00544   SubscriptionUpdate sample;
00545   sample.sender         = this->id().id();
00546   sample.action         = UpdateQosValue2;
00547 
00548   sample.domain         = id.domain;
00549   sample.participant    = id.participant;
00550   sample.id             = id.id;
00551   sample.subscriber_qos = qos;
00552 
00553   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00554     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00555     OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00556     ACE_DEBUG((LM_DEBUG,
00557                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( SubscriberUpdate): ")
00558                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00559                this->id().id(),
00560                sample.domain,
00561                std::string(part_converter).c_str(),
00562                std::string(sub_converter).c_str()));
00563   }
00564 
00565   this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00566 }

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::DataReaderQos qos 
) [virtual]

Implements Update::Updater.

Definition at line 474 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::SubscriptionUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::sender, subscriptionWriter_, and OpenDDS::Federator::UpdateQosValue1.

00475 {
00476   if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00477     // Decline to publish data until we can.
00478     return;
00479   }
00480 
00481   SubscriptionUpdate sample;
00482   sample.sender         = this->id().id();
00483   sample.action         = UpdateQosValue1;
00484 
00485   sample.domain         = id.domain;
00486   sample.participant    = id.participant;
00487   sample.id             = id.id;
00488   sample.datareader_qos = qos;
00489 
00490   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00491     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00492     OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00493     ACE_DEBUG((LM_DEBUG,
00494                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( ReaderUpdate): ")
00495                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00496                this->id().id(),
00497                sample.domain,
00498                std::string(part_converter).c_str(),
00499                std::string(sub_converter).c_str()));
00500   }
00501 
00502   this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00503 }

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::PublisherQos qos 
) [virtual]

Implements Update::Updater.

Definition at line 442 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::Federator::PublicationUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::PublicationUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::PublicationUpdate::participant, publicationWriter_, OpenDDS::Federator::PublicationUpdate::publisher_qos, OpenDDS::Federator::PublicationUpdate::sender, and OpenDDS::Federator::UpdateQosValue2.

00443 {
00444   if (CORBA::is_nil(this->publicationWriter_.in())) {
00445     // Decline to publish data until we can.
00446     return;
00447   }
00448 
00449   PublicationUpdate sample;
00450   sample.sender         = this->id().id();
00451   sample.action         = UpdateQosValue2;
00452 
00453   sample.domain         = id.domain;
00454   sample.participant    = id.participant;
00455   sample.id             = id.id;
00456   sample.publisher_qos  = qos;
00457 
00458   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00459     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00460     OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00461     ACE_DEBUG((LM_DEBUG,
00462                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( PublisherUpdate): ")
00463                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00464                this->id().id(),
00465                sample.domain,
00466                std::string(part_converter).c_str(),
00467                std::string(pub_converter).c_str()));
00468   }
00469 
00470   this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00471 }

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::DataWriterQos qos 
) [virtual]

Implements Update::Updater.

Definition at line 410 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::Federator::PublicationUpdate::action, OpenDDS::Federator::PublicationUpdate::datawriter_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::PublicationUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::PublicationUpdate::participant, publicationWriter_, OpenDDS::Federator::PublicationUpdate::sender, and OpenDDS::Federator::UpdateQosValue1.

00411 {
00412   if (CORBA::is_nil(this->publicationWriter_.in())) {
00413     // Decline to publish data until we can.
00414     return;
00415   }
00416 
00417   PublicationUpdate sample;
00418   sample.sender         = this->id().id();
00419   sample.action         = UpdateQosValue1;
00420 
00421   sample.domain         = id.domain;
00422   sample.participant    = id.participant;
00423   sample.id             = id.id;
00424   sample.datawriter_qos = qos;
00425 
00426   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00427     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00428     OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00429     ACE_DEBUG((LM_DEBUG,
00430                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( WriterUpdate): ")
00431                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00432                this->id().id(),
00433                sample.domain,
00434                std::string(part_converter).c_str(),
00435                std::string(pub_converter).c_str()));
00436   }
00437 
00438   this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00439 }

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::TopicQos qos 
) [virtual]

Implements Update::Updater.

Definition at line 378 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::Federator::TopicUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::TopicUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::TopicUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::TopicUpdate::participant, OpenDDS::Federator::TopicUpdate::qos, OpenDDS::Federator::TopicUpdate::sender, topicWriter_, and OpenDDS::Federator::UpdateQosValue1.

00379 {
00380   if (CORBA::is_nil(this->topicWriter_.in())) {
00381     // Decline to publish data until we can.
00382     return;
00383   }
00384 
00385   TopicUpdate sample;
00386   sample.sender      = this->id().id();
00387   sample.action      = UpdateQosValue1;
00388 
00389   sample.id          = id.id;
00390   sample.domain      = id.domain;
00391   sample.participant = id.participant;
00392   sample.qos         = qos;
00393 
00394   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00395     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00396     OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
00397     ACE_DEBUG((LM_DEBUG,
00398                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( TopicUpdate): ")
00399                ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00400                this->id().id(),
00401                sample.domain,
00402                std::string(part_converter).c_str(),
00403                std::string(topic_converter).c_str()));
00404   }
00405 
00406   this->topicWriter_->write(sample, DDS::HANDLE_NIL);
00407 }

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::DomainParticipantQos qos 
) [virtual]

Implements Update::Updater.

Definition at line 349 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::Federator::ParticipantUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::ParticipantUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), participantWriter_, OpenDDS::Federator::ParticipantUpdate::qos, OpenDDS::Federator::ParticipantUpdate::sender, and OpenDDS::Federator::UpdateQosValue1.

00350 {
00351   if (CORBA::is_nil(this->participantWriter_.in())) {
00352     // Decline to publish data until we can.
00353     return;
00354   }
00355 
00356   ParticipantUpdate sample;
00357   sample.sender = this->id().id();
00358   sample.action = UpdateQosValue1;
00359 
00360   sample.domain = id.domain;
00361   sample.id     = id.id;
00362   sample.qos    = qos;
00363 
00364   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00365     OpenDDS::DCPS::RepoIdConverter converter(sample.id);
00366     ACE_DEBUG((LM_DEBUG,
00367                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( ParticipantUpdate): ")
00368                ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
00369                this->id().id(),
00370                sample.domain,
00371                std::string(converter).c_str()));
00372   }
00373 
00374   this->participantWriter_->write(sample, DDS::HANDLE_NIL);
00375 }


Member Data Documentation

Config& OpenDDS::Federator::ManagerImpl::config_ [private]

The configuration information for this manager.

Definition at line 231 of file FederatorManagerImpl.h.

Referenced by id(), initialize(), join_federation(), and pushState().

ACE_Thread_Mutex OpenDDS::Federator::ManagerImpl::deferred_lock_ [private]

Protect deferred updates.

Definition at line 294 of file FederatorManagerImpl.h.

std::list<OwnerUpdate> OpenDDS::Federator::ManagerImpl::deferredOwnerships_ [private]

Deferred ownership updates.

Definition at line 279 of file FederatorManagerImpl.h.

Referenced by processCreate(), processDeferred(), processDelete(), and processUpdateQos1().

std::list<PublicationUpdate> OpenDDS::Federator::ManagerImpl::deferredPublications_ [private]

Deferred publication updates.

Definition at line 285 of file FederatorManagerImpl.h.

Referenced by processCreate(), and processDeferred().

std::list<SubscriptionUpdate> OpenDDS::Federator::ManagerImpl::deferredSubscriptions_ [private]

Deferred subscription updates.

Definition at line 288 of file FederatorManagerImpl.h.

Referenced by processCreate(), and processDeferred().

std::list<TopicUpdate> OpenDDS::Federator::ManagerImpl::deferredTopics_ [private]

Deferred topic updates.

Definition at line 282 of file FederatorManagerImpl.h.

Referenced by processCreate(), and processDeferred().

bool OpenDDS::Federator::ManagerImpl::federated_ [private]

Flag indicating that we are actively participating in a federation of repositories.

Definition at line 219 of file FederatorManagerImpl.h.

Referenced by finalize(), join_federation(), and shutdown().

DDS::DomainParticipant_var OpenDDS::Federator::ManagerImpl::federationParticipant_ [private]

local DomainParticipant

Definition at line 246 of file FederatorManagerImpl.h.

Referenced by initialize().

TAO_DDS_DCPSInfo_i* OpenDDS::Federator::ManagerImpl::info_ [private]

The Info object reference to update.

Definition at line 234 of file FederatorManagerImpl.h.

Referenced by info(), leave_and_shutdown(), processCreate(), processDelete(), processUpdateFilterExpressionParams(), processUpdateQos1(), processUpdateQos2(), pushState(), and shutdown().

RepoKey OpenDDS::Federator::ManagerImpl::joiner_ [private]

Simple recursion avoidance during the join operations.

Definition at line 212 of file FederatorManagerImpl.h.

Referenced by join_federation().

ACE_Condition<ACE_SYNCH_MUTEX> OpenDDS::Federator::ManagerImpl::joining_ [private]

Condition used to gate joining activities.

Definition at line 209 of file FederatorManagerImpl.h.

Referenced by join_federation().

RepoKey OpenDDS::Federator::ManagerImpl::joinRepo_ [private]

Repository to which we joined.

Definition at line 215 of file FederatorManagerImpl.h.

Referenced by join_federation().

OpenDDS::DCPS::DCPSInfo_var OpenDDS::Federator::ManagerImpl::localRepo_ [private]

Remotely callable reference to the local repository.

Definition at line 237 of file FederatorManagerImpl.h.

Referenced by localRepo().

ACE_SYNCH_MUTEX OpenDDS::Federator::ManagerImpl::lock_ [private]

Critical section MUTEX.

Definition at line 206 of file FederatorManagerImpl.h.

bool OpenDDS::Federator::ManagerImpl::multicastEnabled_ [private]

Is multicast enabled?

Definition at line 291 of file FederatorManagerImpl.h.

Referenced by ManagerImpl().

InfoRepoMulticastResponder OpenDDS::Federator::ManagerImpl::multicastResponder_ [private]

Multicast responder.

Definition at line 243 of file FederatorManagerImpl.h.

CORBA::ORB_var OpenDDS::Federator::ManagerImpl::orb_ [private]

The ORB in which we are activated.

Definition at line 240 of file FederatorManagerImpl.h.

Referenced by finalize(), initialize(), and orb().

UpdateListener<OwnerUpdate, OwnerUpdateDataReader> OpenDDS::Federator::ManagerImpl::ownerListener_ [private]

TopicUpdate listener.

Definition at line 249 of file FederatorManagerImpl.h.

Referenced by finalize(), and initialize().

OwnerUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::ownerWriter_ [private]

TopicUpdate writer.

Definition at line 264 of file FederatorManagerImpl.h.

Referenced by create(), and initialize().

UpdateListener<ParticipantUpdate, ParticipantUpdateDataReader> OpenDDS::Federator::ManagerImpl::participantListener_ [private]

ParticipantUpdate listener.

Definition at line 255 of file FederatorManagerImpl.h.

Referenced by finalize(), and initialize().

ParticipantUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::participantWriter_ [private]

ParticipantUpdate writer.

Definition at line 270 of file FederatorManagerImpl.h.

Referenced by create(), destroy(), initialize(), and update().

IdToManagerMap OpenDDS::Federator::ManagerImpl::peers_ [private]

The peer with which we have federated.

Definition at line 225 of file FederatorManagerImpl.h.

Referenced by finalize(), join_federation(), and leave_federation().

UpdateListener<PublicationUpdate, PublicationUpdateDataReader> OpenDDS::Federator::ManagerImpl::publicationListener_ [private]

PublicationUpdate listener.

Definition at line 258 of file FederatorManagerImpl.h.

Referenced by finalize(), and initialize().

PublicationUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::publicationWriter_ [private]

PublicationUpdate writer.

Definition at line 273 of file FederatorManagerImpl.h.

Referenced by destroy(), initialize(), and update().

OpenDDS::DCPS::SequenceNumber OpenDDS::Federator::ManagerImpl::sequence_ [private]

The packet sequence number for data that we publish.

Definition at line 228 of file FederatorManagerImpl.h.

UpdateListener<SubscriptionUpdate, SubscriptionUpdateDataReader> OpenDDS::Federator::ManagerImpl::subscriptionListener_ [private]

SubscriptionUpdate listener.

Definition at line 261 of file FederatorManagerImpl.h.

Referenced by finalize(), and initialize().

SubscriptionUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::subscriptionWriter_ [private]

SubscriptionUpdate writer.

Definition at line 276 of file FederatorManagerImpl.h.

Referenced by create(), destroy(), initialize(), and update().

UpdateListener<TopicUpdate, TopicUpdateDataReader> OpenDDS::Federator::ManagerImpl::topicListener_ [private]

TopicUpdate listener.

Definition at line 252 of file FederatorManagerImpl.h.

Referenced by finalize(), and initialize().

TopicUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::topicWriter_ [private]

TopicUpdate writer.

Definition at line 267 of file FederatorManagerImpl.h.

Referenced by create(), destroy(), initialize(), and update().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:47 2016 for OpenDDS by  doxygen 1.4.7