OpenDDS::Federator::ManagerImpl Class Reference

#include <FederatorManagerImpl.h>

Inheritance diagram for OpenDDS::Federator::ManagerImpl:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::Federator::ManagerImpl:
Collaboration graph
[legend]

List of all members.

Public Member Functions

 ManagerImpl (Config &config)
virtual ~ManagerImpl ()
virtual CORBA::Boolean discover_federation (const char *ior)
virtual Manager_ptr join_federation (Manager_ptr peer, FederationDomain federation)
virtual void leave_federation (RepoKey id)
virtual RepoKey federation_id ()
virtual OpenDDS::DCPS::DCPSInfo_ptr repository ()
virtual void initializeOwner (const OpenDDS::Federator::OwnerUpdate &data)
virtual void initializeTopic (const OpenDDS::Federator::TopicUpdate &data)
virtual void initializeParticipant (const OpenDDS::Federator::ParticipantUpdate &data)
virtual void initializePublication (const OpenDDS::Federator::PublicationUpdate &data)
virtual void initializeSubscription (const OpenDDS::Federator::SubscriptionUpdate &data)
virtual void leave_and_shutdown ()
virtual void shutdown ()
void initialize ()
 Establish the update publications and subscriptions.
void finalize ()
 Release resources gracefully.
TAO_DDS_DCPSInfo_i *& info ()
 Accessors for the DCPSInfo reference.
TAO_DDS_DCPSInfo_iinfo () const
void localRepo (::OpenDDS::DCPS::DCPSInfo_ptr repo)
 Capture a remote callable reference to the DCPSInfo.
const TAO_DDS_DCPSFederationIdid () const
 Accessors for the federation Id value.
CORBA::ORB_ptr orb ()
 Accessors for the ORB.
void orb (CORBA::ORB_ptr value)
void pushState (Manager_ptr peer)
 Push our current state to a remote repository.
void processDeferred ()
 Handle any deferred updates that might have become processable.
virtual void unregisterCallback ()
virtual void requestImage ()
virtual void create (const Update::UTopic &topic)
virtual void create (const Update::UParticipant &participant)
virtual void create (const Update::URActor &reader)
virtual void create (const Update::UWActor &writer)
virtual void create (const Update::OwnershipData &data)
virtual void update (const Update::IdPath &id, const DDS::DomainParticipantQos &qos)
virtual void update (const Update::IdPath &id, const DDS::TopicQos &qos)
virtual void update (const Update::IdPath &id, const DDS::DataWriterQos &qos)
virtual void update (const Update::IdPath &id, const DDS::PublisherQos &qos)
virtual void update (const Update::IdPath &id, const DDS::DataReaderQos &qos)
virtual void update (const Update::IdPath &id, const DDS::SubscriberQos &qos)
virtual void update (const Update::IdPath &id, const DDS::StringSeq &exprParams)
virtual void destroy (const Update::IdPath &id, Update::ItemType type, Update::ActorType actor)
 Propagate that an entity has been destroyed.
void processCreate (const OwnerUpdate *sample, const DDS::SampleInfo *info)
 Null implementation for OwnerUpdate samples.
void processCreate (const PublicationUpdate *sample, const DDS::SampleInfo *info)
 Create a proxy for a new publication.
void processCreate (const SubscriptionUpdate *sample, const DDS::SampleInfo *info)
 Create a proxy for a new subscription.
void processCreate (const ParticipantUpdate *sample, const DDS::SampleInfo *info)
 Create a proxy for a new participant.
void processCreate (const TopicUpdate *sample, const DDS::SampleInfo *info)
 Create a proxy for a new topic.
void processUpdateQos1 (const OwnerUpdate *sample, const DDS::SampleInfo *info)
 Process ownership changes.
void processUpdateQos1 (const PublicationUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy DataWriterQos for a publication.
void processUpdateQos2 (const PublicationUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy PublisherQos for a publication.
void processUpdateQos1 (const SubscriptionUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy DataReaderQos for a subscription.
void processUpdateQos2 (const SubscriptionUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy SubscriberQos for a subscription.
void processUpdateFilterExpressionParams (const SubscriptionUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy filter expression params for a subscription.
void processUpdateQos1 (const ParticipantUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy ParticipantQos for a participant.
void processUpdateQos1 (const TopicUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy TopicQos for a topic.
void processDelete (const OwnerUpdate *sample, const DDS::SampleInfo *info)
 Null implementation for OwnerUpdate samples.
void processDelete (const PublicationUpdate *sample, const DDS::SampleInfo *info)
 Delete a proxy for a publication.
void processDelete (const SubscriptionUpdate *sample, const DDS::SampleInfo *info)
 Delete a proxy for a subscription.
void processDelete (const ParticipantUpdate *sample, const DDS::SampleInfo *info)
 Delete a proxy for a participant.
void processDelete (const TopicUpdate *sample, const DDS::SampleInfo *info)
 Delete a proxy for a topic.

Private Types

typedef std::map< RepoKey,
Manager_var > 
IdToManagerMap
 Map type to hold references to federated repository Managers.

Private Attributes

ACE_SYNCH_MUTEX lock_
 Critical section MUTEX.
ACE_Condition< ACE_SYNCH_MUTEX > joining_
 Condition used to gate joining activities.
RepoKey joiner_
 Simple recursion avoidance during the join operations.
RepoKey joinRepo_
 Repository to which we joined.
bool federated_
IdToManagerMap peers_
 The peer with which we have federated.
OpenDDS::DCPS::SequenceNumber sequence_
 The packet sequence number for data that we publish.
Configconfig_
 The configuration information for this manager.
TAO_DDS_DCPSInfo_iinfo_
 The Info object reference to update.
OpenDDS::DCPS::DCPSInfo_var localRepo_
 Remotely callable reference to the local repository.
CORBA::ORB_var orb_
 The ORB in which we are activated.
InfoRepoMulticastResponder multicastResponder_
 Multicast responder.
DDS::DomainParticipant_var federationParticipant_
 local DomainParticipant
UpdateListener< OwnerUpdate,
OwnerUpdateDataReader > 
ownerListener_
 TopicUpdate listener.
UpdateListener< TopicUpdate,
TopicUpdateDataReader > 
topicListener_
 TopicUpdate listener.
UpdateListener
< ParticipantUpdate,
ParticipantUpdateDataReader > 
participantListener_
 ParticipantUpdate listener.
UpdateListener
< PublicationUpdate,
PublicationUpdateDataReader > 
publicationListener_
 PublicationUpdate listener.
UpdateListener
< SubscriptionUpdate,
SubscriptionUpdateDataReader > 
subscriptionListener_
 SubscriptionUpdate listener.
OwnerUpdateDataWriter_var ownerWriter_
 TopicUpdate writer.
TopicUpdateDataWriter_var topicWriter_
 TopicUpdate writer.
ParticipantUpdateDataWriter_var participantWriter_
 ParticipantUpdate writer.
PublicationUpdateDataWriter_var publicationWriter_
 PublicationUpdate writer.
SubscriptionUpdateDataWriter_var subscriptionWriter_
 SubscriptionUpdate writer.
std::list< OwnerUpdatedeferredOwnerships_
 Deferred ownership updates.
std::list< TopicUpdatedeferredTopics_
 Deferred topic updates.
std::list< PublicationUpdatedeferredPublications_
 Deferred publication updates.
std::list< SubscriptionUpdatedeferredSubscriptions_
 Deferred subscription updates.
bool multicastEnabled_
 Is multicast enabled?
ACE_Thread_Mutex deferred_lock_
 Protect deferred updates.

Detailed Description

Definition at line 36 of file FederatorManagerImpl.h.


Member Typedef Documentation

typedef std::map<RepoKey, Manager_var> OpenDDS::Federator::ManagerImpl::IdToManagerMap [private]

Map type to hold references to federated repository Managers.

Definition at line 222 of file FederatorManagerImpl.h.


Constructor & Destructor Documentation

OpenDDS::Federator::ManagerImpl::ManagerImpl ( Config config  ) 

Definition at line 39 of file FederatorManagerImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, ACE_OS::getenv(), LM_DEBUG, and multicastEnabled_.

00040   : joining_(this->lock_),
00041     joiner_(NIL_REPOSITORY),
00042     joinRepo_(NIL_REPOSITORY),
00043     federated_(false),
00044     config_(config),
00045     info_(0),
00046     ownerListener_(*this),
00047     topicListener_(*this),
00048     participantListener_(*this),
00049     publicationListener_(*this),
00050     subscriptionListener_(*this),
00051     multicastEnabled_(false)
00052 {
00053   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00054     ACE_DEBUG((LM_DEBUG,
00055                ACE_TEXT("(%P|%t) Federator::ManagerImpl::ManagerImpl()\n")));
00056   }
00057 
00058   char* mdec = ACE_OS::getenv("MulticastDiscoveryEnabled");
00059 
00060   if (mdec != 0) {
00061     std::string mde(ACE_OS::getenv("MulticastDiscoveryEnabled"));
00062 
00063     if (mde != "0") {
00064       multicastEnabled_ = true;
00065     }
00066   }
00067 }

Here is the call graph for this function:

OpenDDS::Federator::ManagerImpl::~ManagerImpl (  )  [virtual]

Definition at line 69 of file FederatorManagerImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, and LM_DEBUG.

00070 {
00071   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00072     ACE_DEBUG((LM_DEBUG,
00073                ACE_TEXT("(%P|%t) Federator::ManagerImpl::~ManagerImpl()\n")));
00074   }
00075 }

Here is the call graph for this function:


Member Function Documentation

void OpenDDS::Federator::ManagerImpl::create ( const Update::OwnershipData data  )  [virtual]

Propagate an entity has been created.

Implements Update::Updater.

Definition at line 179 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::Federator::OwnerUpdate::action, OpenDDS::Federator::CreateEntity, OpenDDS::DCPS::DCPS_debug_level, Update::OwnershipData::domain, OpenDDS::Federator::OwnerUpdate::domain, DDS::HANDLE_NIL, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, Update::OwnershipData::owner, OpenDDS::Federator::OwnerUpdate::owner, ownerWriter_, Update::OwnershipData::participant, OpenDDS::Federator::OwnerUpdate::participant, and OpenDDS::Federator::OwnerUpdate::sender.

00180 {
00181   if (CORBA::is_nil(this->ownerWriter_.in())) {
00182     // Decline to publish data until we can.
00183     return;
00184   }
00185 
00186   OwnerUpdate sample;
00187   sample.sender      = this->id().id();
00188   sample.action      = CreateEntity;
00189 
00190   sample.domain      = data.domain;
00191   sample.participant = data.participant;
00192   sample.owner       = data.owner;
00193 
00194   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00195     OpenDDS::DCPS::RepoIdConverter converter(sample.participant);
00196     ACE_DEBUG((LM_DEBUG,
00197                ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( OwnerUpdate): ")
00198                ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00199                this->id().id(),
00200                sample.domain,
00201                std::string(converter).c_str(),
00202                sample.sender,
00203                sample.owner));
00204   }
00205 
00206   this->ownerWriter_->write(sample, DDS::HANDLE_NIL);
00207 }

Here is the call graph for this function:

virtual void OpenDDS::Federator::ManagerImpl::create ( const Update::UWActor actor  )  [virtual]

Propagate an entity has been created.

Implements Update::Updater.

void OpenDDS::Federator::ManagerImpl::create ( const Update::URActor actor  )  [virtual]

Propagate an entity has been created.

Implements Update::Updater.

Definition at line 104 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::Federator::SubscriptionUpdate::action, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::actorId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::callback, OpenDDS::Federator::SubscriptionUpdate::callback, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::contentSubscriptionProfile, OpenDDS::Federator::CreateEntity, OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::domainId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::drdwQos, OpenDDS::Federator::SubscriptionUpdate::expression_params, OpenDDS::Federator::SubscriptionUpdate::filter_class_name, OpenDDS::Federator::SubscriptionUpdate::filter_expression, DDS::HANDLE_NIL, OpenDDS::Federator::SubscriptionUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::participantId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::pubsubQos, OpenDDS::Federator::SubscriptionUpdate::sender, OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, subscriptionWriter_, OpenDDS::Federator::SubscriptionUpdate::topic, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::topicId, OpenDDS::Federator::SubscriptionUpdate::transport_info, and Update::ActorStrt< PSQ, RWQ, C, T, CSP >::transportInterfaceInfo.

00105 {
00106   if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00107     // Decline to publish data until we can.
00108     return;
00109   }
00110 
00111   SubscriptionUpdate sample;
00112   sample.sender         = this->id().id();
00113   sample.action         = CreateEntity;
00114 
00115   sample.domain         = reader.domainId;
00116   sample.participant    = reader.participantId;
00117   sample.topic          = reader.topicId;
00118   sample.id             = reader.actorId;
00119   sample.callback       = reader.callback.c_str();
00120   sample.datareader_qos = reader.drdwQos;
00121   sample.subscriber_qos = reader.pubsubQos;
00122   sample.transport_info = reader.transportInterfaceInfo;
00123   sample.filter_class_name = reader.contentSubscriptionProfile.filterClassName;
00124   sample.filter_expression = reader.contentSubscriptionProfile.filterExpr;
00125   sample.expression_params = reader.contentSubscriptionProfile.exprParams;
00126 
00127   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00128     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00129     OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00130     ACE_DEBUG((LM_DEBUG,
00131                ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( SubscriptionUpdate): ")
00132                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00133                this->id().id(),
00134                sample.domain,
00135                std::string(part_converter).c_str(),
00136                std::string(sub_converter).c_str()));
00137   }
00138 
00139   this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00140 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::create ( const Update::UParticipant participant  )  [virtual]

Propagate an entity has been created.

Implements Update::Updater.

Definition at line 74 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::Federator::ParticipantUpdate::action, OpenDDS::Federator::CreateEntity, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, Update::ParticipantStrt< Q >::domainId, DDS::HANDLE_NIL, OpenDDS::Federator::ParticipantUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, Update::ParticipantStrt< Q >::owner, OpenDDS::Federator::ParticipantUpdate::owner, Update::ParticipantStrt< Q >::participantId, Update::ParticipantStrt< Q >::participantQos, participantWriter_, OpenDDS::Federator::ParticipantUpdate::qos, and OpenDDS::Federator::ParticipantUpdate::sender.

00075 {
00076   if (CORBA::is_nil(this->participantWriter_.in())) {
00077     // Decline to publish data until we can.
00078     return;
00079   }
00080 
00081   ParticipantUpdate sample;
00082   sample.sender = this->id().id();
00083   sample.action = CreateEntity;
00084 
00085   sample.owner  = participant.owner;
00086   sample.domain = participant.domainId;
00087   sample.id     = participant.participantId;
00088   sample.qos    = participant.participantQos;
00089 
00090   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00091     OpenDDS::DCPS::RepoIdConverter converter(sample.id);
00092     ACE_DEBUG((LM_DEBUG,
00093                ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( ParticipantUpdate): ")
00094                ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
00095                this->id().id(),
00096                sample.domain,
00097                std::string(converter).c_str()));
00098   }
00099 
00100   this->participantWriter_->write(sample, DDS::HANDLE_NIL);
00101 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::create ( const Update::UTopic topic  )  [virtual]

Propagate an entity has been created.

Implements Update::Updater.

Definition at line 40 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::Federator::TopicUpdate::action, OpenDDS::Federator::CreateEntity, Update::TopicStrt< Q, S >::dataType, OpenDDS::Federator::TopicUpdate::datatype, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::TopicUpdate::domain, Update::TopicStrt< Q, S >::domainId, DDS::HANDLE_NIL, OpenDDS::Federator::TopicUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, Update::TopicStrt< Q, S >::name, OpenDDS::Federator::TopicUpdate::participant, Update::TopicStrt< Q, S >::participantId, OpenDDS::Federator::TopicUpdate::qos, OpenDDS::Federator::TopicUpdate::sender, OpenDDS::Federator::TopicUpdate::topic, Update::TopicStrt< Q, S >::topicId, Update::TopicStrt< Q, S >::topicQos, and topicWriter_.

00041 {
00042   if (CORBA::is_nil(this->topicWriter_.in())) {
00043     // Decline to publish data until we can.
00044     return;
00045   }
00046 
00047   TopicUpdate sample;
00048   sample.sender      = this->id().id();
00049   sample.action      = CreateEntity;
00050 
00051   sample.id          = topic.topicId;
00052   sample.domain      = topic.domainId;
00053   sample.participant = topic.participantId;
00054   sample.topic       = topic.name.c_str();
00055   sample.datatype    = topic.dataType.c_str();
00056   sample.qos         = topic.topicQos;
00057 
00058   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00059     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00060     OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
00061     ACE_DEBUG((LM_DEBUG,
00062                ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( TopicUpdate): ")
00063                ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00064                this->id().id(),
00065                sample.domain,
00066                std::string(part_converter).c_str(),
00067                std::string(topic_converter).c_str()));
00068   }
00069 
00070   this->topicWriter_->write(sample, DDS::HANDLE_NIL);
00071 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::destroy ( const Update::IdPath id,
Update::ItemType  type,
Update::ActorType  actor 
) [virtual]

Propagate that an entity has been destroyed.

Implements Update::Updater.

Definition at line 210 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::Federator::PublicationUpdate::action, OpenDDS::Federator::ParticipantUpdate::action, OpenDDS::Federator::TopicUpdate::action, Update::Actor, config_, Update::DataReader, Update::DataWriter, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::DestroyEntity, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::Config::federationDomain(), DDS::HANDLE_NIL, OpenDDS::Federator::SubscriptionUpdate::id, OpenDDS::Federator::PublicationUpdate::id, OpenDDS::Federator::ParticipantUpdate::id, OpenDDS::Federator::TopicUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::PublicationUpdate::participant, Update::Participant, OpenDDS::Federator::TopicUpdate::participant, participantWriter_, publicationWriter_, OpenDDS::Federator::SubscriptionUpdate::sender, OpenDDS::Federator::PublicationUpdate::sender, OpenDDS::Federator::ParticipantUpdate::sender, OpenDDS::Federator::TopicUpdate::sender, subscriptionWriter_, Update::Topic, and topicWriter_.

00214 {
00215   //
00216   // Do not propagate any destroy() messages within the FederationDomain.
00217   // This domain will be managed separately.
00218   //
00219   if (id.domain == this->config_.federationDomain()) {
00220     return;
00221   }
00222 
00223   switch (type) {
00224   case Update::Topic: {
00225     if (CORBA::is_nil(this->topicWriter_.in())) {
00226       // Decline to publish data until we can.
00227       return;
00228     }
00229 
00230     TopicUpdate sample;
00231     sample.sender      = this->id().id();
00232     sample.action      = DestroyEntity;
00233 
00234     sample.id          = id.id;
00235     sample.domain      = id.domain;
00236     sample.participant = id.participant;
00237 
00238     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00239       OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00240       OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
00241       ACE_DEBUG((LM_DEBUG,
00242                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( TopicUpdate): ")
00243                  ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00244                  this->id().id(),
00245                  sample.domain,
00246                  std::string(part_converter).c_str(),
00247                  std::string(topic_converter).c_str()));
00248     }
00249 
00250     this->topicWriter_->write(sample, DDS::HANDLE_NIL);
00251   }
00252   break;
00253 
00254   case Update::Participant: {
00255     if (CORBA::is_nil(this->participantWriter_.in())) {
00256       // Decline to publish data until we can.
00257       return;
00258     }
00259 
00260     ParticipantUpdate sample;
00261     sample.sender = this->id().id();
00262     sample.action = DestroyEntity;
00263 
00264     sample.domain = id.domain;
00265     sample.id     = id.id;
00266 
00267     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00268       OpenDDS::DCPS::RepoIdConverter converter(sample.id);
00269       ACE_DEBUG((LM_DEBUG,
00270                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( ParticipantUpdate): ")
00271                  ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
00272                  this->id().id(),
00273                  sample.domain,
00274                  std::string(converter).c_str()));
00275     }
00276 
00277     this->participantWriter_->write(sample, DDS::HANDLE_NIL);
00278   }
00279   break;
00280 
00281   case Update::Actor:
00282 
00283     // This is VERY annoying.
00284     switch (actor) {
00285     case Update::DataWriter: {
00286       if (CORBA::is_nil(this->publicationWriter_.in())) {
00287         // Decline to publish data until we can.
00288         return;
00289       }
00290 
00291       PublicationUpdate sample;
00292       sample.sender         = this->id().id();
00293       sample.action         = DestroyEntity;
00294 
00295       sample.domain         = id.domain;
00296       sample.participant    = id.participant;
00297       sample.id             = id.id;
00298 
00299       if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00300         OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00301         OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00302         ACE_DEBUG((LM_DEBUG,
00303                    ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( PublicationUpdate): ")
00304                    ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00305                    this->id().id(),
00306                    sample.domain,
00307                    std::string(part_converter).c_str(),
00308                    std::string(pub_converter).c_str()));
00309       }
00310 
00311       this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00312     }
00313     break;
00314 
00315     case Update::DataReader: {
00316       if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00317         // Decline to publish data until we can.
00318         return;
00319       }
00320 
00321       SubscriptionUpdate sample;
00322       sample.sender         = this->id().id();
00323       sample.action         = DestroyEntity;
00324 
00325       sample.domain         = id.domain;
00326       sample.participant    = id.participant;
00327       sample.id             = id.id;
00328 
00329       if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00330         OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00331         OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00332         ACE_DEBUG((LM_DEBUG,
00333                    ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( SubscriptionUpdate): ")
00334                    ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00335                    this->id().id(),
00336                    sample.domain,
00337                    std::string(part_converter).c_str(),
00338                    std::string(sub_converter).c_str()));
00339       }
00340 
00341       this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00342     }
00343     break;
00344     }
00345 
00346     break;
00347   }
00348 }

Here is the call graph for this function:

CORBA::Boolean OpenDDS::Federator::ManagerImpl::discover_federation ( const char *  ior  )  [virtual]

: Implement this.

Definition at line 904 of file FederatorManagerImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, and LM_DEBUG.

00905 {
00906   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00907     ACE_DEBUG((LM_DEBUG,
00908                ACE_TEXT("(%P|%t) ManagerImpl::discover_federation( %C)\n"),
00909                ior));
00910   }
00911 
00912   ///@TODO: Implement this.
00913   return false;
00914 }

Here is the call graph for this function:

RepoKey OpenDDS::Federator::ManagerImpl::federation_id (  )  [virtual]

Definition at line 867 of file FederatorManagerImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, TAO_DDS_DCPSFederationId::id(), id(), and LM_DEBUG.

00868 {
00869   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00870     ACE_DEBUG((LM_DEBUG,
00871                ACE_TEXT("(%P|%t) ManagerImpl::federation_id()\n")));
00872   }
00873 
00874   return this->id().id();
00875 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::finalize ( void   ) 

Release resources gracefully.

Definition at line 789 of file FederatorManagerImpl.cpp.

References CORBA::Exception::_tao_print_exception(), ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, ACE_Event_Handler::DONT_CALL, federated_, federationParticipant_, TAO_Pseudo_Var_T< T >::in(), CORBA::is_nil(), OpenDDS::Federator::UpdateListener< DataType, ReaderType >::join(), joinRepo_, LM_DEBUG, LM_ERROR, multicastResponder_, orb_, ownerListener_, participantListener_, peers_, publicationListener_, ACE_Event_Handler::READ_MASK, DDS::RETCODE_PRECONDITION_NOT_MET, OpenDDS::Federator::UpdateListener< DataType, ReaderType >::stop(), subscriptionListener_, TheParticipantFactory, and topicListener_.

Referenced by InfoRepo::finalize(), and InfoRepo::handle_exception().

00790 {
00791   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00792     ACE_DEBUG((LM_DEBUG,
00793                ACE_TEXT("(%P|%t) Federator::ManagerImpl::finalize()\n")));
00794   }
00795 
00796   ownerListener_.stop();
00797   topicListener_.stop();
00798   participantListener_.stop();
00799   publicationListener_.stop();
00800   subscriptionListener_.stop();
00801   ownerListener_.join();
00802   topicListener_.join();
00803   participantListener_.join();
00804   publicationListener_.join();
00805   subscriptionListener_.join();
00806 
00807   if (this->federated_) {
00808     try {
00809       IdToManagerMap::iterator where = this->peers_.find(this->joinRepo_);
00810 
00811       if (where == this->peers_.end()) {
00812         ACE_DEBUG((LM_DEBUG,
00813                    ACE_TEXT("(%P|%t) Federator::Manager::finalize: ")
00814                    ACE_TEXT("repository %d - all attachment to federation left.\n"),
00815                    this->id().id()));
00816 
00817       } else {
00818         if (CORBA::is_nil(where->second.in())) {
00819           ACE_ERROR((LM_ERROR,
00820                      ACE_TEXT("(%P|%t) ERROR: Federator::Manager::finalize: ")
00821                      ACE_TEXT("repository %d not currently attached to a federation.\n"),
00822                      this->id().id()));
00823 
00824         } else {
00825           where->second->leave_federation(this->id().id());
00826           this->federated_ = false;
00827         }
00828       }
00829 
00830     } catch (const CORBA::Exception& ex) {
00831       ex._tao_print_exception(
00832         ACE_TEXT("ERROR: Federator::ManagerImpl::finalize() - ")
00833         ACE_TEXT("unable to leave remote federation "));
00834       throw Incomplete();
00835     }
00836   }
00837 
00838   if (!CORBA::is_nil(this->orb_.in()) && (0 != this->orb_->orb_core())) {
00839     this->orb_->orb_core()->reactor()->remove_handler(
00840       &this->multicastResponder_,
00841       ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
00842   }
00843 
00844   // Remove our local participant and contained entities.
00845   if (0 == CORBA::is_nil(this->federationParticipant_.in())) {
00846     DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00847     if (DDS::RETCODE_PRECONDITION_NOT_MET
00848         == this->federationParticipant_->delete_contained_entities()) {
00849       ACE_ERROR((LM_ERROR,
00850                  ACE_TEXT("(%P|%t) ERROR: Federator::Manager ")
00851                  ACE_TEXT("unable to release resources for repository %d.\n"),
00852                  this->id().id()));
00853 
00854     } else if (DDS::RETCODE_PRECONDITION_NOT_MET
00855                == dpf->delete_participant(this->federationParticipant_.in())) {
00856       ACE_ERROR((LM_ERROR,
00857                  ACE_TEXT("(%P|%t) ERROR: Federator::Manager ")
00858                  ACE_TEXT("unable to release the participant for repository %d.\n"),
00859                  this->id().id()));
00860     }
00861   }
00862 }

Here is the call graph for this function:

Here is the caller graph for this function:

OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE const TAO_DDS_DCPSFederationId & OpenDDS::Federator::ManagerImpl::id ( void   )  const

Accessors for the federation Id value.

Definition at line 12 of file FederatorManagerImpl.inl.

References config_, and OpenDDS::Federator::Config::federationId().

Referenced by create(), destroy(), federation_id(), InfoRepo::init(), pushState(), and update().

00013 {
00014   return this->config_.federationId();
00015 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE TAO_DDS_DCPSInfo_i * OpenDDS::Federator::ManagerImpl::info ( void   )  const

Definition at line 26 of file FederatorManagerImpl.inl.

References info_.

00027 {
00028   return this->info_;
00029 }

ACE_INLINE TAO_DDS_DCPSInfo_i *& OpenDDS::Federator::ManagerImpl::info ( void   ) 

Accessors for the DCPSInfo reference.

Definition at line 19 of file FederatorManagerImpl.inl.

References info_.

Referenced by InfoRepo::init().

00020 {
00021   return this->info_;
00022 }

Here is the caller graph for this function:

void OpenDDS::Federator::ManagerImpl::initialize ( void   ) 

Establish the update publications and subscriptions.

Definition at line 78 of file FederatorManagerImpl.cpp.

References ACE_TEXT(), ACE_OS::atoi(), config_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX, OpenDDS::DCPS::DEFAULT_STATUS_MASK, OpenDDS::Federator::Defaults::DiscoveryRequestPort, DDS::DataWriterQos::durability, DDS::DataReaderQos::durability, OpenDDS::Federator::Config::federationDomain(), OpenDDS::Federator::UpdateListener< DataType, ReaderType >::federationId(), federationParticipant_, OpenDDS::DCPS::DataWriterImpl::get_publication_id(), OpenDDS::DCPS::DataReaderImpl::get_subscription_id(), ACE_OS::getenv(), DDS::DataWriterQos::history, DDS::DataReaderQos::history, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::Federator::InfoRepoMulticastResponder::init(), OpenDDS::DCPS::TransportRegistry::instance(), CORBA::is_nil(), DDS::KEEP_LAST_HISTORY_QOS, LM_DEBUG, LM_ERROR, LM_WARNING, multicastEnabled_, multicastResponder_, orb_, ownerListener_, OpenDDS::Federator::OWNERUPDATETOPICNAME, OpenDDS::Federator::OWNERUPDATETYPENAME, ownerWriter_, PARTICIPANT_QOS_DEFAULT, participantListener_, OpenDDS::Federator::PARTICIPANTUPDATETOPICNAME, OpenDDS::Federator::PARTICIPANTUPDATETYPENAME, participantWriter_, publicationListener_, OpenDDS::Federator::PUBLICATIONUPDATETOPICNAME, OpenDDS::Federator::PUBLICATIONUPDATETYPENAME, publicationWriter_, PUBLISHER_QOS_DEFAULT, ACE_Event_Handler::READ_MASK, ACE_Reactor::register_handler(), DDS::DataWriterQos::reliability, DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_OK, SUBSCRIBER_QOS_DEFAULT, subscriptionListener_, OpenDDS::Federator::SUBSCRIPTIONUPDATETOPICNAME, OpenDDS::Federator::SUBSCRIPTIONUPDATETYPENAME, subscriptionWriter_, TheParticipantFactory, TOPIC_QOS_DEFAULT, topicListener_, OpenDDS::Federator::TOPICUPDATETOPICNAME, OpenDDS::Federator::TOPICUPDATETYPENAME, topicWriter_, and DDS::TRANSIENT_LOCAL_DURABILITY_QOS.

Referenced by join_federation().

00079 {
00080   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00081     ACE_DEBUG((LM_DEBUG,
00082                ACE_TEXT("(%P|%t) Federation::ManagerImpl::initialize()\n")));
00083   }
00084 
00085   // Let the listeners know which repository we are to filter samples at
00086   // the earliest opportunity.
00087   this->ownerListener_.federationId(this->id());
00088   this->topicListener_.federationId(this->id());
00089   this->participantListener_.federationId(this->id());
00090   this->publicationListener_.federationId(this->id());
00091   this->subscriptionListener_.federationId(this->id());
00092 
00093   // Add participant for Federation domain
00094   DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00095   this->federationParticipant_
00096   = dpf->create_participant(
00097       this->config_.federationDomain(),
00098       PARTICIPANT_QOS_DEFAULT,
00099       DDS::DomainParticipantListener::_nil(),
00100       OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00101 
00102   if (CORBA::is_nil(this->federationParticipant_.in())) {
00103     ACE_ERROR((LM_ERROR,
00104                ACE_TEXT("(%P|%t) ERROR: create_participant failed for ")
00105                ACE_TEXT("repository %d in federation domain %d.\n"),
00106                this->id().id(),
00107                this->config_.federationDomain()));
00108     throw Incomplete();
00109   }
00110   //
00111   // Add type support for update topics
00112   //
00113 
00114   OwnerUpdateTypeSupportImpl::_var_type ownerUpdate = new OwnerUpdateTypeSupportImpl();
00115 
00116   if (DDS::RETCODE_OK != ownerUpdate->register_type(
00117         this->federationParticipant_.in(),
00118         OWNERUPDATETYPENAME)) {
00119     ACE_ERROR((LM_ERROR,
00120                ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00121                ACE_TEXT("OwnerUpdate type support for repository %d.\n"),
00122                this->id().id()));
00123     throw Incomplete();
00124   }
00125 
00126   ParticipantUpdateTypeSupportImpl::_var_type participantUpdate = new ParticipantUpdateTypeSupportImpl();
00127 
00128   if (DDS::RETCODE_OK != participantUpdate->register_type(
00129         this->federationParticipant_.in(),
00130         PARTICIPANTUPDATETYPENAME)) {
00131     ACE_ERROR((LM_ERROR,
00132                ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00133                ACE_TEXT("ParticipantUpdate type support for repository %d.\n"),
00134                this->id().id()));
00135     throw Incomplete();
00136   }
00137 
00138   TopicUpdateTypeSupportImpl::_var_type topicUpdate = new TopicUpdateTypeSupportImpl();
00139 
00140   if (DDS::RETCODE_OK != topicUpdate->register_type(
00141         this->federationParticipant_.in(),
00142         TOPICUPDATETYPENAME)) {
00143     ACE_ERROR((LM_ERROR,
00144                ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00145                ACE_TEXT("TopicUpdate type support for repository %d.\n"),
00146                this->id().id()));
00147     throw Incomplete();
00148   }
00149 
00150   PublicationUpdateTypeSupportImpl::_var_type publicationUpdate = new PublicationUpdateTypeSupportImpl();
00151 
00152   if (DDS::RETCODE_OK != publicationUpdate->register_type(
00153         this->federationParticipant_.in(),
00154         PUBLICATIONUPDATETYPENAME)) {
00155     ACE_ERROR((LM_ERROR,
00156                ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00157                ACE_TEXT("PublicationUpdate type support for repository %d.\n"),
00158                this->id().id()));
00159     throw Incomplete();
00160   }
00161 
00162   SubscriptionUpdateTypeSupportImpl::_var_type subscriptionUpdate = new SubscriptionUpdateTypeSupportImpl();
00163 
00164   if (DDS::RETCODE_OK != subscriptionUpdate->register_type(
00165         this->federationParticipant_.in(),
00166         SUBSCRIPTIONUPDATETYPENAME)) {
00167     ACE_ERROR((LM_ERROR,
00168                ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00169                ACE_TEXT("SubscriptionUpdate type support for repository %d.\n"),
00170                this->id().id()));
00171     throw Incomplete();
00172   }
00173 
00174   //
00175   // Create a transport config for use with federation entities.
00176   //
00177   std::string config_name =
00178     OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
00179     + std::string("FederationBITTransportConfig");
00180   OpenDDS::DCPS::TransportConfig_rch config =
00181     OpenDDS::DCPS::TransportRegistry::instance()->create_config(config_name);
00182 
00183   std::string inst_name = OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
00184     + std::string("FederationBITTCPTransportInst");
00185   OpenDDS::DCPS::TransportInst_rch inst =
00186     OpenDDS::DCPS::TransportRegistry::instance()->create_inst(inst_name,
00187                                                               "tcp");
00188   config->instances_.push_back(inst);
00189 
00190   //
00191   // Create the subscriber for the update topics.
00192   //
00193 
00194   DDS::Subscriber_var subscriber
00195   = this->federationParticipant_->create_subscriber(
00196       SUBSCRIBER_QOS_DEFAULT,
00197       DDS::SubscriberListener::_nil(),
00198       OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00199 
00200   if (CORBA::is_nil(subscriber.in())) {
00201     ACE_ERROR((LM_ERROR,
00202                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00203                ACE_TEXT("failed to create subscriber for repository %d\n"),
00204                this->id().id()));
00205     throw Incomplete();
00206 
00207   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00208     ACE_DEBUG((LM_DEBUG,
00209                ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00210                ACE_TEXT("created federation subscriber for repository %d\n"),
00211                this->id().id()));
00212 
00213   }
00214 
00215   // Attach the transport to it.
00216 
00217   try {
00218     OpenDDS::DCPS::TransportRegistry::instance()->bind_config(config,
00219                                                               subscriber.in());
00220   } catch (const OpenDDS::DCPS::Transport::Exception&) {
00221     ACE_ERROR((LM_ERROR,
00222                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00223                ACE_TEXT("failed to bind transport config to federation subscriber.\n")));
00224     throw Incomplete();
00225   }
00226 
00227   //
00228   // Create the publisher for the update topics.
00229   //
00230 
00231   DDS::Publisher_var publisher
00232   = this->federationParticipant_->create_publisher(
00233       PUBLISHER_QOS_DEFAULT,
00234       DDS::PublisherListener::_nil(),
00235       OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00236 
00237   if (CORBA::is_nil(publisher.in())) {
00238     ACE_ERROR((LM_ERROR,
00239                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00240                ACE_TEXT("failed to create publisher for repository %d\n"),
00241                this->id().id()));
00242     throw Incomplete();
00243 
00244   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00245     ACE_DEBUG((LM_DEBUG,
00246                ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00247                ACE_TEXT("created federation publisher for repository %d\n"),
00248                this->id().id()));
00249 
00250   }
00251 
00252   // Attach the transport to it.
00253 
00254   try {
00255     OpenDDS::DCPS::TransportRegistry::instance()->bind_config(config,
00256                                                               publisher.in());
00257   } catch (const OpenDDS::DCPS::Transport::Exception&) {
00258     ACE_ERROR((LM_ERROR,
00259                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00260                ACE_TEXT("failed to bind transport config to federation publisher.\n")));
00261     throw Incomplete();
00262   }
00263 
00264   //
00265   // Some useful items for adding the subscriptions.
00266   //
00267   DDS::Topic_var            topic;
00268   DDS::TopicDescription_var description;
00269   DDS::DataReader_var       dataReader;
00270   DDS::DataWriter_var       dataWriter;
00271 
00272   DDS::DataReaderQos readerQos;
00273   subscriber->get_default_datareader_qos(readerQos);
00274   readerQos.durability.kind                          = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00275   readerQos.history.kind                             = DDS::KEEP_LAST_HISTORY_QOS;
00276   readerQos.history.depth                            = 50;
00277   readerQos.reliability.kind                         = DDS::RELIABLE_RELIABILITY_QOS;
00278   readerQos.reliability.max_blocking_time.sec        = 0;
00279   readerQos.reliability.max_blocking_time.nanosec    = 0;
00280 
00281   DDS::DataWriterQos writerQos;
00282   publisher->get_default_datawriter_qos(writerQos);
00283   writerQos.durability.kind                          = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00284   writerQos.history.kind                             = DDS::KEEP_LAST_HISTORY_QOS;
00285   writerQos.history.depth                            = 50;
00286   writerQos.reliability.kind                         = DDS::RELIABLE_RELIABILITY_QOS;
00287   writerQos.reliability.max_blocking_time.sec        = 0;
00288   writerQos.reliability.max_blocking_time.nanosec    = 0;
00289 
00290   //
00291   // Add update subscriptions
00292   //
00293   // NOTE: Its ok to lose the references to the objects here since they
00294   //       are not needed after this point.  The only thing we will do
00295   //       with them is to destroy them, and that will be done via a
00296   //       cascade delete from the participant.  The listeners will
00297   //       survive and can be used within other participants as well,
00298   //       since the only state they retain is the manager, which is the
00299   //       same for all.
00300   //
00301 
00302   topic = this->federationParticipant_->create_topic(
00303             OWNERUPDATETOPICNAME,
00304             OWNERUPDATETYPENAME,
00305             TOPIC_QOS_DEFAULT,
00306             DDS::TopicListener::_nil(),
00307             OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00308 
00309   dataWriter = publisher->create_datawriter(
00310                  topic.in(),
00311                  writerQos,
00312                  DDS::DataWriterListener::_nil(),
00313                  OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00314 
00315   if (CORBA::is_nil(dataWriter.in())) {
00316     ACE_ERROR((LM_ERROR,
00317                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00318                ACE_TEXT("failed to create OwnerUpdate writer for repository %d\n"),
00319                this->id().id()));
00320     throw Incomplete();
00321   }
00322 
00323   this->ownerWriter_ = OwnerUpdateDataWriter::_narrow(dataWriter.in());
00324 
00325   if (::CORBA::is_nil(this->ownerWriter_.in())) {
00326     ACE_ERROR((LM_ERROR,
00327                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00328                ACE_TEXT("failed to extract typed OwnerUpdate writer.\n")));
00329     throw Incomplete();
00330 
00331   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00332     OpenDDS::DCPS::DataWriterImpl* servant
00333     = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00334 
00335     if (0 == servant) {
00336       ACE_DEBUG((LM_WARNING,
00337                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00338                  ACE_TEXT("unable to extract typed OwnerUpdate writer.\n")));
00339 
00340     } else {
00341       OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00342       ACE_DEBUG((LM_DEBUG,
00343                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00344                  ACE_TEXT("created federation OwnerUpdate writer %C for repository %d\n"),
00345                  std::string(converter).c_str(),
00346                  this->id().id()));
00347     }
00348   }
00349 
00350   description = this->federationParticipant_->lookup_topicdescription(OWNERUPDATETOPICNAME);
00351   dataReader  = subscriber->create_datareader(
00352                   description.in(),
00353                   readerQos,
00354                   &this->ownerListener_,
00355                   OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00356 
00357   if (CORBA::is_nil(dataReader.in())) {
00358     ACE_ERROR((LM_ERROR,
00359                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00360                ACE_TEXT("failed to create OwnerUpdate reader for repository %d\n"),
00361                this->id().id()));
00362     throw Incomplete();
00363 
00364   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00365     OpenDDS::DCPS::DataReaderImpl* servant
00366     = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00367 
00368     if (0 == servant) {
00369       ACE_DEBUG((LM_WARNING,
00370                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00371                  ACE_TEXT("unable to extract typed OwnerUpdate reader.\n")));
00372 
00373     } else {
00374       OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00375       ACE_DEBUG((LM_DEBUG,
00376                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00377                  ACE_TEXT("created federation OwnerUpdate reader %C for repository %d\n"),
00378                  std::string(converter).c_str(),
00379                  this->id().id()));
00380     }
00381   }
00382 
00383   topic = this->federationParticipant_->create_topic(
00384             TOPICUPDATETOPICNAME,
00385             TOPICUPDATETYPENAME,
00386             TOPIC_QOS_DEFAULT,
00387             DDS::TopicListener::_nil(),
00388             OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00389   dataWriter = publisher->create_datawriter(
00390                  topic.in(),
00391                  writerQos,
00392                  DDS::DataWriterListener::_nil(),
00393                  OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00394 
00395   if (CORBA::is_nil(dataWriter.in())) {
00396     ACE_ERROR((LM_ERROR,
00397                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00398                ACE_TEXT("failed to create TopicUpdate writer for repository %d\n"),
00399                this->id().id()));
00400     throw Incomplete();
00401   }
00402 
00403   this->topicWriter_
00404   = TopicUpdateDataWriter::_narrow(dataWriter.in());
00405 
00406   if (::CORBA::is_nil(this->topicWriter_.in())) {
00407     ACE_ERROR((LM_ERROR,
00408                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00409                ACE_TEXT("failed to extract typed TopicUpdate writer.\n")));
00410     throw Incomplete();
00411 
00412   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00413     OpenDDS::DCPS::DataWriterImpl* servant
00414     = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00415 
00416     if (0 == servant) {
00417       ACE_DEBUG((LM_WARNING,
00418                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00419                  ACE_TEXT("unable to extract typed TopicUpdate writer.\n")));
00420 
00421     } else {
00422       OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00423       ACE_DEBUG((LM_DEBUG,
00424                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00425                  ACE_TEXT("created federation TopicUpdate writer %C for repository %d\n"),
00426                  std::string(converter).c_str(),
00427                  this->id().id()));
00428     }
00429   }
00430 
00431   description = this->federationParticipant_->lookup_topicdescription(TOPICUPDATETOPICNAME);
00432   dataReader  = subscriber->create_datareader(
00433                   description.in(),
00434                   readerQos,
00435                   &this->topicListener_,
00436                   OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00437 
00438   if (CORBA::is_nil(dataReader.in())) {
00439     ACE_ERROR((LM_ERROR,
00440                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00441                ACE_TEXT("failed to create TopicUpdate reader for repository %d\n"),
00442                this->id().id()));
00443     throw Incomplete();
00444 
00445   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00446     OpenDDS::DCPS::DataReaderImpl* servant
00447     = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00448 
00449     if (0 == servant) {
00450       ACE_DEBUG((LM_WARNING,
00451                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00452                  ACE_TEXT("unable to extract typed TopicUpdate reader.\n")));
00453 
00454     } else {
00455       OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00456       ACE_DEBUG((LM_DEBUG,
00457                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00458                  ACE_TEXT("created federation TopicUpdate reader %C for repository %d\n"),
00459                  std::string(converter).c_str(),
00460                  this->id().id()));
00461     }
00462   }
00463 
00464   topic = this->federationParticipant_->create_topic(
00465             PARTICIPANTUPDATETOPICNAME,
00466             PARTICIPANTUPDATETYPENAME,
00467             TOPIC_QOS_DEFAULT,
00468             DDS::TopicListener::_nil(),
00469             OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00470   dataWriter = publisher->create_datawriter(
00471                  topic.in(),
00472                  writerQos,
00473                  DDS::DataWriterListener::_nil(),
00474                  OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00475 
00476   if (CORBA::is_nil(dataWriter.in())) {
00477     ACE_ERROR((LM_ERROR,
00478                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00479                ACE_TEXT("failed to create ParticipantUpdate writer for repository %d\n"),
00480                this->id().id()));
00481     throw Incomplete();
00482   }
00483 
00484   this->participantWriter_
00485   = ParticipantUpdateDataWriter::_narrow(dataWriter.in());
00486 
00487   if (::CORBA::is_nil(this->participantWriter_.in())) {
00488     ACE_ERROR((LM_ERROR,
00489                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00490                ACE_TEXT("failed to extract typed ParticipantUpdate writer.\n")));
00491     throw Incomplete();
00492 
00493   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00494     OpenDDS::DCPS::DataWriterImpl* servant
00495     = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00496 
00497     if (0 == servant) {
00498       ACE_DEBUG((LM_WARNING,
00499                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00500                  ACE_TEXT("unable to extract typed ParticipantUpdate writer.\n")));
00501 
00502     } else {
00503       OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00504       ACE_DEBUG((LM_DEBUG,
00505                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00506                  ACE_TEXT("created federation ParticipantUpdate writer %C for repository %d\n"),
00507                  std::string(converter).c_str(),
00508                  this->id().id()));
00509     }
00510   }
00511 
00512   description = this->federationParticipant_->lookup_topicdescription(PARTICIPANTUPDATETOPICNAME);
00513   dataReader  = subscriber->create_datareader(
00514                   description.in(),
00515                   readerQos,
00516                   &this->participantListener_,
00517                   OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00518 
00519   if (CORBA::is_nil(dataReader.in())) {
00520     ACE_ERROR((LM_ERROR,
00521                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00522                ACE_TEXT("failed to create ParticipantUpdate reader for repository %d\n"),
00523                this->id().id()));
00524     throw Incomplete();
00525 
00526   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00527     OpenDDS::DCPS::DataReaderImpl* servant
00528     = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00529 
00530     if (0 == servant) {
00531       ACE_DEBUG((LM_WARNING,
00532                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00533                  ACE_TEXT("unable to extract typed ParticipantUpdate reader.\n")));
00534 
00535     } else {
00536       OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00537       ACE_DEBUG((LM_DEBUG,
00538                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00539                  ACE_TEXT("created federation ParticipantUpdate reader %C for repository %d\n"),
00540                  std::string(converter).c_str(),
00541                  this->id().id()));
00542     }
00543   }
00544 
00545   topic = this->federationParticipant_->create_topic(
00546             PUBLICATIONUPDATETOPICNAME,
00547             PUBLICATIONUPDATETYPENAME,
00548             TOPIC_QOS_DEFAULT,
00549             DDS::TopicListener::_nil(),
00550             OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00551   dataWriter = publisher->create_datawriter(
00552                  topic.in(),
00553                  writerQos,
00554                  DDS::DataWriterListener::_nil(),
00555                  OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00556 
00557   if (CORBA::is_nil(dataWriter.in())) {
00558     ACE_ERROR((LM_ERROR,
00559                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00560                ACE_TEXT("failed to create PublicationUpdate writer for repository %d\n"),
00561                this->id().id()));
00562     throw Incomplete();
00563   }
00564 
00565   this->publicationWriter_
00566   = PublicationUpdateDataWriter::_narrow(dataWriter.in());
00567 
00568   if (::CORBA::is_nil(this->publicationWriter_.in())) {
00569     ACE_ERROR((LM_ERROR,
00570                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00571                ACE_TEXT("failed to extract typed PublicationUpdate writer.\n")));
00572     throw Incomplete();
00573 
00574   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00575     OpenDDS::DCPS::DataWriterImpl* servant
00576     = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00577 
00578     if (0 == servant) {
00579       ACE_DEBUG((LM_WARNING,
00580                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00581                  ACE_TEXT("unable to extract typed PublicationUpdate writer.\n")));
00582 
00583     } else {
00584       OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00585       ACE_DEBUG((LM_DEBUG,
00586                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00587                  ACE_TEXT("created federation PublicationUpdate writer %C for repository %d\n"),
00588                  std::string(converter).c_str(),
00589                  this->id().id()));
00590     }
00591   }
00592 
00593   description = this->federationParticipant_->lookup_topicdescription(PUBLICATIONUPDATETOPICNAME);
00594   dataReader  = subscriber->create_datareader(
00595                   description.in(),
00596                   readerQos,
00597                   &this->publicationListener_,
00598                   OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00599 
00600   if (CORBA::is_nil(dataReader.in())) {
00601     ACE_ERROR((LM_ERROR,
00602                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00603                ACE_TEXT("failed to create PublicationUpdate reader for repository %d\n"),
00604                this->id().id()));
00605     throw Incomplete();
00606 
00607   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00608     OpenDDS::DCPS::DataReaderImpl* servant
00609     = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00610 
00611     if (0 == servant) {
00612       ACE_DEBUG((LM_WARNING,
00613                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00614                  ACE_TEXT("unable to extract typed PublicationUpdate reader.\n")));
00615 
00616     } else {
00617       OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00618       ACE_DEBUG((LM_DEBUG,
00619                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00620                  ACE_TEXT("created federation PublicationUpdate reader %C for repository %d\n"),
00621                  std::string(converter).c_str(),
00622                  this->id().id()));
00623     }
00624   }
00625 
00626   topic = this->federationParticipant_->create_topic(
00627             SUBSCRIPTIONUPDATETOPICNAME,
00628             SUBSCRIPTIONUPDATETYPENAME,
00629             TOPIC_QOS_DEFAULT,
00630             DDS::TopicListener::_nil(),
00631             OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00632   dataWriter = publisher->create_datawriter(
00633                  topic.in(),
00634                  writerQos,
00635                  DDS::DataWriterListener::_nil(),
00636                  OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00637 
00638   if (CORBA::is_nil(dataWriter.in())) {
00639     ACE_ERROR((LM_ERROR,
00640                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00641                ACE_TEXT("failed to create SubscriptionUpdate writer for repository %d\n"),
00642                this->id().id()));
00643     throw Incomplete();
00644   }
00645 
00646   this->subscriptionWriter_
00647   = SubscriptionUpdateDataWriter::_narrow(dataWriter.in());
00648 
00649   if (::CORBA::is_nil(this->subscriptionWriter_.in())) {
00650     ACE_ERROR((LM_ERROR,
00651                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00652                ACE_TEXT("failed to extract typed SubscriptionUpdate writer.\n")));
00653     throw Incomplete();
00654 
00655   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00656     OpenDDS::DCPS::DataWriterImpl* servant
00657     = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00658 
00659     if (0 == servant) {
00660       ACE_DEBUG((LM_WARNING,
00661                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00662                  ACE_TEXT("unable to extract typed SubscriptionUpdate writer.\n")));
00663 
00664     } else {
00665       OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00666       ACE_DEBUG((LM_DEBUG,
00667                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00668                  ACE_TEXT("created federation SubscriptionUpdate writer %C for repository %d\n"),
00669                  std::string(converter).c_str(),
00670                  this->id().id()));
00671     }
00672   }
00673 
00674   description = this->federationParticipant_->lookup_topicdescription(SUBSCRIPTIONUPDATETOPICNAME);
00675   dataReader  = subscriber->create_datareader(
00676                   description.in(),
00677                   readerQos,
00678                   &this->subscriptionListener_,
00679                   OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00680 
00681   if (CORBA::is_nil(dataReader.in())) {
00682     ACE_ERROR((LM_ERROR,
00683                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00684                ACE_TEXT("failed to create SubscriptionUpdate reader for repository %d\n"),
00685                this->id().id()));
00686     throw Incomplete();
00687 
00688   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00689     OpenDDS::DCPS::DataReaderImpl* servant
00690     = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00691 
00692     if (0 == servant) {
00693       ACE_DEBUG((LM_WARNING,
00694                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00695                  ACE_TEXT("unable to extract typed SubscriptionUpdate reader.\n")));
00696 
00697     } else {
00698       OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00699       ACE_DEBUG((LM_DEBUG,
00700                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00701                  ACE_TEXT("created federation SubscriptionUpdate reader %C for repository %d\n"),
00702                  std::string(converter).c_str(),
00703                  this->id().id()));
00704     }
00705   }
00706 
00707   // JSP
00708 #if defined (ACE_HAS_IP_MULTICAST)
00709 
00710   if (this->multicastEnabled_) {
00711     //
00712     // Install ior multicast handler.
00713     //
00714     // Get reactor instance from TAO.
00715     ACE_Reactor *reactor = this->orb_->orb_core()->reactor();
00716 
00717     // See if the -ORBMulticastDiscoveryEndpoint option was specified.
00718     ACE_CString mde(this->orb_->orb_core()->orb_params()->mcast_discovery_endpoint());
00719 
00720     // First, see if the user has given us a multicast port number
00721     // on the command-line;
00722     u_short port = 0;
00723 
00724     // Check environment var. for multicast port.
00725     const char *port_number = ACE_OS::getenv("OpenDDSFederationPort");
00726 
00727     if (port_number != 0) {
00728       port = static_cast<u_short>(ACE_OS::atoi(port_number));
00729     }
00730 
00731     // Port wasn't specified on the command-line -
00732     // use the default.
00733     if (port == 0)
00734       port = OpenDDS::Federator::Defaults::DiscoveryRequestPort;
00735 
00736     // Initialize the handler
00737     if (mde.length() != 0) {
00738       if (this->multicastResponder_.init(
00739             this->orb_.in(),
00740             mde.c_str()) == -1) {
00741         ACE_ERROR((LM_ERROR,
00742                    ACE_TEXT("(%P|%t) ERROR: Unable to initialize ")
00743                    ACE_TEXT("the multicast responder for repository %d.\n"),
00744                    this->id().id()));
00745         throw Incomplete();
00746       }
00747 
00748     } else {
00749       if (this->multicastResponder_.init(
00750             this->orb_.in(),
00751             port,
00752 #if defined (ACE_HAS_IPV6)
00753             ACE_DEFAULT_MULTICASTV6_ADDR
00754 #else
00755             ACE_DEFAULT_MULTICAST_ADDR
00756 #endif /* ACE_HAS_IPV6 */
00757           )) {
00758         ACE_ERROR((LM_ERROR,
00759                    ACE_TEXT("(%P|%t) ERROR: Unable to initialize ")
00760                    ACE_TEXT("the multicast responder for repository %d.\n"),
00761                    this->id().id()));
00762         throw Incomplete();
00763       }
00764     }
00765 
00766     // Register event handler for the ior multicast.
00767     if (reactor->register_handler(&this->multicastResponder_,
00768                                   ACE_Event_Handler::READ_MASK) == -1) {
00769       ACE_ERROR((LM_ERROR,
00770                  ACE_TEXT("(%P|%t) ERROR: Unable to register event handler ")
00771                  ACE_TEXT("for repository %d.\n"),
00772                  this->id().id()));
00773       throw Incomplete();
00774     }
00775 
00776     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00777       ACE_DEBUG((LM_DEBUG,
00778                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00779                  ACE_TEXT("multicast server setup is complete.\n")));
00780     }
00781   }
00782 
00783 #else
00784   ACE_UNUSED_ARG(this->multicastEnabled_);
00785 #endif /* ACE_HAS_IP_MULTICAST */
00786 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::Federator::ManagerImpl::initializeOwner ( const OpenDDS::Federator::OwnerUpdate data  )  [virtual]

Definition at line 1130 of file FederatorManagerImpl.cpp.

References processCreate().

01132 {
01133   this->processCreate(&data, 0);
01134 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::initializeParticipant ( const OpenDDS::Federator::ParticipantUpdate data  )  [virtual]

Definition at line 1144 of file FederatorManagerImpl.cpp.

References processCreate().

01146 {
01147   this->processCreate(&data, 0);
01148 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::initializePublication ( const OpenDDS::Federator::PublicationUpdate data  )  [virtual]

Definition at line 1151 of file FederatorManagerImpl.cpp.

References processCreate().

01153 {
01154   this->processCreate(&data, 0);
01155 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::initializeSubscription ( const OpenDDS::Federator::SubscriptionUpdate data  )  [virtual]

Definition at line 1158 of file FederatorManagerImpl.cpp.

References processCreate().

01160 {
01161   this->processCreate(&data, 0);
01162 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::initializeTopic ( const OpenDDS::Federator::TopicUpdate data  )  [virtual]

Definition at line 1137 of file FederatorManagerImpl.cpp.

References processCreate().

01139 {
01140   this->processCreate(&data, 0);
01141 }

Here is the call graph for this function:

Manager_ptr OpenDDS::Federator::ManagerImpl::join_federation ( Manager_ptr  peer,
FederationDomain  federation 
) [virtual]

Definition at line 917 of file FederatorManagerImpl.cpp.

References CORBA::Exception::_tao_print_exception(), ACE_TEXT(), config_, OpenDDS::DCPS::DCPS_debug_level, federated_, OpenDDS::Federator::Config::federationDomain(), initialize(), CORBA::is_nil(), joiner_, joining_, joinRepo_, LM_DEBUG, lock_, OpenDDS::Federator::NIL_REPOSITORY, orb(), participantWriter_, peers_, pushState(), ACE_Condition< MUTEX >::signal(), TheServiceParticipant, and ACE_Condition< MUTEX >::wait().

00921 {
00922   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00923     ACE_DEBUG((LM_DEBUG,
00924                ACE_TEXT("(%P|%t) ManagerImpl::join_federation( peer, %d)\n"),
00925                federation));
00926   }
00927 
00928   RepoKey remote = NIL_REPOSITORY;
00929 
00930   try {
00931     // Obtain the remote repository federator Id value.
00932     remote = peer->federation_id();
00933 
00934     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00935       ACE_DEBUG((LM_DEBUG,
00936                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
00937                  ACE_TEXT("repo id %d entered from repository with id %d.\n"),
00938                  this->id().id(),
00939                  remote));
00940     }
00941 
00942   } catch (const CORBA::Exception& ex) {
00943     ex._tao_print_exception(
00944       ACE_TEXT("ERROR: Federator::ManagerImpl::join_federation() - ")
00945       ACE_TEXT("unable to obtain remote federation Id value: "));
00946     throw Incomplete();
00947   }
00948 
00949   // If we are recursing, then we are done.
00950   if (this->joiner_ == remote) {
00951     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00952       ACE_DEBUG((LM_DEBUG,
00953                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
00954                  ACE_TEXT("repo id %d leaving after reentry from repository with id %d.\n"),
00955                  this->id().id(),
00956                  remote));
00957     }
00958 
00959     return this->_this();
00960 
00961   } else {
00962     // Block while any different repository is joining.
00963     ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
00964 
00965     while (this->joiner_ != NIL_REPOSITORY) {
00966       // This releases the lock while we block.
00967       this->joining_.wait();
00968 
00969       // We are now recursing - curses!
00970       if (this->joiner_ == remote) {
00971         return this->_this();
00972       }
00973     }
00974 
00975     // Note that we are joining the remote repository now.
00976     this->joiner_ = remote;
00977   }
00978 
00979   //
00980   // We only reach this point if:
00981   //   1) No other repository is processing past this point;
00982   //   2) We are not recursing.
00983   //
00984 
00985   // Check if we already have Federation repository.
00986   // Check if we are already federated.
00987   if (this->federated_ == false) {
00988     // Go ahead and add the joining repository as our Federation
00989     // repository.
00990     try {
00991       // Mark this repository as the point to which we are joined to
00992       // the federation.
00993       this->joinRepo_ = remote;
00994 
00995       // Obtain a reference to the remote repository.
00996       OpenDDS::DCPS::DCPSInfo_var remoteRepo = peer->repository();
00997 
00998       CORBA::ORB_var orb = remoteRepo->_get_orb();
00999       CORBA::String_var remoteRepoIor = orb->object_to_string(remoteRepo.in());
01000       if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01001         ACE_DEBUG((LM_DEBUG,
01002                    ACE_TEXT("(%P|%t) FederatorManagerImpl::join_federation() - ")
01003                    ACE_TEXT("id %d obtained reference to id %d:\n")
01004                    ACE_TEXT("\t%C\n"),
01005                    this->id().id(),
01006                    remote,
01007                    remoteRepoIor.in()));
01008       }
01009 
01010       // Add remote repository to Service_Participant in the Federation domain
01011       std::ostringstream oss;
01012       oss << remote;
01013       std::string key_string = oss.str();
01014       TheServiceParticipant->set_repo_ior(remoteRepoIor.in(), key_string);
01015       TheServiceParticipant->set_repo_domain(this->config_.federationDomain(), key_string);
01016 
01017     } catch (const CORBA::Exception& ex) {
01018       ex._tao_print_exception(
01019         "ERROR: Federator::ManagerImpl::join_federation() - Unable to join with remote: ");
01020       throw Incomplete();
01021     }
01022   }
01023 
01024   // Symmetrical joining behavior.
01025   try {
01026     Manager_var self = this->_this();
01027     Manager_var remoteManager
01028     = peer->join_federation(self, this->config_.federationDomain());
01029 
01030     if (this->joinRepo_ == remote) {
01031       this->peers_[ this->joinRepo_]
01032       = OpenDDS::Federator::Manager::_duplicate(remoteManager.in());
01033     }
01034 
01035     //
01036     // Push our initial state out to the joining repository *after* we call
01037     // him back to join.  This reduces the amount of duplicate data pushed
01038     // when a new (empty) repository is joining an existing federation.
01039     //
01040     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01041       ACE_DEBUG((LM_DEBUG,
01042                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
01043                  ACE_TEXT("repo id %d pushing state to repository with id %d.\n"),
01044                  this->id().id(),
01045                  remote));
01046     }
01047 
01048     this->pushState(peer);
01049 
01050   } catch (const CORBA::Exception& ex) {
01051     ex._tao_print_exception(
01052       "ERROR: Federator::ManagerImpl::join_federation() - unsuccessful call to remote->join: ");
01053     throw Incomplete();
01054   }
01055 
01056   if (CORBA::is_nil(this->participantWriter_.in())) {
01057     //
01058     // Establish our update publications and subscriptions *after* we
01059     // have exchanged internal state with the first joining repository.
01060     //
01061     this->initialize();
01062   }
01063 
01064   // Adjust our joining state and give others the opportunity to proceed.
01065   if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01066     ACE_DEBUG((LM_DEBUG,
01067                ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
01068                ACE_TEXT("repo id %d joined to repository with id %d.\n"),
01069                this->id().id(),
01070                remote));
01071   }
01072 
01073   this->federated_ = true;
01074   this->joiner_    = NIL_REPOSITORY;
01075   this->joining_.signal();
01076   return this->_this();
01077 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::leave_and_shutdown ( void   )  [virtual]

Definition at line 1110 of file FederatorManagerImpl.cpp.

References info_, and TAO_DDS_DCPSInfo_i::shutdown().

01112 {
01113   // Shutdown the process via the repository object.
01114   this->info_->shutdown();
01115 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::leave_federation ( RepoKey  id  )  [virtual]

Definition at line 1080 of file FederatorManagerImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, info_, LM_DEBUG, peers_, and TAO_DDS_DCPSInfo_i::remove_by_owner().

01082 {
01083   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01084     ACE_DEBUG((LM_DEBUG,
01085                ACE_TEXT("(%P|%t) ManagerImpl::leave_federation( %d)\n"),
01086                this->id().id()));
01087   }
01088 
01089   // Remove the leaving repository from our outbound mappings.
01090   IdToManagerMap::iterator where = this->peers_.find(id);
01091 
01092   if (where != this->peers_.end()) {
01093     this->peers_.erase(where);
01094   }
01095 
01096   // Remove all the internal Entities owned by the leaving repository.
01097   if (false
01098       == this->info_->remove_by_owner(this->config_.federationDomain(), id)) {
01099     throw Incomplete();
01100   }
01101 
01102   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01103     ACE_DEBUG((LM_DEBUG,
01104                ACE_TEXT("(%P|%t) ManagerImpl::leave_federation( %d) complete.\n"),
01105                this->id().id()));
01106   }
01107 }

Here is the call graph for this function:

ACE_INLINE void OpenDDS::Federator::ManagerImpl::localRepo ( ::OpenDDS::DCPS::DCPSInfo_ptr  repo  ) 

Capture a remote callable reference to the DCPSInfo.

Definition at line 33 of file FederatorManagerImpl.inl.

References localRepo_.

Referenced by InfoRepo::init().

00034 {
00035   this->localRepo_ = OpenDDS::DCPS::DCPSInfo::_duplicate(repo);
00036 }

Here is the caller graph for this function:

ACE_INLINE void OpenDDS::Federator::ManagerImpl::orb ( CORBA::ORB_ptr  value  ) 

Definition at line 47 of file FederatorManagerImpl.inl.

References CORBA::ORB::_duplicate(), and orb_.

00048 {
00049   this->orb_ = CORBA::ORB::_duplicate(value);
00050 }

Here is the call graph for this function:

ACE_INLINE CORBA::ORB_ptr OpenDDS::Federator::ManagerImpl::orb ( void   ) 

Accessors for the ORB.

Definition at line 40 of file FederatorManagerImpl.inl.

References orb_, and TAO_Pseudo_Var_T< T >::ptr().

Referenced by InfoRepo::init(), join_federation(), and pushState().

00041 {
00042   return this->orb_.ptr();
00043 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::Federator::ManagerImpl::processCreate ( const TopicUpdate sample,
const DDS::SampleInfo info 
) [virtual]

Create a proxy for a new topic.

Implements OpenDDS::Federator::UpdateProcessor< TopicUpdate >.

Definition at line 731 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), TAO_DDS_DCPSInfo_i::add_topic(), OpenDDS::Federator::TopicUpdate::datatype, OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredTopics_, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::TopicUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::TopicUpdate::participant, processDeferred(), OpenDDS::Federator::TopicUpdate::qos, and OpenDDS::Federator::TopicUpdate::topic.

00732 {
00733   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00734     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00735     OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
00736     ACE_DEBUG((LM_DEBUG,
00737                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( TopicUpdate): ")
00738                ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00739                this->id().id(),
00740                sample->domain,
00741                std::string(part_converter).c_str(),
00742                std::string(topic_converter).c_str()));
00743   }
00744 
00745   if (false == this->info_->add_topic(sample->id,
00746                                       sample->domain,
00747                                       sample->participant,
00748                                       sample->topic,
00749                                       sample->datatype,
00750                                       sample->qos)) {
00751     {
00752       ACE_GUARD(ACE_Thread_Mutex,
00753                 guard,
00754                 this->deferred_lock_);
00755       this->deferredTopics_.push_back(*sample);
00756     }
00757 
00758     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00759       ACE_DEBUG((LM_DEBUG,
00760                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( TopicUpdate): ")
00761                  ACE_TEXT("deferred update.\n")));
00762     }
00763   }
00764 
00765   this->processDeferred();
00766 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::processCreate ( const ParticipantUpdate sample,
const DDS::SampleInfo info 
) [virtual]

Create a proxy for a new participant.

Implements OpenDDS::Federator::UpdateProcessor< ParticipantUpdate >.

Definition at line 699 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), TAO_DDS_DCPSInfo_i::add_domain_participant(), TAO_DDS_DCPSInfo_i::changeOwnership(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::ParticipantUpdate::id, info_, LM_DEBUG, LM_ERROR, OpenDDS::Federator::ParticipantUpdate::owner, processDeferred(), OpenDDS::Federator::ParticipantUpdate::qos, and OpenDDS::Federator::ParticipantUpdate::sender.

00700 {
00701   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00702     OpenDDS::DCPS::RepoIdConverter converter(sample->id);
00703     ACE_DEBUG((LM_DEBUG,
00704                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( ParticipantUpdate): ")
00705                ACE_TEXT("repo %d - [ domain %d/ participant %C/ owner %d ]\n"),
00706                this->id().id(),
00707                sample->domain,
00708                std::string(converter).c_str(),
00709                sample->owner));
00710   }
00711 
00712   this->info_->add_domain_participant(
00713     sample->domain,
00714     sample->id,
00715     sample->qos);
00716   bool ownershipChanged = this->info_->changeOwnership(
00717     sample->domain,
00718     sample->id,
00719     sample->sender,
00720     sample->owner);
00721   if (!ownershipChanged) {
00722     ACE_ERROR((LM_ERROR,
00723                ACE_TEXT("(%P|%t) ERROR: ")
00724                ACE_TEXT("OpenDDS::Federator::ManagerImpl::processCreate(), ")
00725                ACE_TEXT("Could not change ownership\n")));
00726   }
00727   this->processDeferred();
00728 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::processCreate ( const SubscriptionUpdate sample,
const DDS::SampleInfo info 
) [virtual]

Create a proxy for a new subscription.

Implements OpenDDS::Federator::UpdateProcessor< SubscriptionUpdate >.

Definition at line 655 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), TAO_DDS_DCPSInfo_i::add_subscription(), OpenDDS::Federator::SubscriptionUpdate::callback, OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredSubscriptions_, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::expression_params, OpenDDS::Federator::SubscriptionUpdate::filter_class_name, OpenDDS::Federator::SubscriptionUpdate::filter_expression, OpenDDS::Federator::SubscriptionUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, processDeferred(), OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, OpenDDS::Federator::SubscriptionUpdate::topic, and OpenDDS::Federator::SubscriptionUpdate::transport_info.

00656 {
00657   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00658     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00659     OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
00660     ACE_DEBUG((LM_DEBUG,
00661                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( SubscriptionUpdate): ")
00662                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00663                this->id().id(),
00664                sample->domain,
00665                std::string(part_converter).c_str(),
00666                std::string(sub_converter).c_str()));
00667   }
00668 
00669   if (false == this->info_->add_subscription(sample->domain,
00670                                              sample->participant,
00671                                              sample->topic,
00672                                              sample->id,
00673                                              sample->callback,
00674                                              sample->datareader_qos,
00675                                              sample->transport_info,
00676                                              sample->subscriber_qos,
00677                                              sample->filter_class_name,
00678                                              sample->filter_expression,
00679                                              sample->expression_params,
00680                                              true)) {
00681     {
00682       ACE_GUARD(ACE_Thread_Mutex,
00683                 guard,
00684                 this->deferred_lock_);
00685       this->deferredSubscriptions_.push_back(*sample);
00686     }
00687 
00688     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00689       ACE_DEBUG((LM_DEBUG,
00690                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( SubscriptionUpdate): ")
00691                  ACE_TEXT("deferred update.\n")));
00692     }
00693   }
00694 
00695   this->processDeferred();
00696 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::processCreate ( const PublicationUpdate sample,
const DDS::SampleInfo info 
) [virtual]

Create a proxy for a new publication.

Implements OpenDDS::Federator::UpdateProcessor< PublicationUpdate >.

Definition at line 614 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), TAO_DDS_DCPSInfo_i::add_publication(), OpenDDS::Federator::PublicationUpdate::callback, OpenDDS::Federator::PublicationUpdate::datawriter_qos, OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredPublications_, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::PublicationUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::PublicationUpdate::participant, processDeferred(), OpenDDS::Federator::PublicationUpdate::publisher_qos, OpenDDS::Federator::PublicationUpdate::topic, and OpenDDS::Federator::PublicationUpdate::transport_info.

00615 {
00616   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00617     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00618     OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
00619     ACE_DEBUG((LM_DEBUG,
00620                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( PublicationUpdate): ")
00621                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00622                this->id().id(),
00623                sample->domain,
00624                std::string(part_converter).c_str(),
00625                std::string(pub_converter).c_str()));
00626   }
00627 
00628   if (false == this->info_->add_publication(sample->domain,
00629                                             sample->participant,
00630                                             sample->topic,
00631                                             sample->id,
00632                                             sample->callback,
00633                                             sample->datawriter_qos,
00634                                             sample->transport_info,
00635                                             sample->publisher_qos,
00636                                             true)) {
00637     {
00638       ACE_GUARD(ACE_Thread_Mutex,
00639                 guard,
00640                 this->deferred_lock_);
00641       this->deferredPublications_.push_back(*sample);
00642     }
00643 
00644     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00645       ACE_DEBUG((LM_DEBUG,
00646                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( PublicationUpdate): ")
00647                  ACE_TEXT("deferred update.\n")));
00648     }
00649   }
00650 
00651   this->processDeferred();
00652 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::processCreate ( const OwnerUpdate sample,
const DDS::SampleInfo info 
) [virtual]

Null implementation for OwnerUpdate samples.

Implements OpenDDS::Federator::UpdateProcessor< OwnerUpdate >.

Definition at line 577 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), TAO_DDS_DCPSInfo_i::changeOwnership(), OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredOwnerships_, OpenDDS::Federator::OwnerUpdate::domain, info_, LM_DEBUG, OpenDDS::Federator::OwnerUpdate::owner, OpenDDS::Federator::OwnerUpdate::participant, processDeferred(), and OpenDDS::Federator::OwnerUpdate::sender.

Referenced by initializeOwner(), initializeParticipant(), initializePublication(), initializeSubscription(), and initializeTopic().

00578 {
00579   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00580     OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
00581     ACE_DEBUG((LM_DEBUG,
00582                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( OwnerUpdate): ")
00583                ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00584                this->id().id(),
00585                sample->domain,
00586                std::string(converter).c_str(),
00587                sample->sender,
00588                sample->owner));
00589   }
00590 
00591   // We could generate an error message here.  Instead we let action be irrelevant.
00592   if (false == this->info_->changeOwnership(sample->domain,
00593                                             sample->participant,
00594                                             sample->sender,
00595                                             sample->owner)) {
00596     {
00597       ACE_GUARD(ACE_Thread_Mutex,
00598                 guard,
00599                 this->deferred_lock_);
00600       this->deferredOwnerships_.push_back(*sample);
00601     }
00602 
00603     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00604       ACE_DEBUG((LM_DEBUG,
00605                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( OwnerUpdate): ")
00606                  ACE_TEXT("deferred update.\n")));
00607     }
00608   }
00609 
00610   this->processDeferred();
00611 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::Federator::ManagerImpl::processDeferred (  ) 

Handle any deferred updates that might have become processable.

Definition at line 769 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), TAO_DDS_DCPSInfo_i::add_publication(), TAO_DDS_DCPSInfo_i::add_subscription(), TAO_DDS_DCPSInfo_i::add_topic(), TAO_DDS_DCPSInfo_i::changeOwnership(), OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredOwnerships_, deferredPublications_, deferredSubscriptions_, deferredTopics_, info_, and LM_DEBUG.

Referenced by processCreate().

00770 {
00771   ACE_GUARD(ACE_Thread_Mutex,
00772             guard,
00773             this->deferred_lock_);
00774 
00775   {
00776     std::list<OwnerUpdate>::iterator current = this->deferredOwnerships_.begin();
00777 
00778     while (current != this->deferredOwnerships_.end()) {
00779       if (this->info_->changeOwnership(current->domain,
00780                                        current->participant,
00781                                        current->sender,
00782                                        current->owner)) {
00783         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00784           OpenDDS::DCPS::RepoIdConverter converter(current->participant);
00785           ACE_DEBUG((LM_DEBUG,
00786                      ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( OwnerUpdate): ")
00787                      ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00788                      this->id().id(),
00789                      current->domain,
00790                      std::string(converter).c_str(),
00791                      current->sender,
00792                      current->owner));
00793         }
00794 
00795         current = this->deferredOwnerships_.erase(current);
00796 
00797       } else {
00798         ++ current;
00799       }
00800     }
00801   }
00802 
00803   {
00804     std::list<TopicUpdate>::iterator current = this->deferredTopics_.begin();
00805 
00806     while (current != this->deferredTopics_.end()) {
00807       if (true == this->info_->add_topic(current->id,
00808                                          current->domain,
00809                                          current->participant,
00810                                          current->topic,
00811                                          current->datatype,
00812                                          current->qos)) {
00813         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00814           OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
00815           OpenDDS::DCPS::RepoIdConverter topic_converter(current->id);
00816           ACE_DEBUG((LM_DEBUG,
00817                      ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( TopicUpdate): ")
00818                      ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00819                      this->id().id(),
00820                      current->domain,
00821                      std::string(part_converter).c_str(),
00822                      std::string(topic_converter).c_str()));
00823         }
00824 
00825         current = this->deferredTopics_.erase(current);
00826 
00827       } else {
00828         ++ current;
00829       }
00830     }
00831   }
00832 
00833   {
00834     std::list<PublicationUpdate>::iterator current = this->deferredPublications_.begin();
00835 
00836     while (current != this->deferredPublications_.end()) {
00837 
00838       if (true == this->info_->add_publication(current->domain,
00839                                                current->participant,
00840                                                current->topic,
00841                                                current->id,
00842                                                current->callback,
00843                                                current->datawriter_qos,
00844                                                current->transport_info,
00845                                                current->publisher_qos,
00846                                                true)) {
00847         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00848           OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
00849           OpenDDS::DCPS::RepoIdConverter pub_converter(current->id);
00850           ACE_DEBUG((LM_DEBUG,
00851                      ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( PublicationUpdate): ")
00852                      ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00853                      this->id().id(),
00854                      current->domain,
00855                      std::string(part_converter).c_str(),
00856                      std::string(pub_converter).c_str()));
00857         }
00858 
00859         current = this->deferredPublications_.erase(current);
00860 
00861       } else {
00862         ++ current;
00863       }
00864     }
00865   }
00866 
00867   {
00868     std::list<SubscriptionUpdate>::iterator current = this->deferredSubscriptions_.begin();
00869 
00870     while (current != this->deferredSubscriptions_.end()) {
00871 
00872       if (true == this->info_->add_subscription(current->domain,
00873                                                 current->participant,
00874                                                 current->topic,
00875                                                 current->id,
00876                                                 current->callback,
00877                                                 current->datareader_qos,
00878                                                 current->transport_info,
00879                                                 current->subscriber_qos,
00880                                                 current->filter_class_name,
00881                                                 current->filter_expression,
00882                                                 current->expression_params,
00883                                                 true)) {
00884         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00885           OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
00886           OpenDDS::DCPS::RepoIdConverter sub_converter(current->id);
00887           ACE_DEBUG((LM_DEBUG,
00888                      ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( SubscriptionUpdate): ")
00889                      ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00890                      this->id().id(),
00891                      current->domain,
00892                      std::string(part_converter).c_str(),
00893                      std::string(sub_converter).c_str()));
00894         }
00895 
00896         current = this->deferredSubscriptions_.erase(current);
00897 
00898       } else {
00899         ++ current;
00900       }
00901     }
00902   }
00903 
00904 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::Federator::ManagerImpl::processDelete ( const TopicUpdate sample,
const DDS::SampleInfo info 
) [virtual]

Delete a proxy for a topic.

Implements OpenDDS::Federator::UpdateProcessor< TopicUpdate >.

Definition at line 1208 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::TopicUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::TopicUpdate::participant, and TAO_DDS_DCPSInfo_i::remove_topic().

01209 {
01210   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01211     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01212     OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
01213     ACE_DEBUG((LM_DEBUG,
01214                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
01215                ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
01216                this->id().id(),
01217                sample->domain,
01218                std::string(part_converter).c_str(),
01219                std::string(topic_converter).c_str()));
01220   }
01221 
01222   try {
01223     this->info_->remove_topic(
01224       sample->domain,
01225       sample->participant,
01226       sample->id);
01227 
01228   } catch (OpenDDS::DCPS::Invalid_Participant&) {
01229     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01230       ACE_DEBUG((LM_DEBUG,
01231                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
01232                  ACE_TEXT("the participant was already removed.\n")));
01233     }
01234   } catch (OpenDDS::DCPS::Invalid_Domain&) {
01235     if (OpenDDS::DCPS::DCPS_debug_level > 1) {
01236       ACE_DEBUG((LM_DEBUG,
01237                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
01238                  ACE_TEXT("the domain %d no longer exists.\n"),sample->domain));
01239     }
01240   }
01241 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::processDelete ( const ParticipantUpdate sample,
const DDS::SampleInfo info 
) [virtual]

Delete a proxy for a participant.

Implements OpenDDS::Federator::UpdateProcessor< ParticipantUpdate >.

Definition at line 1183 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::ParticipantUpdate::id, info_, LM_DEBUG, and TAO_DDS_DCPSInfo_i::remove_domain_participant().

01184 {
01185   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01186     OpenDDS::DCPS::RepoIdConverter converter(sample->id);
01187     ACE_DEBUG((LM_DEBUG,
01188                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( ParticipantUpdate): ")
01189                ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
01190                this->id().id(),
01191                sample->domain,
01192                std::string(converter).c_str()));
01193   }
01194   try {
01195     this->info_->remove_domain_participant(
01196       sample->domain,
01197       sample->id);
01198   } catch (OpenDDS::DCPS::Invalid_Participant&) {
01199     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01200       ACE_DEBUG((LM_DEBUG,
01201                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( ParticipantUpdate): ")
01202                  ACE_TEXT("the participant was already removed.\n")));
01203     }
01204   }
01205 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::processDelete ( const SubscriptionUpdate sample,
const DDS::SampleInfo info 
) [virtual]

Delete a proxy for a subscription.

Implements OpenDDS::Federator::UpdateProcessor< SubscriptionUpdate >.

Definition at line 1153 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, and TAO_DDS_DCPSInfo_i::remove_subscription().

01154 {
01155   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01156     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01157     OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
01158     ACE_DEBUG((LM_DEBUG,
01159                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( SubscriptionUpdate): ")
01160                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
01161                this->id().id(),
01162                sample->domain,
01163                std::string(part_converter).c_str(),
01164                std::string(sub_converter).c_str()));
01165   }
01166 
01167   try {
01168     this->info_->remove_subscription(
01169       sample->domain,
01170       sample->participant,
01171       sample->id);
01172 
01173   } catch (OpenDDS::DCPS::Invalid_Participant&) {
01174     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01175       ACE_DEBUG((LM_DEBUG,
01176                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( SubscriptionUpdate): ")
01177                  ACE_TEXT("the participant was already removed.\n")));
01178     }
01179   }
01180 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::processDelete ( const PublicationUpdate sample,
const DDS::SampleInfo info 
) [virtual]

Delete a proxy for a publication.

Implements OpenDDS::Federator::UpdateProcessor< PublicationUpdate >.

Definition at line 1123 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::PublicationUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::PublicationUpdate::participant, and TAO_DDS_DCPSInfo_i::remove_publication().

01124 {
01125   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01126     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01127     OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
01128     ACE_DEBUG((LM_DEBUG,
01129                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( PublicationUpdate): ")
01130                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
01131                this->id().id(),
01132                sample->domain,
01133                std::string(part_converter).c_str(),
01134                std::string(pub_converter).c_str()));
01135   }
01136 
01137   try {
01138     this->info_->remove_publication(
01139       sample->domain,
01140       sample->participant,
01141       sample->id);
01142 
01143   } catch (OpenDDS::DCPS::Invalid_Participant&) {
01144     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01145       ACE_DEBUG((LM_DEBUG,
01146                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( PublicationUpdate): ")
01147                  ACE_TEXT("the participant was already removed.\n")));
01148     }
01149   }
01150 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::processDelete ( const OwnerUpdate sample,
const DDS::SampleInfo info 
) [virtual]

Null implementation for OwnerUpdate samples.

Implements OpenDDS::Federator::UpdateProcessor< OwnerUpdate >.

Definition at line 1091 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), TAO_DDS_DCPSInfo_i::changeOwnership(), OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredOwnerships_, OpenDDS::Federator::OwnerUpdate::domain, info_, LM_DEBUG, OpenDDS::Federator::OwnerUpdate::owner, OpenDDS::Federator::OwnerUpdate::participant, and OpenDDS::Federator::OwnerUpdate::sender.

01092 {
01093   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01094     OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
01095     ACE_DEBUG((LM_DEBUG,
01096                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( OwnerUpdate): ")
01097                ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
01098                this->id().id(),
01099                sample->domain,
01100                std::string(converter).c_str(),
01101                sample->sender,
01102                sample->owner));
01103   }
01104 
01105   // We could generate an error message here.  Instead we let action be irrelevant.
01106   if (false == this->info_->changeOwnership(sample->domain,
01107                                             sample->participant,
01108                                             sample->sender,
01109                                             sample->owner)) {
01110     {
01111       ACE_GUARD(ACE_Thread_Mutex,
01112                 guard,
01113                 this->deferred_lock_);
01114       this->deferredOwnerships_.push_back(*sample);
01115     }
01116     ACE_DEBUG((LM_DEBUG,
01117                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( OwnerUpdate): ")
01118                ACE_TEXT("deferred update.\n")));
01119   }
01120 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::processUpdateFilterExpressionParams ( const SubscriptionUpdate sample,
const DDS::SampleInfo info 
) [virtual]

Update the proxy filter expression params for a subscription.

Reimplemented from OpenDDS::Federator::UpdateProcessor< SubscriptionUpdate >.

Definition at line 1027 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::expression_params, OpenDDS::Federator::SubscriptionUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, and TAO_DDS_DCPSInfo_i::update_subscription_params().

01029 {
01030   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01031     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01032     OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
01033     ACE_DEBUG((LM_DEBUG,
01034                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateFilterExpressionParams(SubscriptionUpdate): ")
01035                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
01036                this->id().id(),
01037                sample->domain,
01038                std::string(part_converter).c_str(),
01039                std::string(sub_converter).c_str()));
01040   }
01041 
01042   this->info_->update_subscription_params(
01043     sample->domain,
01044     sample->participant,
01045     sample->id,
01046     sample->expression_params);
01047 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::processUpdateQos1 ( const TopicUpdate sample,
const DDS::SampleInfo info 
) [virtual]

Update the proxy TopicQos for a topic.

Implements OpenDDS::Federator::UpdateProcessor< TopicUpdate >.

Definition at line 1069 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::TopicUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::TopicUpdate::participant, OpenDDS::Federator::TopicUpdate::qos, and TAO_DDS_DCPSInfo_i::update_topic_qos().

01070 {
01071   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01072     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01073     OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
01074     ACE_DEBUG((LM_DEBUG,
01075                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( TopicUpdate): ")
01076                ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
01077                this->id().id(),
01078                sample->domain,
01079                std::string(part_converter).c_str(),
01080                std::string(topic_converter).c_str()));
01081   }
01082 
01083   this->info_->update_topic_qos(
01084     sample->id,
01085     sample->domain,
01086     sample->participant,
01087     sample->qos);
01088 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::processUpdateQos1 ( const ParticipantUpdate sample,
const DDS::SampleInfo info 
) [virtual]

Update the proxy ParticipantQos for a participant.

Implements OpenDDS::Federator::UpdateProcessor< ParticipantUpdate >.

Definition at line 1050 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::ParticipantUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::ParticipantUpdate::qos, and TAO_DDS_DCPSInfo_i::update_domain_participant_qos().

01051 {
01052   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01053     OpenDDS::DCPS::RepoIdConverter converter(sample->id);
01054     ACE_DEBUG((LM_DEBUG,
01055                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( ParticipantUpdate): ")
01056                ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
01057                this->id().id(),
01058                sample->domain,
01059                std::string(converter).c_str()));
01060   }
01061 
01062   this->info_->update_domain_participant_qos(
01063     sample->domain,
01064     sample->id,
01065     sample->qos);
01066 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::processUpdateQos1 ( const SubscriptionUpdate sample,
const DDS::SampleInfo info 
) [virtual]

Update the proxy DataReaderQos for a subscription.

Implements OpenDDS::Federator::UpdateProcessor< SubscriptionUpdate >.

Definition at line 983 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, and TAO_DDS_DCPSInfo_i::update_subscription_qos().

00984 {
00985   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00986     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00987     OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
00988     ACE_DEBUG((LM_DEBUG,
00989                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( SubscriptionUpdate): ")
00990                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00991                this->id().id(),
00992                sample->domain,
00993                std::string(part_converter).c_str(),
00994                std::string(sub_converter).c_str()));
00995   }
00996 
00997   this->info_->update_subscription_qos(
00998     sample->domain,
00999     sample->participant,
01000     sample->id,
01001     sample->datareader_qos);
01002 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::processUpdateQos1 ( const PublicationUpdate sample,
const DDS::SampleInfo info 
) [virtual]

Update the proxy DataWriterQos for a publication.

Implements OpenDDS::Federator::UpdateProcessor< PublicationUpdate >.

Definition at line 939 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::Federator::PublicationUpdate::datawriter_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::PublicationUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::PublicationUpdate::participant, and TAO_DDS_DCPSInfo_i::update_publication_qos().

00940 {
00941   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00942     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00943     OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
00944     ACE_DEBUG((LM_DEBUG,
00945                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( PublicationUpdate): ")
00946                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00947                this->id().id(),
00948                sample->domain,
00949                std::string(part_converter).c_str(),
00950                std::string(pub_converter).c_str()));
00951   }
00952 
00953   this->info_->update_publication_qos(
00954     sample->domain,
00955     sample->participant,
00956     sample->id,
00957     sample->datawriter_qos);
00958 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::processUpdateQos1 ( const OwnerUpdate sample,
const DDS::SampleInfo info 
) [virtual]

Process ownership changes.

Implements OpenDDS::Federator::UpdateProcessor< OwnerUpdate >.

Definition at line 907 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), TAO_DDS_DCPSInfo_i::changeOwnership(), OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredOwnerships_, OpenDDS::Federator::OwnerUpdate::domain, info_, LM_DEBUG, OpenDDS::Federator::OwnerUpdate::owner, OpenDDS::Federator::OwnerUpdate::participant, and OpenDDS::Federator::OwnerUpdate::sender.

00908 {
00909   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00910     OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
00911     ACE_DEBUG((LM_DEBUG,
00912                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( OwnerUpdate): ")
00913                ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00914                this->id().id(),
00915                sample->domain,
00916                std::string(converter).c_str(),
00917                sample->sender,
00918                sample->owner));
00919   }
00920 
00921   if (false == this->info_->changeOwnership(sample->domain,
00922                                             sample->participant,
00923                                             sample->sender,
00924                                             sample->owner)) {
00925     {
00926       ACE_GUARD(ACE_Thread_Mutex,
00927                 guard,
00928                 this->deferred_lock_);
00929 
00930       this->deferredOwnerships_.push_back(*sample);
00931     }
00932     ACE_DEBUG((LM_DEBUG,
00933                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( OwnerUpdate): ")
00934                ACE_TEXT("deferred update.\n")));
00935   }
00936 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::processUpdateQos2 ( const SubscriptionUpdate sample,
const DDS::SampleInfo info 
) [virtual]

Update the proxy SubscriberQos for a subscription.

Reimplemented from OpenDDS::Federator::UpdateProcessor< SubscriptionUpdate >.

Definition at line 1005 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, and TAO_DDS_DCPSInfo_i::update_subscription_qos().

01006 {
01007   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01008     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01009     OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
01010     ACE_DEBUG((LM_DEBUG,
01011                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos2( SubscriptionUpdate): ")
01012                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
01013                this->id().id(),
01014                sample->domain,
01015                std::string(part_converter).c_str(),
01016                std::string(sub_converter).c_str()));
01017   }
01018 
01019   this->info_->update_subscription_qos(
01020     sample->domain,
01021     sample->participant,
01022     sample->id,
01023     sample->subscriber_qos);
01024 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::processUpdateQos2 ( const PublicationUpdate sample,
const DDS::SampleInfo info 
) [virtual]

Update the proxy PublisherQos for a publication.

Reimplemented from OpenDDS::Federator::UpdateProcessor< PublicationUpdate >.

Definition at line 961 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::PublicationUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::PublicationUpdate::participant, OpenDDS::Federator::PublicationUpdate::publisher_qos, and TAO_DDS_DCPSInfo_i::update_publication_qos().

00962 {
00963   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00964     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00965     OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
00966     ACE_DEBUG((LM_DEBUG,
00967                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos2( PublicationUpdate): ")
00968                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00969                this->id().id(),
00970                sample->domain,
00971                std::string(part_converter).c_str(),
00972                std::string(pub_converter).c_str()));
00973   }
00974 
00975   this->info_->update_publication_qos(
00976     sample->domain,
00977     sample->participant,
00978     sample->id,
00979     sample->publisher_qos);
00980 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::pushState ( Manager_ptr  peer  ) 

Push our current state to a remote repository.

Definition at line 1244 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::Federator::PublicationUpdate::action, OpenDDS::Federator::TopicUpdate::action, OpenDDS::Federator::OwnerUpdate::action, OpenDDS::Federator::ParticipantUpdate::action, OpenDDS::Federator::SubscriptionUpdate::callback, OpenDDS::Federator::PublicationUpdate::callback, config_, OpenDDS::Federator::CreateEntity, OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::Federator::TopicUpdate::datatype, OpenDDS::Federator::PublicationUpdate::datawriter_qos, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::OwnerUpdate::domain, OpenDDS::Federator::ParticipantUpdate::domain, TAO_DDS_DCPSInfo_i::domains(), OpenDDS::Federator::SubscriptionUpdate::expression_params, OpenDDS::Federator::Config::federationDomain(), OpenDDS::Federator::SubscriptionUpdate::filter_expression, DCPS_IR_Subscription::get_datareader_qos(), DCPS_IR_Publication::get_datawriter_qos(), DCPS_IR_Subscription::get_expr_params(), DCPS_IR_Subscription::get_filter_expression(), DCPS_IR_Subscription::get_id(), DCPS_IR_Publication::get_id(), DCPS_IR_Subscription::get_participant_id(), DCPS_IR_Publication::get_participant_id(), DCPS_IR_Publication::get_publisher_qos(), DCPS_IR_Subscription::get_subscriber_qos(), DCPS_IR_Subscription::get_topic_id(), DCPS_IR_Publication::get_topic_id(), DCPS_IR_Subscription::get_transportLocatorSeq(), DCPS_IR_Publication::get_transportLocatorSeq(), OpenDDS::Federator::SubscriptionUpdate::id, OpenDDS::Federator::PublicationUpdate::id, OpenDDS::Federator::TopicUpdate::id, OpenDDS::Federator::ParticipantUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), info_, TAO_DDS_DCPSInfo_i::orb(), orb(), OpenDDS::Federator::OwnerUpdate::owner, OpenDDS::Federator::ParticipantUpdate::owner, OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::PublicationUpdate::participant, OpenDDS::Federator::TopicUpdate::participant, OpenDDS::Federator::OwnerUpdate::participant, OpenDDS::Federator::PublicationUpdate::publisher_qos, OpenDDS::Federator::TopicUpdate::qos, OpenDDS::Federator::ParticipantUpdate::qos, DCPS_IR_Subscription::reader(), OpenDDS::Federator::SubscriptionUpdate::sender, OpenDDS::Federator::PublicationUpdate::sender, OpenDDS::Federator::TopicUpdate::sender, OpenDDS::Federator::OwnerUpdate::sender, OpenDDS::Federator::ParticipantUpdate::sender, OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, OpenDDS::Federator::SubscriptionUpdate::topic, OpenDDS::Federator::PublicationUpdate::topic, OpenDDS::Federator::TopicUpdate::topic, OpenDDS::Federator::SubscriptionUpdate::transport_info, OpenDDS::Federator::PublicationUpdate::transport_info, and DCPS_IR_Publication::writer().

Referenced by join_federation().

01245 {
01246   // foreach DCPS_IR_Domain
01247   //   foreach DCPS_IR_Participant
01248   //     peer->initializeParticipant(...)
01249   //     peer->initializeOwner(...)
01250   //   foreach DCPS_IR_Participant
01251   //     foreach DCPS_IR_Topic
01252   //       peer->initializeTopic(...)
01253   //     foreach DCPS_IR_Publication
01254   //       peer->initializePublication(...)
01255   //     foreach DCPS_IR_Subscription
01256   //       peer->initializeSubscription(...)
01257 
01258   // Process each domain within the repository.
01259   for (DCPS_IR_Domain_Map::const_iterator currentDomain
01260        = this->info_->domains().begin();
01261        currentDomain != this->info_->domains().end();
01262        ++currentDomain) {
01263 
01264     if (currentDomain->second->get_id() == this->config_.federationDomain()) {
01265       // Do not push the Federation domain publications.
01266       //continue;
01267     }
01268 
01269     // Process each participant within the current domain.
01270     for (DCPS_IR_Participant_Map::const_iterator currentParticipant
01271          = currentDomain->second->participants().begin();
01272          currentParticipant != currentDomain->second->participants().end();
01273          ++currentParticipant) {
01274 
01275       if (currentParticipant->second->isBitPublisher() == true) {
01276         // Do not push the built-in topic publications.
01277         continue;
01278       }
01279 
01280       // Initialize the participant on the peer.
01281       ParticipantUpdate participantSample;
01282       participantSample.sender = this->id().id();
01283       participantSample.action = CreateEntity;
01284 
01285       participantSample.owner  =  currentParticipant->second->owner();
01286       participantSample.domain =  currentDomain->second->get_id();
01287       participantSample.id     =  currentParticipant->second->get_id();
01288       participantSample.qos    = *currentParticipant->second->get_qos();
01289 
01290       peer->initializeParticipant(participantSample);
01291 
01292       // Initialize the ownership of the participant on the peer.
01293       OwnerUpdate ownerSample;
01294       ownerSample.sender      = this->id().id();
01295       ownerSample.action      = CreateEntity;
01296 
01297       ownerSample.domain      = currentDomain->second->get_id();
01298       ownerSample.participant = currentParticipant->second->get_id();
01299       ownerSample.owner       = currentParticipant->second->owner();
01300 
01301       peer->initializeOwner(ownerSample);
01302     }
01303 
01304     // Process each participant within the current domain.
01305     for (DCPS_IR_Participant_Map::const_iterator currentParticipant
01306          = currentDomain->second->participants().begin();
01307          currentParticipant != currentDomain->second->participants().end();
01308          ++currentParticipant) {
01309 
01310       if (currentParticipant->second->isBitPublisher() == true) {
01311         // Do not push the built-in topic publications.
01312         continue;
01313       }
01314 
01315       // Process each topic within the current particpant.
01316       for (DCPS_IR_Topic_Map::const_iterator currentTopic
01317            = currentParticipant->second->topics().begin();
01318            currentTopic != currentParticipant->second->topics().end();
01319            ++currentTopic) {
01320         TopicUpdate topicSample;
01321         topicSample.sender      = this->id().id();
01322         topicSample.action      = CreateEntity;
01323 
01324         topicSample.id          = currentTopic->second->get_id();
01325         topicSample.domain      = currentDomain->second->get_id();
01326         topicSample.participant = currentTopic->second->get_participant_id();
01327         topicSample.topic       = currentTopic->second->get_topic_description()->get_name();
01328         topicSample.datatype    = currentTopic->second->get_topic_description()->get_dataTypeName();
01329         topicSample.qos         = *currentTopic->second->get_topic_qos();
01330 
01331         peer->initializeTopic(topicSample);
01332       }
01333 
01334       // Process each publication within the current particpant.
01335       for (DCPS_IR_Publication_Map::const_iterator currentPublication
01336            = currentParticipant->second->publications().begin();
01337            currentPublication != currentParticipant->second->publications().end();
01338            ++currentPublication) {
01339         PublicationUpdate publicationSample;
01340         publicationSample.sender         = this->id().id();
01341         publicationSample.action         = CreateEntity;
01342 
01343         DCPS_IR_Publication* p = currentPublication->second.get();
01344         CORBA::ORB_var orb = this->info_->orb();
01345         CORBA::String_var callback = orb->object_to_string(p->writer());
01346 
01347         publicationSample.domain         = currentDomain->second->get_id();
01348         publicationSample.participant    = p->get_participant_id();
01349         publicationSample.topic          = p->get_topic_id();
01350         publicationSample.id             = p->get_id();
01351         publicationSample.callback       = callback.in();
01352         publicationSample.datawriter_qos = *p->get_datawriter_qos();
01353         publicationSample.publisher_qos  = *p->get_publisher_qos();
01354         publicationSample.transport_info = p->get_transportLocatorSeq();
01355 
01356         peer->initializePublication(publicationSample);
01357       }
01358 
01359       // Process each subscription within the current particpant.
01360       for (DCPS_IR_Subscription_Map::const_iterator currentSubscription
01361            = currentParticipant->second->subscriptions().begin();
01362            currentSubscription != currentParticipant->second->subscriptions().end();
01363            ++currentSubscription) {
01364         SubscriptionUpdate subscriptionSample;
01365         subscriptionSample.sender         = this->id().id();
01366         subscriptionSample.action         = CreateEntity;
01367 
01368         DCPS_IR_Subscription* s = currentSubscription->second.get();
01369         CORBA::ORB_var orb = this->info_->orb();
01370         CORBA::String_var callback = orb->object_to_string(s->reader());
01371 
01372         subscriptionSample.domain         = currentDomain->second->get_id();
01373         subscriptionSample.participant    = s->get_participant_id();
01374         subscriptionSample.topic          = s->get_topic_id();
01375         subscriptionSample.id             = s->get_id();
01376         subscriptionSample.callback       = callback.in();
01377         subscriptionSample.datareader_qos = *s->get_datareader_qos();
01378         subscriptionSample.subscriber_qos = *s->get_subscriber_qos();
01379         subscriptionSample.transport_info = s->get_transportLocatorSeq();
01380         subscriptionSample.filter_expression = s->get_filter_expression().c_str();
01381         subscriptionSample.expression_params = s->get_expr_params();
01382 
01383         peer->initializeSubscription(subscriptionSample);
01384       }
01385     }
01386   }
01387 }

Here is the call graph for this function:

Here is the caller graph for this function:

OpenDDS::DCPS::DCPSInfo_ptr OpenDDS::Federator::ManagerImpl::repository ( void   )  [virtual]

Definition at line 878 of file FederatorManagerImpl.cpp.

References ACE_TEXT(), config_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::Config::federationDomain(), CORBA::is_nil(), OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, localRepo_, OpenDDS::DCPS::static_rchandle_cast(), and TheServiceParticipant.

00879 {
00880   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00881     ACE_DEBUG((LM_DEBUG,
00882                ACE_TEXT("(%P|%t) ManagerImpl::repository()\n")));
00883   }
00884 
00885   OpenDDS::DCPS::Discovery_rch disco
00886   = TheServiceParticipant->get_discovery(
00887       this->config_.federationDomain());
00888   OpenDDS::DCPS::DCPSInfo_var repo;
00889   if (!disco.is_nil()) {
00890     OpenDDS::DCPS::InfoRepoDiscovery_rch irDisco =
00891       DCPS::static_rchandle_cast<DCPS::InfoRepoDiscovery>(disco);
00892     repo = irDisco->get_dcps_info();
00893   }
00894 
00895   if (CORBA::is_nil(repo.in())) {
00896     return OpenDDS::DCPS::DCPSInfo::_duplicate(this->localRepo_.in());
00897 
00898   } else {
00899     return OpenDDS::DCPS::DCPSInfo::_duplicate(repo.in());
00900   }
00901 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::requestImage (  )  [virtual]

Request an image refresh to be sent to the specified callback (asynchronously).

Implements Update::Updater.

Definition at line 28 of file FederatorManagerImpl_updates.cpp.

00029 {
00030   /* This method intentionally left unimplemented. */
00031 }

void OpenDDS::Federator::ManagerImpl::shutdown ( void   )  [virtual]

Definition at line 1118 of file FederatorManagerImpl.cpp.

References federated_, info_, and TAO_DDS_DCPSInfo_i::shutdown().

01120 {
01121   // Prevent the removal of this repository from the federation during
01122   // shutdown processing.
01123   this->federated_ = false;
01124 
01125   // Shutdown the process via the repository object.
01126   this->info_->shutdown();
01127 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::unregisterCallback (  )  [virtual]

Definition at line 22 of file FederatorManagerImpl_updates.cpp.

00023 {
00024   /* This method intentionally left unimplemented. */
00025 }

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::StringSeq exprParams 
) [virtual]

Propagate an entity has been created.

Implements Update::Updater.

Definition at line 508 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::expression_params, DDS::HANDLE_NIL, OpenDDS::Federator::SubscriptionUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::sender, subscriptionWriter_, and OpenDDS::Federator::UpdateFilterExpressionParams.

00509 {
00510   if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00511     // Decline to publish data until we can.
00512     return;
00513   }
00514 
00515   SubscriptionUpdate sample;
00516   sample.sender            = this->id().id();
00517   sample.action            = UpdateFilterExpressionParams;
00518   sample.domain            = id.domain;
00519   sample.participant       = id.participant;
00520   sample.id                = id.id;
00521   sample.expression_params = params;
00522 
00523   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00524     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00525     OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00526     ACE_DEBUG((LM_DEBUG,
00527                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update(FilterParams): ")
00528                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00529                this->id().id(),
00530                sample.domain,
00531                std::string(part_converter).c_str(),
00532                std::string(sub_converter).c_str()));
00533   }
00534 
00535   this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00536 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::SubscriberQos qos 
) [virtual]

Propagate an entity has been created.

Implements Update::Updater.

Definition at line 539 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::SubscriptionUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::sender, OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, subscriptionWriter_, and OpenDDS::Federator::UpdateQosValue2.

00540 {
00541   if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00542     // Decline to publish data until we can.
00543     return;
00544   }
00545 
00546   SubscriptionUpdate sample;
00547   sample.sender         = this->id().id();
00548   sample.action         = UpdateQosValue2;
00549 
00550   sample.domain         = id.domain;
00551   sample.participant    = id.participant;
00552   sample.id             = id.id;
00553   sample.subscriber_qos = qos;
00554 
00555   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00556     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00557     OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00558     ACE_DEBUG((LM_DEBUG,
00559                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( SubscriberUpdate): ")
00560                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00561                this->id().id(),
00562                sample.domain,
00563                std::string(part_converter).c_str(),
00564                std::string(sub_converter).c_str()));
00565   }
00566 
00567   this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00568 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::DataReaderQos qos 
) [virtual]

Propagate an entity has been created.

Implements Update::Updater.

Definition at line 476 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::SubscriptionUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::sender, subscriptionWriter_, and OpenDDS::Federator::UpdateQosValue1.

00477 {
00478   if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00479     // Decline to publish data until we can.
00480     return;
00481   }
00482 
00483   SubscriptionUpdate sample;
00484   sample.sender         = this->id().id();
00485   sample.action         = UpdateQosValue1;
00486 
00487   sample.domain         = id.domain;
00488   sample.participant    = id.participant;
00489   sample.id             = id.id;
00490   sample.datareader_qos = qos;
00491 
00492   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00493     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00494     OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00495     ACE_DEBUG((LM_DEBUG,
00496                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( ReaderUpdate): ")
00497                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00498                this->id().id(),
00499                sample.domain,
00500                std::string(part_converter).c_str(),
00501                std::string(sub_converter).c_str()));
00502   }
00503 
00504   this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00505 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::PublisherQos qos 
) [virtual]

Propagate an entity has been created.

Implements Update::Updater.

Definition at line 444 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::Federator::PublicationUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::PublicationUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::PublicationUpdate::participant, publicationWriter_, OpenDDS::Federator::PublicationUpdate::publisher_qos, OpenDDS::Federator::PublicationUpdate::sender, and OpenDDS::Federator::UpdateQosValue2.

00445 {
00446   if (CORBA::is_nil(this->publicationWriter_.in())) {
00447     // Decline to publish data until we can.
00448     return;
00449   }
00450 
00451   PublicationUpdate sample;
00452   sample.sender         = this->id().id();
00453   sample.action         = UpdateQosValue2;
00454 
00455   sample.domain         = id.domain;
00456   sample.participant    = id.participant;
00457   sample.id             = id.id;
00458   sample.publisher_qos  = qos;
00459 
00460   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00461     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00462     OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00463     ACE_DEBUG((LM_DEBUG,
00464                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( PublisherUpdate): ")
00465                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00466                this->id().id(),
00467                sample.domain,
00468                std::string(part_converter).c_str(),
00469                std::string(pub_converter).c_str()));
00470   }
00471 
00472   this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00473 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::DataWriterQos qos 
) [virtual]

Propagate an entity has been created.

Implements Update::Updater.

Definition at line 412 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::Federator::PublicationUpdate::action, OpenDDS::Federator::PublicationUpdate::datawriter_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::PublicationUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::PublicationUpdate::participant, publicationWriter_, OpenDDS::Federator::PublicationUpdate::sender, and OpenDDS::Federator::UpdateQosValue1.

00413 {
00414   if (CORBA::is_nil(this->publicationWriter_.in())) {
00415     // Decline to publish data until we can.
00416     return;
00417   }
00418 
00419   PublicationUpdate sample;
00420   sample.sender         = this->id().id();
00421   sample.action         = UpdateQosValue1;
00422 
00423   sample.domain         = id.domain;
00424   sample.participant    = id.participant;
00425   sample.id             = id.id;
00426   sample.datawriter_qos = qos;
00427 
00428   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00429     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00430     OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00431     ACE_DEBUG((LM_DEBUG,
00432                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( WriterUpdate): ")
00433                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00434                this->id().id(),
00435                sample.domain,
00436                std::string(part_converter).c_str(),
00437                std::string(pub_converter).c_str()));
00438   }
00439 
00440   this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00441 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::TopicQos qos 
) [virtual]

Propagate an entity has been created.

Implements Update::Updater.

Definition at line 380 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::Federator::TopicUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::TopicUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::TopicUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::TopicUpdate::participant, OpenDDS::Federator::TopicUpdate::qos, OpenDDS::Federator::TopicUpdate::sender, topicWriter_, and OpenDDS::Federator::UpdateQosValue1.

00381 {
00382   if (CORBA::is_nil(this->topicWriter_.in())) {
00383     // Decline to publish data until we can.
00384     return;
00385   }
00386 
00387   TopicUpdate sample;
00388   sample.sender      = this->id().id();
00389   sample.action      = UpdateQosValue1;
00390 
00391   sample.id          = id.id;
00392   sample.domain      = id.domain;
00393   sample.participant = id.participant;
00394   sample.qos         = qos;
00395 
00396   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00397     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00398     OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
00399     ACE_DEBUG((LM_DEBUG,
00400                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( TopicUpdate): ")
00401                ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00402                this->id().id(),
00403                sample.domain,
00404                std::string(part_converter).c_str(),
00405                std::string(topic_converter).c_str()));
00406   }
00407 
00408   this->topicWriter_->write(sample, DDS::HANDLE_NIL);
00409 }

Here is the call graph for this function:

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::DomainParticipantQos qos 
) [virtual]

Propagate updated Qos parameters for an entity.

Implements Update::Updater.

Definition at line 351 of file FederatorManagerImpl_updates.cpp.

References ACE_TEXT(), OpenDDS::Federator::ParticipantUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::ParticipantUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, participantWriter_, OpenDDS::Federator::ParticipantUpdate::qos, OpenDDS::Federator::ParticipantUpdate::sender, and OpenDDS::Federator::UpdateQosValue1.

00352 {
00353   if (CORBA::is_nil(this->participantWriter_.in())) {
00354     // Decline to publish data until we can.
00355     return;
00356   }
00357 
00358   ParticipantUpdate sample;
00359   sample.sender = this->id().id();
00360   sample.action = UpdateQosValue1;
00361 
00362   sample.domain = id.domain;
00363   sample.id     = id.id;
00364   sample.qos    = qos;
00365 
00366   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00367     OpenDDS::DCPS::RepoIdConverter converter(sample.id);
00368     ACE_DEBUG((LM_DEBUG,
00369                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( ParticipantUpdate): ")
00370                ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
00371                this->id().id(),
00372                sample.domain,
00373                std::string(converter).c_str()));
00374   }
00375 
00376   this->participantWriter_->write(sample, DDS::HANDLE_NIL);
00377 }

Here is the call graph for this function:


Member Data Documentation

The configuration information for this manager.

Definition at line 231 of file FederatorManagerImpl.h.

Referenced by destroy(), id(), initialize(), join_federation(), pushState(), and repository().

Protect deferred updates.

Definition at line 294 of file FederatorManagerImpl.h.

Referenced by processCreate(), processDeferred(), processDelete(), and processUpdateQos1().

Deferred ownership updates.

Definition at line 279 of file FederatorManagerImpl.h.

Referenced by processCreate(), processDeferred(), processDelete(), and processUpdateQos1().

Deferred publication updates.

Definition at line 285 of file FederatorManagerImpl.h.

Referenced by processCreate(), and processDeferred().

Deferred subscription updates.

Definition at line 288 of file FederatorManagerImpl.h.

Referenced by processCreate(), and processDeferred().

Deferred topic updates.

Definition at line 282 of file FederatorManagerImpl.h.

Referenced by processCreate(), and processDeferred().

Flag indicating that we are actively participating in a federation of repositories.

Definition at line 219 of file FederatorManagerImpl.h.

Referenced by finalize(), join_federation(), and shutdown().

DDS::DomainParticipant_var OpenDDS::Federator::ManagerImpl::federationParticipant_ [private]

local DomainParticipant

Definition at line 246 of file FederatorManagerImpl.h.

Referenced by finalize(), and initialize().

Simple recursion avoidance during the join operations.

Definition at line 212 of file FederatorManagerImpl.h.

Referenced by join_federation().

Condition used to gate joining activities.

Definition at line 209 of file FederatorManagerImpl.h.

Referenced by join_federation().

Repository to which we joined.

Definition at line 215 of file FederatorManagerImpl.h.

Referenced by finalize(), and join_federation().

OpenDDS::DCPS::DCPSInfo_var OpenDDS::Federator::ManagerImpl::localRepo_ [private]

Remotely callable reference to the local repository.

Definition at line 237 of file FederatorManagerImpl.h.

Referenced by localRepo(), and repository().

ACE_SYNCH_MUTEX OpenDDS::Federator::ManagerImpl::lock_ [private]

Critical section MUTEX.

Definition at line 206 of file FederatorManagerImpl.h.

Referenced by join_federation().

Is multicast enabled?

Definition at line 291 of file FederatorManagerImpl.h.

Referenced by initialize(), and ManagerImpl().

Multicast responder.

Definition at line 243 of file FederatorManagerImpl.h.

Referenced by finalize(), and initialize().

The ORB in which we are activated.

Definition at line 240 of file FederatorManagerImpl.h.

Referenced by finalize(), initialize(), and orb().

TopicUpdate listener.

Definition at line 249 of file FederatorManagerImpl.h.

Referenced by finalize(), and initialize().

OwnerUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::ownerWriter_ [private]

TopicUpdate writer.

Definition at line 264 of file FederatorManagerImpl.h.

Referenced by create(), and initialize().

ParticipantUpdate listener.

Definition at line 255 of file FederatorManagerImpl.h.

Referenced by finalize(), and initialize().

ParticipantUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::participantWriter_ [private]

ParticipantUpdate writer.

Definition at line 270 of file FederatorManagerImpl.h.

Referenced by create(), destroy(), initialize(), join_federation(), and update().

The peer with which we have federated.

Definition at line 225 of file FederatorManagerImpl.h.

Referenced by finalize(), join_federation(), and leave_federation().

PublicationUpdate listener.

Definition at line 258 of file FederatorManagerImpl.h.

Referenced by finalize(), and initialize().

PublicationUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::publicationWriter_ [private]

PublicationUpdate writer.

Definition at line 273 of file FederatorManagerImpl.h.

Referenced by destroy(), initialize(), and update().

The packet sequence number for data that we publish.

Definition at line 228 of file FederatorManagerImpl.h.

SubscriptionUpdate listener.

Definition at line 261 of file FederatorManagerImpl.h.

Referenced by finalize(), and initialize().

SubscriptionUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::subscriptionWriter_ [private]

SubscriptionUpdate writer.

Definition at line 276 of file FederatorManagerImpl.h.

Referenced by create(), destroy(), initialize(), and update().

TopicUpdate listener.

Definition at line 252 of file FederatorManagerImpl.h.

Referenced by finalize(), and initialize().

TopicUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::topicWriter_ [private]

TopicUpdate writer.

Definition at line 267 of file FederatorManagerImpl.h.

Referenced by create(), destroy(), initialize(), and update().


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1