OpenDDS  Snapshot(2023/04/28-20:55)
Public Member Functions | Private Types | Private Attributes | List of all members
OpenDDS::Federator::ManagerImpl Class Reference

#include <FederatorManagerImpl.h>

Inheritance diagram for OpenDDS::Federator::ManagerImpl:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::Federator::ManagerImpl:
Collaboration graph
[legend]

Public Member Functions

 ManagerImpl (Config &config)
 
virtual ~ManagerImpl ()
 
virtual CORBA::Boolean discover_federation (const char *ior)
 
virtual Manager_ptr join_federation (Manager_ptr peer, FederationDomain federation)
 
virtual void leave_federation (RepoKey id)
 
virtual RepoKey federation_id ()
 
virtual OpenDDS::DCPS::DCPSInfo_ptr repository ()
 
virtual void initializeOwner (const OpenDDS::Federator::OwnerUpdate &data)
 
virtual void initializeTopic (const OpenDDS::Federator::TopicUpdate &data)
 
virtual void initializeParticipant (const OpenDDS::Federator::ParticipantUpdate &data)
 
virtual void initializePublication (const OpenDDS::Federator::PublicationUpdate &data)
 
virtual void initializeSubscription (const OpenDDS::Federator::SubscriptionUpdate &data)
 
virtual void leave_and_shutdown ()
 
virtual void shutdown ()
 
void initialize ()
 Establish the update publications and subscriptions. More...
 
void finalize ()
 Release resources gracefully. More...
 
TAO_DDS_DCPSInfo_i *& info ()
 Accessors for the DCPSInfo reference. More...
 
TAO_DDS_DCPSInfo_iinfo () const
 
void localRepo (::OpenDDS::DCPS::DCPSInfo_ptr repo)
 Capture a remote callable reference to the DCPSInfo. More...
 
const TAO_DDS_DCPSFederationIdid () const
 Accessors for the federation Id value. More...
 
CORBA::ORB_ptr orb ()
 Accessors for the ORB. More...
 
void orb (CORBA::ORB_ptr value)
 
void pushState (Manager_ptr peer)
 Push our current state to a remote repository. More...
 
void processDeferred ()
 Handle any deferred updates that might have become processable. More...
 
virtual void unregisterCallback ()
 
virtual void requestImage ()
 
virtual void create (const Update::UTopic &topic)
 
virtual void create (const Update::UParticipant &participant)
 
virtual void create (const Update::URActor &reader)
 
virtual void create (const Update::UWActor &writer)
 
virtual void create (const Update::OwnershipData &data)
 
virtual void update (const Update::IdPath &id, const DDS::DomainParticipantQos &qos)
 
virtual void update (const Update::IdPath &id, const DDS::TopicQos &qos)
 
virtual void update (const Update::IdPath &id, const DDS::DataWriterQos &qos)
 
virtual void update (const Update::IdPath &id, const DDS::PublisherQos &qos)
 
virtual void update (const Update::IdPath &id, const DDS::DataReaderQos &qos)
 
virtual void update (const Update::IdPath &id, const DDS::SubscriberQos &qos)
 
virtual void update (const Update::IdPath &id, const DDS::StringSeq &exprParams)
 
virtual void destroy (const Update::IdPath &id, Update::ItemType type, Update::ActorType actor)
 Propagate that an entity has been destroyed. More...
 
void processCreate (const OwnerUpdate *sample, const DDS::SampleInfo *info)
 Null implementation for OwnerUpdate samples. More...
 
void processCreate (const PublicationUpdate *sample, const DDS::SampleInfo *info)
 Create a proxy for a new publication. More...
 
void processCreate (const SubscriptionUpdate *sample, const DDS::SampleInfo *info)
 Create a proxy for a new subscription. More...
 
void processCreate (const ParticipantUpdate *sample, const DDS::SampleInfo *info)
 Create a proxy for a new participant. More...
 
void processCreate (const TopicUpdate *sample, const DDS::SampleInfo *info)
 Create a proxy for a new topic. More...
 
void processUpdateQos1 (const OwnerUpdate *sample, const DDS::SampleInfo *info)
 Process ownership changes. More...
 
void processUpdateQos1 (const PublicationUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy DataWriterQos for a publication. More...
 
void processUpdateQos2 (const PublicationUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy PublisherQos for a publication. More...
 
void processUpdateQos1 (const SubscriptionUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy DataReaderQos for a subscription. More...
 
void processUpdateQos2 (const SubscriptionUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy SubscriberQos for a subscription. More...
 
void processUpdateFilterExpressionParams (const SubscriptionUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy filter expression params for a subscription. More...
 
void processUpdateQos1 (const ParticipantUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy ParticipantQos for a participant. More...
 
void processUpdateQos1 (const TopicUpdate *sample, const DDS::SampleInfo *info)
 Update the proxy TopicQos for a topic. More...
 
void processDelete (const OwnerUpdate *sample, const DDS::SampleInfo *info)
 Null implementation for OwnerUpdate samples. More...
 
void processDelete (const PublicationUpdate *sample, const DDS::SampleInfo *info)
 Delete a proxy for a publication. More...
 
void processDelete (const SubscriptionUpdate *sample, const DDS::SampleInfo *info)
 Delete a proxy for a subscription. More...
 
void processDelete (const ParticipantUpdate *sample, const DDS::SampleInfo *info)
 Delete a proxy for a participant. More...
 
void processDelete (const TopicUpdate *sample, const DDS::SampleInfo *info)
 Delete a proxy for a topic. More...
 
- Public Member Functions inherited from Update::Updater
virtual ~Updater ()
 
virtual void updateLastPartId (PartIdType partId)
 Update Last Participant Id for the repo. More...
 
- Public Member Functions inherited from OpenDDS::Federator::UpdateProcessor< OwnerUpdate >
 UpdateProcessor ()
 
virtual ~UpdateProcessor ()
 
virtual void processUpdateQos2 (const OwnerUpdate *sample, const DDS::SampleInfo *info)
 
virtual void processUpdateFilterExpressionParams (const OwnerUpdate *sample, const DDS::SampleInfo *info)
 A default null implementation is provided. More...
 
void processSample (const OwnerUpdate *sample, const DDS::SampleInfo *info)
 Update publication information with sample data. More...
 
- Public Member Functions inherited from OpenDDS::Federator::UpdateProcessor< TopicUpdate >
 UpdateProcessor ()
 
virtual ~UpdateProcessor ()
 
virtual void processUpdateQos2 (const TopicUpdate *sample, const DDS::SampleInfo *info)
 
virtual void processUpdateFilterExpressionParams (const TopicUpdate *sample, const DDS::SampleInfo *info)
 A default null implementation is provided. More...
 
void processSample (const TopicUpdate *sample, const DDS::SampleInfo *info)
 Update publication information with sample data. More...
 
- Public Member Functions inherited from OpenDDS::Federator::UpdateProcessor< ParticipantUpdate >
 UpdateProcessor ()
 
virtual ~UpdateProcessor ()
 
virtual void processUpdateQos2 (const ParticipantUpdate *sample, const DDS::SampleInfo *info)
 
virtual void processUpdateFilterExpressionParams (const ParticipantUpdate *sample, const DDS::SampleInfo *info)
 A default null implementation is provided. More...
 
void processSample (const ParticipantUpdate *sample, const DDS::SampleInfo *info)
 Update publication information with sample data. More...
 
- Public Member Functions inherited from OpenDDS::Federator::UpdateProcessor< SubscriptionUpdate >
 UpdateProcessor ()
 
virtual ~UpdateProcessor ()
 
void processSample (const SubscriptionUpdate *sample, const DDS::SampleInfo *info)
 Update publication information with sample data. More...
 
- Public Member Functions inherited from OpenDDS::Federator::UpdateProcessor< PublicationUpdate >
 UpdateProcessor ()
 
virtual ~UpdateProcessor ()
 
virtual void processUpdateFilterExpressionParams (const PublicationUpdate *sample, const DDS::SampleInfo *info)
 A default null implementation is provided. More...
 
void processSample (const PublicationUpdate *sample, const DDS::SampleInfo *info)
 Update publication information with sample data. More...
 

Private Types

typedef std::map< RepoKey, Manager_var > IdToManagerMap
 Map type to hold references to federated repository Managers. More...
 

Private Attributes

ACE_SYNCH_MUTEX lock_
 Critical section MUTEX. More...
 
ACE_Condition< ACE_SYNCH_MUTEXjoining_
 Condition used to gate joining activities. More...
 
RepoKey joiner_
 Simple recursion avoidance during the join operations. More...
 
RepoKey joinRepo_
 Repository to which we joined. More...
 
bool federated_
 
IdToManagerMap peers_
 The peer with which we have federated. More...
 
OpenDDS::DCPS::SequenceNumber sequence_
 The packet sequence number for data that we publish. More...
 
Configconfig_
 The configuration information for this manager. More...
 
TAO_DDS_DCPSInfo_iinfo_
 The Info object reference to update. More...
 
OpenDDS::DCPS::DCPSInfo_var localRepo_
 Remotely callable reference to the local repository. More...
 
CORBA::ORB_var orb_
 The ORB in which we are activated. More...
 
InfoRepoMulticastResponder multicastResponder_
 Multicast responder. More...
 
DDS::DomainParticipant_var federationParticipant_
 local DomainParticipant More...
 
UpdateListener< OwnerUpdate, OwnerUpdateDataReader > ownerListener_
 TopicUpdate listener. More...
 
UpdateListener< TopicUpdate, TopicUpdateDataReader > topicListener_
 TopicUpdate listener. More...
 
UpdateListener< ParticipantUpdate, ParticipantUpdateDataReader > participantListener_
 ParticipantUpdate listener. More...
 
UpdateListener< PublicationUpdate, PublicationUpdateDataReader > publicationListener_
 PublicationUpdate listener. More...
 
UpdateListener< SubscriptionUpdate, SubscriptionUpdateDataReader > subscriptionListener_
 SubscriptionUpdate listener. More...
 
OwnerUpdateDataWriter_var ownerWriter_
 TopicUpdate writer. More...
 
TopicUpdateDataWriter_var topicWriter_
 TopicUpdate writer. More...
 
ParticipantUpdateDataWriter_var participantWriter_
 ParticipantUpdate writer. More...
 
PublicationUpdateDataWriter_var publicationWriter_
 PublicationUpdate writer. More...
 
SubscriptionUpdateDataWriter_var subscriptionWriter_
 SubscriptionUpdate writer. More...
 
std::list< OwnerUpdatedeferredOwnerships_
 Deferred ownership updates. More...
 
std::list< TopicUpdatedeferredTopics_
 Deferred topic updates. More...
 
std::list< PublicationUpdatedeferredPublications_
 Deferred publication updates. More...
 
std::list< SubscriptionUpdatedeferredSubscriptions_
 Deferred subscription updates. More...
 
bool multicastEnabled_
 Is multicast enabled? More...
 
ACE_Thread_Mutex deferred_lock_
 Protect deferred updates. More...
 

Detailed Description

Definition at line 36 of file FederatorManagerImpl.h.

Member Typedef Documentation

◆ IdToManagerMap

typedef std::map<RepoKey, Manager_var> OpenDDS::Federator::ManagerImpl::IdToManagerMap
private

Map type to hold references to federated repository Managers.

Definition at line 222 of file FederatorManagerImpl.h.

Constructor & Destructor Documentation

◆ ManagerImpl()

OpenDDS::Federator::ManagerImpl::ManagerImpl ( Config config)

Definition at line 42 of file FederatorManagerImpl.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, ACE_OS::getenv(), LM_DEBUG, and multicastEnabled_.

43  : joining_(this->lock_),
46  federated_(false),
47  config_(config),
48  info_(0),
49  ownerListener_(*this),
50  topicListener_(*this),
51  participantListener_(*this),
52  publicationListener_(*this),
53  subscriptionListener_(*this),
54  multicastEnabled_(false)
55 {
57  ACE_DEBUG((LM_DEBUG,
58  ACE_TEXT("(%P|%t) Federator::ManagerImpl::ManagerImpl()\n")));
59  }
60 
61  char* mdec = ACE_OS::getenv("MulticastDiscoveryEnabled");
62 
63  if (mdec != 0) {
64  std::string mde(ACE_OS::getenv("MulticastDiscoveryEnabled"));
65 
66  if (mde != "0") {
67  multicastEnabled_ = true;
68  }
69  }
70 }
bool multicastEnabled_
Is multicast enabled?
RepoKey joiner_
Simple recursion avoidance during the join operations.
#define ACE_DEBUG(X)
UpdateListener< SubscriptionUpdate, SubscriptionUpdateDataReader > subscriptionListener_
SubscriptionUpdate listener.
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Config & config_
The configuration information for this manager.
char * getenv(const char *symbol)
const RepoKey NIL_REPOSITORY
Definition: Federator.idl:36
ACE_Condition< ACE_SYNCH_MUTEX > joining_
Condition used to gate joining activities.
UpdateListener< OwnerUpdate, OwnerUpdateDataReader > ownerListener_
TopicUpdate listener.
RepoKey joinRepo_
Repository to which we joined.
UpdateListener< PublicationUpdate, PublicationUpdateDataReader > publicationListener_
PublicationUpdate listener.
UpdateListener< ParticipantUpdate, ParticipantUpdateDataReader > participantListener_
ParticipantUpdate listener.
ACE_TEXT("TCP_Factory")
ACE_SYNCH_MUTEX lock_
Critical section MUTEX.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
UpdateListener< TopicUpdate, TopicUpdateDataReader > topicListener_
TopicUpdate listener.

◆ ~ManagerImpl()

OpenDDS::Federator::ManagerImpl::~ManagerImpl ( )
virtual

Definition at line 72 of file FederatorManagerImpl.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, and LM_DEBUG.

73 {
75  ACE_DEBUG((LM_DEBUG,
76  ACE_TEXT("(%P|%t) Federator::ManagerImpl::~ManagerImpl()\n")));
77  }
78 }
#define ACE_DEBUG(X)
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30

Member Function Documentation

◆ create() [1/5]

void OpenDDS::Federator::ManagerImpl::create ( const Update::UTopic topic)
virtual

Propagate an entity has been created.

Implements Update::Updater.

Definition at line 40 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::Federator::TopicUpdate::action, OpenDDS::Federator::CreateEntity, Update::TopicStrt< Q, S >::dataType, OpenDDS::Federator::TopicUpdate::datatype, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::TopicUpdate::domain, Update::TopicStrt< Q, S >::domainId, DDS::HANDLE_NIL, TAO_DDS_DCPSFederationId::id(), OpenDDS::Federator::TopicUpdate::id, id(), CORBA::is_nil(), LM_DEBUG, Update::TopicStrt< Q, S >::name, OpenDDS::Federator::TopicUpdate::participant, Update::TopicStrt< Q, S >::participantId, OpenDDS::Federator::TopicUpdate::qos, OpenDDS::Federator::TopicUpdate::sender, OpenDDS::Federator::TopicUpdate::topic, Update::TopicStrt< Q, S >::topicId, Update::TopicStrt< Q, S >::topicQos, and topicWriter_.

Referenced by create().

41 {
42  if (CORBA::is_nil(this->topicWriter_.in())) {
43  // Decline to publish data until we can.
44  return;
45  }
46 
47  TopicUpdate sample = TopicUpdate();
48  sample.sender = this->id().id();
49  sample.action = CreateEntity;
50 
51  sample.id = topic.topicId;
52  sample.domain = topic.domainId;
53  sample.participant = topic.participantId;
54  sample.topic = topic.name.c_str();
55  sample.datatype = topic.dataType.c_str();
56  sample.qos = topic.topicQos;
57 
59  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
60  OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
61  ACE_DEBUG((LM_DEBUG,
62  ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( TopicUpdate): ")
63  ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
64  this->id().id(),
65  sample.domain,
66  std::string(part_converter).c_str(),
67  std::string(topic_converter).c_str()));
68  }
69 
70  this->topicWriter_->write(sample, DDS::HANDLE_NIL);
71 }
#define ACE_DEBUG(X)
const InstanceHandle_t HANDLE_NIL
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
const TAO_DDS_DCPSFederationId & id() const
Accessors for the federation Id value.
DomainIdType domainId
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
TopicUpdateDataWriter_var topicWriter_
TopicUpdate writer.
Boolean is_nil(T x)
void id(RepoKey fedId)

◆ create() [2/5]

void OpenDDS::Federator::ManagerImpl::create ( const Update::UParticipant participant)
virtual

Propagate an entity has been created.

Implements Update::Updater.

Definition at line 74 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::Federator::ParticipantUpdate::action, OpenDDS::Federator::CreateEntity, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, Update::ParticipantStrt< Q >::domainId, DDS::HANDLE_NIL, TAO_DDS_DCPSFederationId::id(), OpenDDS::Federator::ParticipantUpdate::id, id(), CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::ParticipantUpdate::owner, Update::ParticipantStrt< Q >::owner, Update::ParticipantStrt< Q >::participantId, Update::ParticipantStrt< Q >::participantQos, participantWriter_, OpenDDS::Federator::ParticipantUpdate::qos, and OpenDDS::Federator::ParticipantUpdate::sender.

75 {
76  if (CORBA::is_nil(this->participantWriter_.in())) {
77  // Decline to publish data until we can.
78  return;
79  }
80 
81  ParticipantUpdate sample = ParticipantUpdate();
82  sample.sender = this->id().id();
83  sample.action = CreateEntity;
84 
85  sample.owner = participant.owner;
86  sample.domain = participant.domainId;
87  sample.id = participant.participantId;
88  sample.qos = participant.participantQos;
89 
91  OpenDDS::DCPS::RepoIdConverter converter(sample.id);
92  ACE_DEBUG((LM_DEBUG,
93  ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( ParticipantUpdate): ")
94  ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
95  this->id().id(),
96  sample.domain,
97  std::string(converter).c_str()));
98  }
99 
100  this->participantWriter_->write(sample, DDS::HANDLE_NIL);
101 }
#define ACE_DEBUG(X)
const InstanceHandle_t HANDLE_NIL
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
const TAO_DDS_DCPSFederationId & id() const
Accessors for the federation Id value.
ParticipantUpdateDataWriter_var participantWriter_
ParticipantUpdate writer.
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
Boolean is_nil(T x)
void id(RepoKey fedId)

◆ create() [3/5]

void OpenDDS::Federator::ManagerImpl::create ( const Update::URActor actor)
virtual

Propagate an entity has been created.

Implements Update::Updater.

Definition at line 104 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::Federator::PublicationUpdate::action, OpenDDS::Federator::SubscriptionUpdate::action, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::actorId, OpenDDS::Federator::PublicationUpdate::callback, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::callback, OpenDDS::Federator::SubscriptionUpdate::callback, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::contentSubscriptionProfile, create(), OpenDDS::Federator::CreateEntity, OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::Federator::PublicationUpdate::datawriter_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::domain, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::domainId, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::drdwQos, OpenDDS::Federator::SubscriptionUpdate::expression_params, OpenDDS::Federator::SubscriptionUpdate::filter_class_name, OpenDDS::Federator::SubscriptionUpdate::filter_expression, DDS::HANDLE_NIL, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::PublicationUpdate::id, OpenDDS::Federator::SubscriptionUpdate::id, CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::PublicationUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::participant, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::participantId, publicationWriter_, OpenDDS::Federator::PublicationUpdate::publisher_qos, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::pubsubQos, OpenDDS::Federator::PublicationUpdate::sender, OpenDDS::Federator::SubscriptionUpdate::sender, OpenDDS::Federator::PublicationUpdate::serialized_type_info, OpenDDS::Federator::SubscriptionUpdate::serialized_type_info, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::serializedTypeInfo, OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, subscriptionWriter_, OpenDDS::Federator::PublicationUpdate::topic, OpenDDS::Federator::SubscriptionUpdate::topic, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::topicId, OpenDDS::Federator::PublicationUpdate::transport_info, OpenDDS::Federator::SubscriptionUpdate::transport_info, and Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::transportInterfaceInfo.

105 {
106  if (CORBA::is_nil(this->subscriptionWriter_.in())) {
107  // Decline to publish data until we can.
108  return;
109  }
110 
111  SubscriptionUpdate sample = SubscriptionUpdate();
112  sample.sender = this->id().id();
113  sample.action = CreateEntity;
114 
115  sample.domain = reader.domainId;
116  sample.participant = reader.participantId;
117  sample.topic = reader.topicId;
118  sample.id = reader.actorId;
119  sample.callback = reader.callback.c_str();
120  sample.datareader_qos = reader.drdwQos;
121  sample.subscriber_qos = reader.pubsubQos;
122  sample.transport_info = reader.transportInterfaceInfo;
123  sample.filter_class_name = reader.contentSubscriptionProfile.filterClassName;
124  sample.filter_expression = reader.contentSubscriptionProfile.filterExpr;
125  sample.expression_params = reader.contentSubscriptionProfile.exprParams;
126  sample.serialized_type_info = reader.serializedTypeInfo;
127 
129  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
130  OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
131  ACE_DEBUG((LM_DEBUG,
132  ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( SubscriptionUpdate): ")
133  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
134  this->id().id(),
135  sample.domain,
136  std::string(part_converter).c_str(),
137  std::string(sub_converter).c_str()));
138  }
139 
140  this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
141 }
#define ACE_DEBUG(X)
const InstanceHandle_t HANDLE_NIL
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
const TAO_DDS_DCPSFederationId & id() const
Accessors for the federation Id value.
SubscriptionUpdateDataWriter_var subscriptionWriter_
SubscriptionUpdate writer.
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
Boolean is_nil(T x)
void id(RepoKey fedId)

◆ create() [4/5]

virtual void OpenDDS::Federator::ManagerImpl::create ( const Update::UWActor actor)
virtual

Propagate an entity has been created.

Implements Update::Updater.

◆ create() [5/5]

void OpenDDS::Federator::ManagerImpl::create ( const Update::OwnershipData data)
virtual

Propagate an entity has been created.

Implements Update::Updater.

Definition at line 181 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::Federator::OwnerUpdate::action, OpenDDS::Federator::CreateEntity, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::OwnerUpdate::domain, Update::OwnershipData::domain, DDS::HANDLE_NIL, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::OwnerUpdate::owner, Update::OwnershipData::owner, ownerWriter_, OpenDDS::Federator::OwnerUpdate::participant, Update::OwnershipData::participant, and OpenDDS::Federator::OwnerUpdate::sender.

182 {
183  if (CORBA::is_nil(this->ownerWriter_.in())) {
184  // Decline to publish data until we can.
185  return;
186  }
187 
188  OwnerUpdate sample = OwnerUpdate();
189  sample.sender = this->id().id();
190  sample.action = CreateEntity;
191 
192  sample.domain = data.domain;
193  sample.participant = data.participant;
194  sample.owner = data.owner;
195 
197  OpenDDS::DCPS::RepoIdConverter converter(sample.participant);
198  ACE_DEBUG((LM_DEBUG,
199  ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( OwnerUpdate): ")
200  ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
201  this->id().id(),
202  sample.domain,
203  std::string(converter).c_str(),
204  sample.sender,
205  sample.owner));
206  }
207 
208  this->ownerWriter_->write(sample, DDS::HANDLE_NIL);
209 }
#define ACE_DEBUG(X)
const InstanceHandle_t HANDLE_NIL
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
const TAO_DDS_DCPSFederationId & id() const
Accessors for the federation Id value.
OwnerUpdateDataWriter_var ownerWriter_
TopicUpdate writer.
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
Boolean is_nil(T x)
void id(RepoKey fedId)

◆ destroy()

void OpenDDS::Federator::ManagerImpl::destroy ( const Update::IdPath id,
Update::ItemType  type,
Update::ActorType  actor 
)
virtual

Propagate that an entity has been destroyed.

Implements Update::Updater.

Definition at line 212 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::Federator::TopicUpdate::action, OpenDDS::Federator::ParticipantUpdate::action, OpenDDS::Federator::PublicationUpdate::action, OpenDDS::Federator::SubscriptionUpdate::action, Update::Actor, config_, Update::DataReader, Update::DataWriter, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::DestroyEntity, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::Config::federationDomain(), DDS::HANDLE_NIL, TAO_DDS_DCPSFederationId::id(), OpenDDS::Federator::TopicUpdate::id, OpenDDS::Federator::ParticipantUpdate::id, id(), OpenDDS::Federator::PublicationUpdate::id, OpenDDS::Federator::SubscriptionUpdate::id, CORBA::is_nil(), LM_DEBUG, Update::Participant, OpenDDS::Federator::TopicUpdate::participant, OpenDDS::Federator::PublicationUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::participant, participantWriter_, publicationWriter_, OpenDDS::Federator::TopicUpdate::sender, OpenDDS::Federator::ParticipantUpdate::sender, OpenDDS::Federator::PublicationUpdate::sender, OpenDDS::Federator::SubscriptionUpdate::sender, subscriptionWriter_, Update::Topic, and topicWriter_.

216 {
217  //
218  // Do not propagate any destroy() messages within the FederationDomain.
219  // This domain will be managed separately.
220  //
221  if (id.domain == this->config_.federationDomain()) {
222  return;
223  }
224 
225  switch (type) {
226  case Update::Topic: {
227  if (CORBA::is_nil(this->topicWriter_.in())) {
228  // Decline to publish data until we can.
229  return;
230  }
231 
232  TopicUpdate sample = TopicUpdate();
233  sample.sender = this->id().id();
234  sample.action = DestroyEntity;
235 
236  sample.id = id.id;
237  sample.domain = id.domain;
238  sample.participant = id.participant;
239 
241  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
242  OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
243  ACE_DEBUG((LM_DEBUG,
244  ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( TopicUpdate): ")
245  ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
246  this->id().id(),
247  sample.domain,
248  std::string(part_converter).c_str(),
249  std::string(topic_converter).c_str()));
250  }
251 
252  this->topicWriter_->write(sample, DDS::HANDLE_NIL);
253  }
254  break;
255 
256  case Update::Participant: {
257  if (CORBA::is_nil(this->participantWriter_.in())) {
258  // Decline to publish data until we can.
259  return;
260  }
261 
262  ParticipantUpdate sample = ParticipantUpdate();
263  sample.sender = this->id().id();
264  sample.action = DestroyEntity;
265 
266  sample.domain = id.domain;
267  sample.id = id.id;
268 
270  OpenDDS::DCPS::RepoIdConverter converter(sample.id);
271  ACE_DEBUG((LM_DEBUG,
272  ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( ParticipantUpdate): ")
273  ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
274  this->id().id(),
275  sample.domain,
276  std::string(converter).c_str()));
277  }
278 
279  this->participantWriter_->write(sample, DDS::HANDLE_NIL);
280  }
281  break;
282 
283  case Update::Actor:
284 
285  // This is VERY annoying.
286  switch (actor) {
287  case Update::DataWriter: {
288  if (CORBA::is_nil(this->publicationWriter_.in())) {
289  // Decline to publish data until we can.
290  return;
291  }
292 
293  PublicationUpdate sample = PublicationUpdate();
294  sample.sender = this->id().id();
295  sample.action = DestroyEntity;
296 
297  sample.domain = id.domain;
298  sample.participant = id.participant;
299  sample.id = id.id;
300 
302  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
303  OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
304  ACE_DEBUG((LM_DEBUG,
305  ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( PublicationUpdate): ")
306  ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
307  this->id().id(),
308  sample.domain,
309  std::string(part_converter).c_str(),
310  std::string(pub_converter).c_str()));
311  }
312 
313  this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
314  }
315  break;
316 
317  case Update::DataReader: {
318  if (CORBA::is_nil(this->subscriptionWriter_.in())) {
319  // Decline to publish data until we can.
320  return;
321  }
322 
323  SubscriptionUpdate sample = SubscriptionUpdate();
324  sample.sender = this->id().id();
325  sample.action = DestroyEntity;
326 
327  sample.domain = id.domain;
328  sample.participant = id.participant;
329  sample.id = id.id;
330 
332  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
333  OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
334  ACE_DEBUG((LM_DEBUG,
335  ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( SubscriptionUpdate): ")
336  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
337  this->id().id(),
338  sample.domain,
339  std::string(part_converter).c_str(),
340  std::string(sub_converter).c_str()));
341  }
342 
343  this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
344  }
345  break;
346  }
347 
348  break;
349  }
350 }
#define ACE_DEBUG(X)
const InstanceHandle_t HANDLE_NIL
void federationDomain(long domain)
Federation Id value.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
const TAO_DDS_DCPSFederationId & id() const
Accessors for the federation Id value.
ParticipantUpdateDataWriter_var participantWriter_
ParticipantUpdate writer.
Config & config_
The configuration information for this manager.
SubscriptionUpdateDataWriter_var subscriptionWriter_
SubscriptionUpdate writer.
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
PublicationUpdateDataWriter_var publicationWriter_
PublicationUpdate writer.
TopicUpdateDataWriter_var topicWriter_
TopicUpdate writer.
Boolean is_nil(T x)
void id(RepoKey fedId)

◆ discover_federation()

CORBA::Boolean OpenDDS::Federator::ManagerImpl::discover_federation ( const char *  ior)
virtual

: Implement this.

Definition at line 907 of file FederatorManagerImpl.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, and LM_DEBUG.

908 {
910  ACE_DEBUG((LM_DEBUG,
911  ACE_TEXT("(%P|%t) ManagerImpl::discover_federation( %C)\n"),
912  ior));
913  }
914 
915  ///@TODO: Implement this.
916  return false;
917 }
#define ACE_DEBUG(X)
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30

◆ federation_id()

RepoKey OpenDDS::Federator::ManagerImpl::federation_id ( )
virtual

Definition at line 870 of file FederatorManagerImpl.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, TAO_DDS_DCPSFederationId::id(), id(), and LM_DEBUG.

871 {
873  ACE_DEBUG((LM_DEBUG,
874  ACE_TEXT("(%P|%t) ManagerImpl::federation_id()\n")));
875  }
876 
877  return this->id().id();
878 }
#define ACE_DEBUG(X)
const TAO_DDS_DCPSFederationId & id() const
Accessors for the federation Id value.
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void id(RepoKey fedId)

◆ finalize()

void OpenDDS::Federator::ManagerImpl::finalize ( void  )

Release resources gracefully.

Definition at line 791 of file FederatorManagerImpl.cpp.

References CORBA::Exception::_tao_print_exception(), ACE_DEBUG, ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, ACE_Event_Handler::DONT_CALL, federated_, federationParticipant_, TAO_Pseudo_Var_T< ORB >::in(), CORBA::is_nil(), joinRepo_, LM_DEBUG, LM_ERROR, multicastResponder_, orb_, ownerListener_, participantListener_, peers_, publicationListener_, ACE_Event_Handler::READ_MASK, OpenDDS::DCPS::retcode_to_string(), subscriptionListener_, TheParticipantFactory, and topicListener_.

Referenced by InfoRepo::finalize(), and InfoRepo::handle_exception().

792 {
794  ACE_DEBUG((LM_DEBUG,
795  ACE_TEXT("(%P|%t) Federator::ManagerImpl::finalize()\n")));
796  }
797 
798  ownerListener_.stop();
799  topicListener_.stop();
800  participantListener_.stop();
801  publicationListener_.stop();
802  subscriptionListener_.stop();
803  ownerListener_.join();
804  topicListener_.join();
805  participantListener_.join();
806  publicationListener_.join();
807  subscriptionListener_.join();
808 
809  if (this->federated_) {
810  try {
811  IdToManagerMap::iterator where = this->peers_.find(this->joinRepo_);
812 
813  if (where == this->peers_.end()) {
814  ACE_DEBUG((LM_DEBUG,
815  ACE_TEXT("(%P|%t) Federator::Manager::finalize: ")
816  ACE_TEXT("repository %d - all attachment to federation left.\n"),
817  this->id().id()));
818 
819  } else {
820  if (CORBA::is_nil(where->second.in())) {
821  ACE_ERROR((LM_ERROR,
822  ACE_TEXT("(%P|%t) ERROR: Federator::Manager::finalize: ")
823  ACE_TEXT("repository %d not currently attached to a federation.\n"),
824  this->id().id()));
825 
826  } else {
827  where->second->leave_federation(this->id().id());
828  this->federated_ = false;
829  }
830  }
831 
832  } catch (const CORBA::Exception& ex) {
834  ACE_TEXT("ERROR: Federator::ManagerImpl::finalize() - ")
835  ACE_TEXT("unable to leave remote federation "));
836  throw Incomplete();
837  }
838  }
839 
840  if (!CORBA::is_nil(this->orb_.in()) && (0 != this->orb_->orb_core())) {
841  this->orb_->orb_core()->reactor()->remove_handler(
842  &this->multicastResponder_,
844  }
845 
846  // Remove our local participant and contained entities.
848  const DDS::ReturnCode_t entities_error =
849  federationParticipant_->delete_contained_entities();
850  if (entities_error) {
851  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Federator::Manager: "
852  "unable to release resources for repository %d: %C\n",
853  id().id(), DCPS::retcode_to_string(entities_error)));
854  return;
855  }
856 
857  DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
858  const DDS::ReturnCode_t part_error = dpf->delete_participant(federationParticipant_);
859  if (part_error) {
860  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Federator::Manager: "
861  "unable to release the participant for repository %d: %C\n",
862  id().id(), DCPS::retcode_to_string(part_error)));
863  }
864  }
865 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
UpdateListener< SubscriptionUpdate, SubscriptionUpdateDataReader > subscriptionListener_
SubscriptionUpdate listener.
InfoRepoMulticastResponder multicastResponder_
Multicast responder.
CORBA::ORB_var orb_
The ORB in which we are activated.
IdToManagerMap peers_
The peer with which we have federated.
UpdateListener< OwnerUpdate, OwnerUpdateDataReader > ownerListener_
TopicUpdate listener.
RepoKey joinRepo_
Repository to which we joined.
UpdateListener< PublicationUpdate, PublicationUpdateDataReader > publicationListener_
PublicationUpdate listener.
UpdateListener< ParticipantUpdate, ParticipantUpdateDataReader > participantListener_
ParticipantUpdate listener.
_in_type in(void) const
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
const char * retcode_to_string(DDS::ReturnCode_t value)
Definition: DCPS_Utils.cpp:29
UpdateListener< TopicUpdate, TopicUpdateDataReader > topicListener_
TopicUpdate listener.
#define TheParticipantFactory
DDS::DomainParticipant_var federationParticipant_
local DomainParticipant
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
Boolean is_nil(T x)

◆ id()

OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE const TAO_DDS_DCPSFederationId & OpenDDS::Federator::ManagerImpl::id ( void  ) const

Accessors for the federation Id value.

Definition at line 12 of file FederatorManagerImpl.inl.

References ACE_INLINE, config_, and OpenDDS::Federator::Config::federationId().

Referenced by create(), destroy(), federation_id(), InfoRepo::init(), leave_federation(), pushState(), and update().

13 {
14  return this->config_.federationId();
15 }
Config & config_
The configuration information for this manager.
TAO_DDS_DCPSFederationId & federationId()
Federation Id value.

◆ info() [1/2]

ACE_INLINE TAO_DDS_DCPSInfo_i *& OpenDDS::Federator::ManagerImpl::info ( )

Accessors for the DCPSInfo reference.

Definition at line 19 of file FederatorManagerImpl.inl.

References ACE_INLINE, and info_.

Referenced by InfoRepo::init().

20 {
21  return this->info_;
22 }
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.

◆ info() [2/2]

ACE_INLINE TAO_DDS_DCPSInfo_i * OpenDDS::Federator::ManagerImpl::info ( ) const

Definition at line 26 of file FederatorManagerImpl.inl.

References ACE_INLINE, and info_.

27 {
28  return this->info_;
29 }
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.

◆ initialize()

void OpenDDS::Federator::ManagerImpl::initialize ( void  )

Establish the update publications and subscriptions.

Definition at line 81 of file FederatorManagerImpl.cpp.

References ACE_DEBUG, ACE_DEFAULT_MULTICAST_ADDR, ACE_DEFAULT_MULTICASTV6_ADDR, ACE_ERROR, ACE_TEXT(), ACE_OS::atoi(), OpenDDS::DCPS::TransportRegistry::bind_config(), config_, OpenDDS::DCPS::TransportRegistry::create_config(), OpenDDS::DCPS::TransportRegistry::create_inst(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX, OpenDDS::DCPS::DEFAULT_STATUS_MASK, DDS::HistoryQosPolicy::depth, OpenDDS::Federator::Defaults::DiscoveryRequestPort, DDS::DataWriterQos::durability, DDS::DataReaderQos::durability, OpenDDS::Federator::Config::federationDomain(), federationParticipant_, OpenDDS::DCPS::DataWriterImpl::get_guid(), OpenDDS::DCPS::DataReaderImpl::get_guid(), ACE_OS::getenv(), DDS::DataWriterQos::history, DDS::DataReaderQos::history, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::Federator::InfoRepoMulticastResponder::init(), OpenDDS::DCPS::TransportRegistry::instance(), OpenDDS::DCPS::TransportConfig::instances_, CORBA::is_nil(), DDS::KEEP_LAST_HISTORY_QOS, DDS::DurabilityQosPolicy::kind, DDS::ReliabilityQosPolicy::kind, DDS::HistoryQosPolicy::kind, LM_DEBUG, LM_ERROR, LM_WARNING, DDS::ReliabilityQosPolicy::max_blocking_time, multicastEnabled_, multicastResponder_, DDS::Duration_t::nanosec, orb_, ownerListener_, OpenDDS::Federator::OWNERUPDATETOPICNAME, OpenDDS::Federator::OWNERUPDATETYPENAME, ownerWriter_, PARTICIPANT_QOS_DEFAULT, participantListener_, OpenDDS::Federator::PARTICIPANTUPDATETOPICNAME, OpenDDS::Federator::PARTICIPANTUPDATETYPENAME, participantWriter_, publicationListener_, OpenDDS::Federator::PUBLICATIONUPDATETOPICNAME, OpenDDS::Federator::PUBLICATIONUPDATETYPENAME, publicationWriter_, PUBLISHER_QOS_DEFAULT, ACE_Event_Handler::READ_MASK, ACE_Reactor::register_handler(), DDS::DataWriterQos::reliability, DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_OK, DDS::Duration_t::sec, SUBSCRIBER_QOS_DEFAULT, subscriptionListener_, OpenDDS::Federator::SUBSCRIPTIONUPDATETOPICNAME, OpenDDS::Federator::SUBSCRIPTIONUPDATETYPENAME, subscriptionWriter_, TheParticipantFactory, TOPIC_QOS_DEFAULT, topicListener_, OpenDDS::Federator::TOPICUPDATETOPICNAME, OpenDDS::Federator::TOPICUPDATETYPENAME, topicWriter_, and DDS::TRANSIENT_LOCAL_DURABILITY_QOS.

Referenced by join_federation().

82 {
84  ACE_DEBUG((LM_DEBUG,
85  ACE_TEXT("(%P|%t) Federation::ManagerImpl::initialize()\n")));
86  }
87 
88  // Let the listeners know which repository we are to filter samples at
89  // the earliest opportunity.
90  this->ownerListener_.federationId(this->id());
91  this->topicListener_.federationId(this->id());
92  this->participantListener_.federationId(this->id());
93  this->publicationListener_.federationId(this->id());
94  this->subscriptionListener_.federationId(this->id());
95 
96  // Add participant for Federation domain
97  DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
98  federationParticipant_ = dpf->create_participant(
99  this->config_.federationDomain(),
101  DDS::DomainParticipantListener::_nil(),
103  if (!federationParticipant_) {
104  ACE_ERROR((LM_ERROR,
105  ACE_TEXT("(%P|%t) ERROR: create_participant failed for ")
106  ACE_TEXT("repository %d in federation domain %d.\n"),
107  this->id().id(),
108  this->config_.federationDomain()));
109  throw Incomplete();
110  }
111 
112  //
113  // Add type support for update topics
114  //
115 
116  OwnerUpdateTypeSupportImpl::_var_type ownerUpdate = new OwnerUpdateTypeSupportImpl();
117 
118  if (DDS::RETCODE_OK != ownerUpdate->register_type(
119  this->federationParticipant_.in(),
121  ACE_ERROR((LM_ERROR,
122  ACE_TEXT("(%P|%t) ERROR: Unable to install ")
123  ACE_TEXT("OwnerUpdate type support for repository %d.\n"),
124  this->id().id()));
125  throw Incomplete();
126  }
127 
128  ParticipantUpdateTypeSupportImpl::_var_type participantUpdate = new ParticipantUpdateTypeSupportImpl();
129 
130  if (DDS::RETCODE_OK != participantUpdate->register_type(
131  this->federationParticipant_.in(),
133  ACE_ERROR((LM_ERROR,
134  ACE_TEXT("(%P|%t) ERROR: Unable to install ")
135  ACE_TEXT("ParticipantUpdate type support for repository %d.\n"),
136  this->id().id()));
137  throw Incomplete();
138  }
139 
140  TopicUpdateTypeSupportImpl::_var_type topicUpdate = new TopicUpdateTypeSupportImpl();
141 
142  if (DDS::RETCODE_OK != topicUpdate->register_type(
143  this->federationParticipant_.in(),
145  ACE_ERROR((LM_ERROR,
146  ACE_TEXT("(%P|%t) ERROR: Unable to install ")
147  ACE_TEXT("TopicUpdate type support for repository %d.\n"),
148  this->id().id()));
149  throw Incomplete();
150  }
151 
152  PublicationUpdateTypeSupportImpl::_var_type publicationUpdate = new PublicationUpdateTypeSupportImpl();
153 
154  if (DDS::RETCODE_OK != publicationUpdate->register_type(
155  this->federationParticipant_.in(),
157  ACE_ERROR((LM_ERROR,
158  ACE_TEXT("(%P|%t) ERROR: Unable to install ")
159  ACE_TEXT("PublicationUpdate type support for repository %d.\n"),
160  this->id().id()));
161  throw Incomplete();
162  }
163 
164  SubscriptionUpdateTypeSupportImpl::_var_type subscriptionUpdate = new SubscriptionUpdateTypeSupportImpl();
165 
166  if (DDS::RETCODE_OK != subscriptionUpdate->register_type(
167  this->federationParticipant_.in(),
169  ACE_ERROR((LM_ERROR,
170  ACE_TEXT("(%P|%t) ERROR: Unable to install ")
171  ACE_TEXT("SubscriptionUpdate type support for repository %d.\n"),
172  this->id().id()));
173  throw Incomplete();
174  }
175 
176  //
177  // Create a transport config for use with federation entities.
178  //
179  std::string config_name =
181  + std::string("FederationBITTransportConfig");
184 
186  + std::string("FederationBITTCPTransportInst");
189  "tcp");
190  config->instances_.push_back(inst);
191 
192  //
193  // Create the subscriber for the update topics.
194  //
195 
196  DDS::Subscriber_var subscriber
197  = this->federationParticipant_->create_subscriber(
199  DDS::SubscriberListener::_nil(),
201 
202  if (CORBA::is_nil(subscriber.in())) {
203  ACE_ERROR((LM_ERROR,
204  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
205  ACE_TEXT("failed to create subscriber for repository %d\n"),
206  this->id().id()));
207  throw Incomplete();
208 
209  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
210  ACE_DEBUG((LM_DEBUG,
211  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
212  ACE_TEXT("created federation subscriber for repository %d\n"),
213  this->id().id()));
214 
215  }
216 
217  // Attach the transport to it.
218 
219  try {
221  subscriber.in());
222  } catch (const OpenDDS::DCPS::Transport::Exception&) {
223  ACE_ERROR((LM_ERROR,
224  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
225  ACE_TEXT("failed to bind transport config to federation subscriber.\n")));
226  throw Incomplete();
227  }
228 
229  //
230  // Create the publisher for the update topics.
231  //
232 
233  DDS::Publisher_var publisher
234  = this->federationParticipant_->create_publisher(
236  DDS::PublisherListener::_nil(),
238 
239  if (CORBA::is_nil(publisher.in())) {
240  ACE_ERROR((LM_ERROR,
241  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
242  ACE_TEXT("failed to create publisher for repository %d\n"),
243  this->id().id()));
244  throw Incomplete();
245 
246  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
247  ACE_DEBUG((LM_DEBUG,
248  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
249  ACE_TEXT("created federation publisher for repository %d\n"),
250  this->id().id()));
251 
252  }
253 
254  // Attach the transport to it.
255 
256  try {
258  publisher.in());
259  } catch (const OpenDDS::DCPS::Transport::Exception&) {
260  ACE_ERROR((LM_ERROR,
261  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
262  ACE_TEXT("failed to bind transport config to federation publisher.\n")));
263  throw Incomplete();
264  }
265 
266  //
267  // Some useful items for adding the subscriptions.
268  //
269  DDS::Topic_var topic;
270  DDS::TopicDescription_var description;
271  DDS::DataReader_var dataReader;
272  DDS::DataWriter_var dataWriter;
273 
274  DDS::DataReaderQos readerQos;
275  subscriber->get_default_datareader_qos(readerQos);
278  readerQos.history.depth = 50;
280  readerQos.reliability.max_blocking_time.sec = 0;
281  readerQos.reliability.max_blocking_time.nanosec = 0;
282 
283  DDS::DataWriterQos writerQos;
284  publisher->get_default_datawriter_qos(writerQos);
287  writerQos.history.depth = 50;
289  writerQos.reliability.max_blocking_time.sec = 0;
290  writerQos.reliability.max_blocking_time.nanosec = 0;
291 
292  //
293  // Add update subscriptions
294  //
295  // NOTE: Its ok to lose the references to the objects here since they
296  // are not needed after this point. The only thing we will do
297  // with them is to destroy them, and that will be done via a
298  // cascade delete from the participant. The listeners will
299  // survive and can be used within other participants as well,
300  // since the only state they retain is the manager, which is the
301  // same for all.
302  //
303 
304  topic = this->federationParticipant_->create_topic(
308  DDS::TopicListener::_nil(),
310 
311  dataWriter = publisher->create_datawriter(
312  topic.in(),
313  writerQos,
314  DDS::DataWriterListener::_nil(),
316 
317  if (CORBA::is_nil(dataWriter.in())) {
318  ACE_ERROR((LM_ERROR,
319  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
320  ACE_TEXT("failed to create OwnerUpdate writer for repository %d\n"),
321  this->id().id()));
322  throw Incomplete();
323  }
324 
325  this->ownerWriter_ = OwnerUpdateDataWriter::_narrow(dataWriter.in());
326 
327  if (::CORBA::is_nil(this->ownerWriter_.in())) {
328  ACE_ERROR((LM_ERROR,
329  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
330  ACE_TEXT("failed to extract typed OwnerUpdate writer.\n")));
331  throw Incomplete();
332 
333  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
335  = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
336 
337  if (0 == servant) {
338  ACE_DEBUG((LM_WARNING,
339  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
340  ACE_TEXT("unable to extract typed OwnerUpdate writer.\n")));
341 
342  } else {
343  OpenDDS::DCPS::RepoIdConverter converter(servant->get_guid());
344  ACE_DEBUG((LM_DEBUG,
345  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
346  ACE_TEXT("created federation OwnerUpdate writer %C for repository %d\n"),
347  std::string(converter).c_str(),
348  this->id().id()));
349  }
350  }
351 
352  description = this->federationParticipant_->lookup_topicdescription(OWNERUPDATETOPICNAME);
353  dataReader = subscriber->create_datareader(
354  description.in(),
355  readerQos,
356  &this->ownerListener_,
358 
359  if (CORBA::is_nil(dataReader.in())) {
360  ACE_ERROR((LM_ERROR,
361  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
362  ACE_TEXT("failed to create OwnerUpdate reader for repository %d\n"),
363  this->id().id()));
364  throw Incomplete();
365 
366  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
368  = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
369 
370  if (0 == servant) {
371  ACE_DEBUG((LM_WARNING,
372  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
373  ACE_TEXT("unable to extract typed OwnerUpdate reader.\n")));
374 
375  } else {
376  OpenDDS::DCPS::RepoIdConverter converter(servant->get_guid());
377  ACE_DEBUG((LM_DEBUG,
378  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
379  ACE_TEXT("created federation OwnerUpdate reader %C for repository %d\n"),
380  std::string(converter).c_str(),
381  this->id().id()));
382  }
383  }
384 
385  topic = this->federationParticipant_->create_topic(
389  DDS::TopicListener::_nil(),
391  dataWriter = publisher->create_datawriter(
392  topic.in(),
393  writerQos,
394  DDS::DataWriterListener::_nil(),
396 
397  if (CORBA::is_nil(dataWriter.in())) {
398  ACE_ERROR((LM_ERROR,
399  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
400  ACE_TEXT("failed to create TopicUpdate writer for repository %d\n"),
401  this->id().id()));
402  throw Incomplete();
403  }
404 
405  this->topicWriter_
406  = TopicUpdateDataWriter::_narrow(dataWriter.in());
407 
408  if (::CORBA::is_nil(this->topicWriter_.in())) {
409  ACE_ERROR((LM_ERROR,
410  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
411  ACE_TEXT("failed to extract typed TopicUpdate writer.\n")));
412  throw Incomplete();
413 
414  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
416  = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
417 
418  if (0 == servant) {
419  ACE_DEBUG((LM_WARNING,
420  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
421  ACE_TEXT("unable to extract typed TopicUpdate writer.\n")));
422 
423  } else {
424  OpenDDS::DCPS::RepoIdConverter converter(servant->get_guid());
425  ACE_DEBUG((LM_DEBUG,
426  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
427  ACE_TEXT("created federation TopicUpdate writer %C for repository %d\n"),
428  std::string(converter).c_str(),
429  this->id().id()));
430  }
431  }
432 
433  description = this->federationParticipant_->lookup_topicdescription(TOPICUPDATETOPICNAME);
434  dataReader = subscriber->create_datareader(
435  description.in(),
436  readerQos,
437  &this->topicListener_,
439 
440  if (CORBA::is_nil(dataReader.in())) {
441  ACE_ERROR((LM_ERROR,
442  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
443  ACE_TEXT("failed to create TopicUpdate reader for repository %d\n"),
444  this->id().id()));
445  throw Incomplete();
446 
447  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
449  = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
450 
451  if (0 == servant) {
452  ACE_DEBUG((LM_WARNING,
453  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
454  ACE_TEXT("unable to extract typed TopicUpdate reader.\n")));
455 
456  } else {
457  OpenDDS::DCPS::RepoIdConverter converter(servant->get_guid());
458  ACE_DEBUG((LM_DEBUG,
459  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
460  ACE_TEXT("created federation TopicUpdate reader %C for repository %d\n"),
461  std::string(converter).c_str(),
462  this->id().id()));
463  }
464  }
465 
466  topic = this->federationParticipant_->create_topic(
470  DDS::TopicListener::_nil(),
472  dataWriter = publisher->create_datawriter(
473  topic.in(),
474  writerQos,
475  DDS::DataWriterListener::_nil(),
477 
478  if (CORBA::is_nil(dataWriter.in())) {
479  ACE_ERROR((LM_ERROR,
480  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
481  ACE_TEXT("failed to create ParticipantUpdate writer for repository %d\n"),
482  this->id().id()));
483  throw Incomplete();
484  }
485 
486  this->participantWriter_
487  = ParticipantUpdateDataWriter::_narrow(dataWriter.in());
488 
489  if (::CORBA::is_nil(this->participantWriter_.in())) {
490  ACE_ERROR((LM_ERROR,
491  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
492  ACE_TEXT("failed to extract typed ParticipantUpdate writer.\n")));
493  throw Incomplete();
494 
495  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
497  = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
498 
499  if (0 == servant) {
500  ACE_DEBUG((LM_WARNING,
501  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
502  ACE_TEXT("unable to extract typed ParticipantUpdate writer.\n")));
503 
504  } else {
505  OpenDDS::DCPS::RepoIdConverter converter(servant->get_guid());
506  ACE_DEBUG((LM_DEBUG,
507  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
508  ACE_TEXT("created federation ParticipantUpdate writer %C for repository %d\n"),
509  std::string(converter).c_str(),
510  this->id().id()));
511  }
512  }
513 
514  description = this->federationParticipant_->lookup_topicdescription(PARTICIPANTUPDATETOPICNAME);
515  dataReader = subscriber->create_datareader(
516  description.in(),
517  readerQos,
518  &this->participantListener_,
520 
521  if (CORBA::is_nil(dataReader.in())) {
522  ACE_ERROR((LM_ERROR,
523  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
524  ACE_TEXT("failed to create ParticipantUpdate reader for repository %d\n"),
525  this->id().id()));
526  throw Incomplete();
527 
528  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
530  = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
531 
532  if (0 == servant) {
533  ACE_DEBUG((LM_WARNING,
534  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
535  ACE_TEXT("unable to extract typed ParticipantUpdate reader.\n")));
536 
537  } else {
538  OpenDDS::DCPS::RepoIdConverter converter(servant->get_guid());
539  ACE_DEBUG((LM_DEBUG,
540  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
541  ACE_TEXT("created federation ParticipantUpdate reader %C for repository %d\n"),
542  std::string(converter).c_str(),
543  this->id().id()));
544  }
545  }
546 
547  topic = this->federationParticipant_->create_topic(
551  DDS::TopicListener::_nil(),
553  dataWriter = publisher->create_datawriter(
554  topic.in(),
555  writerQos,
556  DDS::DataWriterListener::_nil(),
558 
559  if (CORBA::is_nil(dataWriter.in())) {
560  ACE_ERROR((LM_ERROR,
561  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
562  ACE_TEXT("failed to create PublicationUpdate writer for repository %d\n"),
563  this->id().id()));
564  throw Incomplete();
565  }
566 
567  this->publicationWriter_
568  = PublicationUpdateDataWriter::_narrow(dataWriter.in());
569 
570  if (::CORBA::is_nil(this->publicationWriter_.in())) {
571  ACE_ERROR((LM_ERROR,
572  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
573  ACE_TEXT("failed to extract typed PublicationUpdate writer.\n")));
574  throw Incomplete();
575 
576  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
578  = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
579 
580  if (0 == servant) {
581  ACE_DEBUG((LM_WARNING,
582  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
583  ACE_TEXT("unable to extract typed PublicationUpdate writer.\n")));
584 
585  } else {
586  OpenDDS::DCPS::RepoIdConverter converter(servant->get_guid());
587  ACE_DEBUG((LM_DEBUG,
588  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
589  ACE_TEXT("created federation PublicationUpdate writer %C for repository %d\n"),
590  std::string(converter).c_str(),
591  this->id().id()));
592  }
593  }
594 
595  description = this->federationParticipant_->lookup_topicdescription(PUBLICATIONUPDATETOPICNAME);
596  dataReader = subscriber->create_datareader(
597  description.in(),
598  readerQos,
599  &this->publicationListener_,
601 
602  if (CORBA::is_nil(dataReader.in())) {
603  ACE_ERROR((LM_ERROR,
604  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
605  ACE_TEXT("failed to create PublicationUpdate reader for repository %d\n"),
606  this->id().id()));
607  throw Incomplete();
608 
609  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
611  = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
612 
613  if (0 == servant) {
614  ACE_DEBUG((LM_WARNING,
615  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
616  ACE_TEXT("unable to extract typed PublicationUpdate reader.\n")));
617 
618  } else {
619  OpenDDS::DCPS::RepoIdConverter converter(servant->get_guid());
620  ACE_DEBUG((LM_DEBUG,
621  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
622  ACE_TEXT("created federation PublicationUpdate reader %C for repository %d\n"),
623  std::string(converter).c_str(),
624  this->id().id()));
625  }
626  }
627 
628  topic = this->federationParticipant_->create_topic(
632  DDS::TopicListener::_nil(),
634  dataWriter = publisher->create_datawriter(
635  topic.in(),
636  writerQos,
637  DDS::DataWriterListener::_nil(),
639 
640  if (CORBA::is_nil(dataWriter.in())) {
641  ACE_ERROR((LM_ERROR,
642  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
643  ACE_TEXT("failed to create SubscriptionUpdate writer for repository %d\n"),
644  this->id().id()));
645  throw Incomplete();
646  }
647 
648  this->subscriptionWriter_
649  = SubscriptionUpdateDataWriter::_narrow(dataWriter.in());
650 
651  if (::CORBA::is_nil(this->subscriptionWriter_.in())) {
652  ACE_ERROR((LM_ERROR,
653  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
654  ACE_TEXT("failed to extract typed SubscriptionUpdate writer.\n")));
655  throw Incomplete();
656 
657  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
659  = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
660 
661  if (0 == servant) {
662  ACE_DEBUG((LM_WARNING,
663  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
664  ACE_TEXT("unable to extract typed SubscriptionUpdate writer.\n")));
665 
666  } else {
667  OpenDDS::DCPS::RepoIdConverter converter(servant->get_guid());
668  ACE_DEBUG((LM_DEBUG,
669  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
670  ACE_TEXT("created federation SubscriptionUpdate writer %C for repository %d\n"),
671  std::string(converter).c_str(),
672  this->id().id()));
673  }
674  }
675 
676  description = this->federationParticipant_->lookup_topicdescription(SUBSCRIPTIONUPDATETOPICNAME);
677  dataReader = subscriber->create_datareader(
678  description.in(),
679  readerQos,
680  &this->subscriptionListener_,
682 
683  if (CORBA::is_nil(dataReader.in())) {
684  ACE_ERROR((LM_ERROR,
685  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
686  ACE_TEXT("failed to create SubscriptionUpdate reader for repository %d\n"),
687  this->id().id()));
688  throw Incomplete();
689 
690  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
692  = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
693 
694  if (0 == servant) {
695  ACE_DEBUG((LM_WARNING,
696  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
697  ACE_TEXT("unable to extract typed SubscriptionUpdate reader.\n")));
698 
699  } else {
700  OpenDDS::DCPS::RepoIdConverter converter(servant->get_guid());
701  ACE_DEBUG((LM_DEBUG,
702  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
703  ACE_TEXT("created federation SubscriptionUpdate reader %C for repository %d\n"),
704  std::string(converter).c_str(),
705  this->id().id()));
706  }
707  }
708 
709  // JSP
710 #if defined (ACE_HAS_IP_MULTICAST)
711 
712  if (this->multicastEnabled_) {
713  //
714  // Install ior multicast handler.
715  //
716  // Get reactor instance from TAO.
717  ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
718 
719  // See if the -ORBMulticastDiscoveryEndpoint option was specified.
720  ACE_CString mde(this->orb_->orb_core()->orb_params()->mcast_discovery_endpoint());
721 
722  // First, see if the user has given us a multicast port number
723  // on the command-line;
724  u_short port = 0;
725 
726  // Check environment var. for multicast port.
727  const char *port_number = ACE_OS::getenv("OpenDDSFederationPort");
728 
729  if (port_number != 0) {
730  port = static_cast<u_short>(ACE_OS::atoi(port_number));
731  }
732 
733  // Port wasn't specified on the command-line -
734  // use the default.
735  if (port == 0)
737 
738  // Initialize the handler
739  if (mde.length() != 0) {
740  if (this->multicastResponder_.init(
741  this->orb_.in(),
742  mde.c_str()) == -1) {
743  ACE_ERROR((LM_ERROR,
744  ACE_TEXT("(%P|%t) ERROR: Unable to initialize ")
745  ACE_TEXT("the multicast responder for repository %d.\n"),
746  this->id().id()));
747  throw Incomplete();
748  }
749 
750  } else {
751  if (this->multicastResponder_.init(
752  this->orb_.in(),
753  port,
754 #if defined (ACE_HAS_IPV6)
756 #else
758 #endif /* ACE_HAS_IPV6 */
759  )) {
760  ACE_ERROR((LM_ERROR,
761  ACE_TEXT("(%P|%t) ERROR: Unable to initialize ")
762  ACE_TEXT("the multicast responder for repository %d.\n"),
763  this->id().id()));
764  throw Incomplete();
765  }
766  }
767 
768  // Register event handler for the ior multicast.
769  if (reactor->register_handler(&this->multicastResponder_,
771  ACE_ERROR((LM_ERROR,
772  ACE_TEXT("(%P|%t) ERROR: Unable to register event handler ")
773  ACE_TEXT("for repository %d.\n"),
774  this->id().id()));
775  throw Incomplete();
776  }
777 
779  ACE_DEBUG((LM_DEBUG,
780  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
781  ACE_TEXT("multicast server setup is complete.\n")));
782  }
783  }
784 
785 #else
786  ACE_UNUSED_ARG(this->multicastEnabled_);
787 #endif /* ACE_HAS_IP_MULTICAST */
788 }
bool multicastEnabled_
Is multicast enabled?
#define PARTICIPANT_QOS_DEFAULT
#define ACE_DEBUG(X)
const string PUBLICATIONUPDATETOPICNAME
Definition: Federator.idl:121
HistoryQosPolicy history
#define ACE_ERROR(X)
const string TOPICUPDATETOPICNAME
Definition: Federator.idl:76
const string OWNERUPDATETYPENAME
Definition: Federator.idl:56
UpdateListener< SubscriptionUpdate, SubscriptionUpdateDataReader > subscriptionListener_
SubscriptionUpdate listener.
ReliabilityQosPolicy reliability
void federationDomain(long domain)
Federation Id value.
InfoRepoMulticastResponder multicastResponder_
Multicast responder.
DurabilityQosPolicy durability
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
HistoryQosPolicyKind kind
CORBA::ORB_var orb_
The ORB in which we are activated.
ParticipantUpdateDataWriter_var participantWriter_
ParticipantUpdate writer.
Config & config_
The configuration information for this manager.
SubscriptionUpdateDataWriter_var subscriptionWriter_
SubscriptionUpdate writer.
char * getenv(const char *symbol)
const string SUBSCRIPTIONUPDATETOPICNAME
Definition: Federator.idl:152
OwnerUpdateDataWriter_var ownerWriter_
TopicUpdate writer.
void bind_config(const OPENDDS_STRING &name, DDS::Entity_ptr entity)
const DDS::StatusMask DEFAULT_STATUS_MASK
int register_handler(ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask)
Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.
const string SUBSCRIPTIONUPDATETYPENAME
Definition: Federator.idl:153
ReliabilityQosPolicyKind kind
DurabilityQosPolicyKind kind
DurabilityQosPolicy durability
UpdateListener< OwnerUpdate, OwnerUpdateDataReader > ownerListener_
TopicUpdate listener.
unsigned long nanosec
Definition: DdsDcpsCore.idl:69
Implements the DDS::DataReader interface.
const string PUBLICATIONUPDATETYPENAME
Definition: Federator.idl:122
UpdateListener< PublicationUpdate, PublicationUpdateDataReader > publicationListener_
PublicationUpdate listener.
UpdateListener< ParticipantUpdate, ParticipantUpdateDataReader > participantListener_
ParticipantUpdate listener.
#define PUBLISHER_QOS_DEFAULT
TransportInst_rch create_inst(const OPENDDS_STRING &name, const OPENDDS_STRING &transport_type)
TransportConfig_rch create_config(const OPENDDS_STRING &name)
int init(CORBA::ORB_ptr orb, u_short port, const char *mcast_addr)
Initialization method.
ACE_TEXT("TCP_Factory")
int atoi(const char *s)
HistoryQosPolicy history
#define SUBSCRIBER_QOS_DEFAULT
#define ACE_DEFAULT_MULTICASTV6_ADDR
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ReliabilityQosPolicy reliability
const string PARTICIPANTUPDATETOPICNAME
Definition: Federator.idl:95
UpdateListener< TopicUpdate, TopicUpdateDataReader > topicListener_
TopicUpdate listener.
static const char DEFAULT_INST_PREFIX[]
static TransportRegistry * instance()
Return a singleton instance of this class.
const ReturnCode_t RETCODE_OK
PublicationUpdateDataWriter_var publicationWriter_
PublicationUpdate writer.
#define TheParticipantFactory
DDS::DomainParticipant_var federationParticipant_
local DomainParticipant
const string OWNERUPDATETOPICNAME
Definition: Federator.idl:55
#define ACE_DEFAULT_MULTICAST_ADDR
TopicUpdateDataWriter_var topicWriter_
TopicUpdate writer.
Boolean is_nil(T x)
const string PARTICIPANTUPDATETYPENAME
Definition: Federator.idl:96
#define TOPIC_QOS_DEFAULT
const string TOPICUPDATETYPENAME
Definition: Federator.idl:77

◆ initializeOwner()

void OpenDDS::Federator::ManagerImpl::initializeOwner ( const OpenDDS::Federator::OwnerUpdate data)
virtual

Definition at line 1132 of file FederatorManagerImpl.cpp.

References processCreate().

1134 {
1135  this->processCreate(&data, 0);
1136 }
void processCreate(const OwnerUpdate *sample, const DDS::SampleInfo *info)
Null implementation for OwnerUpdate samples.

◆ initializeParticipant()

void OpenDDS::Federator::ManagerImpl::initializeParticipant ( const OpenDDS::Federator::ParticipantUpdate data)
virtual

Definition at line 1146 of file FederatorManagerImpl.cpp.

References processCreate().

1148 {
1149  this->processCreate(&data, 0);
1150 }
void processCreate(const OwnerUpdate *sample, const DDS::SampleInfo *info)
Null implementation for OwnerUpdate samples.

◆ initializePublication()

void OpenDDS::Federator::ManagerImpl::initializePublication ( const OpenDDS::Federator::PublicationUpdate data)
virtual

Definition at line 1153 of file FederatorManagerImpl.cpp.

References processCreate().

1155 {
1156  this->processCreate(&data, 0);
1157 }
void processCreate(const OwnerUpdate *sample, const DDS::SampleInfo *info)
Null implementation for OwnerUpdate samples.

◆ initializeSubscription()

void OpenDDS::Federator::ManagerImpl::initializeSubscription ( const OpenDDS::Federator::SubscriptionUpdate data)
virtual

Definition at line 1160 of file FederatorManagerImpl.cpp.

References OPENDDS_END_VERSIONED_NAMESPACE_DECL, and processCreate().

1162 {
1163  this->processCreate(&data, 0);
1164 }
void processCreate(const OwnerUpdate *sample, const DDS::SampleInfo *info)
Null implementation for OwnerUpdate samples.

◆ initializeTopic()

void OpenDDS::Federator::ManagerImpl::initializeTopic ( const OpenDDS::Federator::TopicUpdate data)
virtual

Definition at line 1139 of file FederatorManagerImpl.cpp.

References processCreate().

1141 {
1142  this->processCreate(&data, 0);
1143 }
void processCreate(const OwnerUpdate *sample, const DDS::SampleInfo *info)
Null implementation for OwnerUpdate samples.

◆ join_federation()

Manager_ptr OpenDDS::Federator::ManagerImpl::join_federation ( Manager_ptr  peer,
FederationDomain  federation 
)
virtual

Definition at line 920 of file FederatorManagerImpl.cpp.

References CORBA::Exception::_tao_print_exception(), ACE_DEBUG, ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ACE_TEXT(), config_, OpenDDS::DCPS::DCPS_debug_level, federated_, OpenDDS::Federator::Config::federationDomain(), TAO::String_var< charT >::in(), initialize(), CORBA::is_nil(), joiner_, joining_, joinRepo_, LM_DEBUG, lock_, OpenDDS::Federator::NIL_REPOSITORY, orb(), participantWriter_, peers_, pushState(), ACE_Condition< MUTEX >::signal(), TheServiceParticipant, and ACE_Condition< MUTEX >::wait().

924 {
926  ACE_DEBUG((LM_DEBUG,
927  ACE_TEXT("(%P|%t) ManagerImpl::join_federation( peer, %d)\n"),
928  federation));
929  }
930 
931  RepoKey remote = NIL_REPOSITORY;
932 
933  try {
934  // Obtain the remote repository federator Id value.
935  remote = peer->federation_id();
936 
938  ACE_DEBUG((LM_DEBUG,
939  ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
940  ACE_TEXT("repo id %d entered from repository with id %d.\n"),
941  this->id().id(),
942  remote));
943  }
944 
945  } catch (const CORBA::Exception& ex) {
947  ACE_TEXT("ERROR: Federator::ManagerImpl::join_federation() - ")
948  ACE_TEXT("unable to obtain remote federation Id value: "));
949  throw Incomplete();
950  }
951 
952  // If we are recursing, then we are done.
953  if (this->joiner_ == remote) {
955  ACE_DEBUG((LM_DEBUG,
956  ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
957  ACE_TEXT("repo id %d leaving after reentry from repository with id %d.\n"),
958  this->id().id(),
959  remote));
960  }
961 
962  return this->_this();
963 
964  } else {
965  // Block while any different repository is joining.
966  ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
967 
968  while (this->joiner_ != NIL_REPOSITORY) {
969  // This releases the lock while we block.
970  this->joining_.wait();
971 
972  // We are now recursing - curses!
973  if (this->joiner_ == remote) {
974  return this->_this();
975  }
976  }
977 
978  // Note that we are joining the remote repository now.
979  this->joiner_ = remote;
980  }
981 
982  //
983  // We only reach this point if:
984  // 1) No other repository is processing past this point;
985  // 2) We are not recursing.
986  //
987 
988  // Check if we already have Federation repository.
989  // Check if we are already federated.
990  if (this->federated_ == false) {
991  // Go ahead and add the joining repository as our Federation
992  // repository.
993  try {
994  // Mark this repository as the point to which we are joined to
995  // the federation.
996  this->joinRepo_ = remote;
997 
998  // Obtain a reference to the remote repository.
999  OpenDDS::DCPS::DCPSInfo_var remoteRepo = peer->repository();
1000 
1001  CORBA::ORB_var orb = remoteRepo->_get_orb();
1002  CORBA::String_var remoteRepoIor = orb->object_to_string(remoteRepo.in());
1004  ACE_DEBUG((LM_DEBUG,
1005  ACE_TEXT("(%P|%t) FederatorManagerImpl::join_federation() - ")
1006  ACE_TEXT("id %d obtained reference to id %d:\n")
1007  ACE_TEXT("\t%C\n"),
1008  this->id().id(),
1009  remote,
1010  remoteRepoIor.in()));
1011  }
1012 
1013  // Add remote repository to Service_Participant in the Federation domain
1014  std::ostringstream oss;
1015  oss << remote;
1016  std::string key_string = oss.str();
1017  TheServiceParticipant->set_repo_ior(remoteRepoIor.in(), key_string);
1018  TheServiceParticipant->set_repo_domain(this->config_.federationDomain(), key_string);
1019 
1020  } catch (const CORBA::Exception& ex) {
1022  "ERROR: Federator::ManagerImpl::join_federation() - Unable to join with remote: ");
1023  throw Incomplete();
1024  }
1025  }
1026 
1027  // Symmetrical joining behavior.
1028  try {
1029  Manager_var self = this->_this();
1030  Manager_var remoteManager
1031  = peer->join_federation(self, this->config_.federationDomain());
1032 
1033  if (this->joinRepo_ == remote) {
1034  this->peers_[ this->joinRepo_]
1035  = OpenDDS::Federator::Manager::_duplicate(remoteManager.in());
1036  }
1037 
1038  //
1039  // Push our initial state out to the joining repository *after* we call
1040  // him back to join. This reduces the amount of duplicate data pushed
1041  // when a new (empty) repository is joining an existing federation.
1042  //
1044  ACE_DEBUG((LM_DEBUG,
1045  ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
1046  ACE_TEXT("repo id %d pushing state to repository with id %d.\n"),
1047  this->id().id(),
1048  remote));
1049  }
1050 
1051  this->pushState(peer);
1052 
1053  } catch (const CORBA::Exception& ex) {
1055  "ERROR: Federator::ManagerImpl::join_federation() - unsuccessful call to remote->join: ");
1056  throw Incomplete();
1057  }
1058 
1059  if (CORBA::is_nil(this->participantWriter_.in())) {
1060  //
1061  // Establish our update publications and subscriptions *after* we
1062  // have exchanged internal state with the first joining repository.
1063  //
1064  this->initialize();
1065  }
1066 
1067  // Adjust our joining state and give others the opportunity to proceed.
1069  ACE_DEBUG((LM_DEBUG,
1070  ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
1071  ACE_TEXT("repo id %d joined to repository with id %d.\n"),
1072  this->id().id(),
1073  remote));
1074  }
1075 
1076  this->federated_ = true;
1077  this->joiner_ = NIL_REPOSITORY;
1078  this->joining_.signal();
1079  return this->_this();
1080 }
RepoKey joiner_
Simple recursion avoidance during the join operations.
#define ACE_DEBUG(X)
void federationDomain(long domain)
Federation Id value.
ParticipantUpdateDataWriter_var participantWriter_
ParticipantUpdate writer.
IdToManagerMap peers_
The peer with which we have federated.
Config & config_
The configuration information for this manager.
CORBA::ORB_ptr orb()
Accessors for the ORB.
const RepoKey NIL_REPOSITORY
Definition: Federator.idl:36
int wait(const ACE_Time_Value *abstime)
ACE_Condition< ACE_SYNCH_MUTEX > joining_
Condition used to gate joining activities.
RepoKey joinRepo_
Repository to which we joined.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void initialize()
Establish the update publications and subscriptions.
ACE_TEXT("TCP_Factory")
ACE_SYNCH_MUTEX lock_
Critical section MUTEX.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
::CORBA::Long RepoKey
const character_type * in(void) const
#define TheServiceParticipant
int signal(void)
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
void pushState(Manager_ptr peer)
Push our current state to a remote repository.
Boolean is_nil(T x)

◆ leave_and_shutdown()

void OpenDDS::Federator::ManagerImpl::leave_and_shutdown ( void  )
virtual

Definition at line 1112 of file FederatorManagerImpl.cpp.

References info_, and TAO_DDS_DCPSInfo_i::shutdown().

1114 {
1115  // Shutdown the process via the repository object.
1116  this->info_->shutdown();
1117 }
virtual void shutdown()
Cause the entire repository to exit.
Definition: DCPSInfo_i.cpp:113
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.

◆ leave_federation()

void OpenDDS::Federator::ManagerImpl::leave_federation ( RepoKey  id)
virtual

Definition at line 1083 of file FederatorManagerImpl.cpp.

References ACE_DEBUG, ACE_TEXT(), config_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::Config::federationDomain(), id(), info_, LM_DEBUG, peers_, and TAO_DDS_DCPSInfo_i::remove_by_owner().

1085 {
1087  ACE_DEBUG((LM_DEBUG,
1088  ACE_TEXT("(%P|%t) ManagerImpl::leave_federation( %d)\n"),
1089  this->id().id()));
1090  }
1091 
1092  // Remove the leaving repository from our outbound mappings.
1093  IdToManagerMap::iterator where = this->peers_.find(id);
1094 
1095  if (where != this->peers_.end()) {
1096  this->peers_.erase(where);
1097  }
1098 
1099  // Remove all the internal Entities owned by the leaving repository.
1101  throw Incomplete();
1102  }
1103 
1105  ACE_DEBUG((LM_DEBUG,
1106  ACE_TEXT("(%P|%t) ManagerImpl::leave_federation( %d) complete.\n"),
1107  this->id().id()));
1108  }
1109 }
#define ACE_DEBUG(X)
void federationDomain(long domain)
Federation Id value.
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
const TAO_DDS_DCPSFederationId & id() const
Accessors for the federation Id value.
IdToManagerMap peers_
The peer with which we have federated.
Config & config_
The configuration information for this manager.
bool remove_by_owner(DDS::DomainId_t domain, long owner)
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30

◆ localRepo()

ACE_INLINE void OpenDDS::Federator::ManagerImpl::localRepo ( ::OpenDDS::DCPS::DCPSInfo_ptr  repo)

Capture a remote callable reference to the DCPSInfo.

Definition at line 33 of file FederatorManagerImpl.inl.

References ACE_INLINE, and localRepo_.

Referenced by InfoRepo::init().

34 {
35  this->localRepo_ = OpenDDS::DCPS::DCPSInfo::_duplicate(repo);
36 }
OpenDDS::DCPS::DCPSInfo_var localRepo_
Remotely callable reference to the local repository.

◆ orb() [1/2]

ACE_INLINE CORBA::ORB_ptr OpenDDS::Federator::ManagerImpl::orb ( void  )

Accessors for the ORB.

Definition at line 40 of file FederatorManagerImpl.inl.

References ACE_INLINE, orb_, and TAO_Pseudo_Var_T< ORB >::ptr().

Referenced by InfoRepo::init(), join_federation(), and pushState().

41 {
42  return this->orb_.ptr();
43 }
CORBA::ORB_var orb_
The ORB in which we are activated.
_retn_type ptr(void) const

◆ orb() [2/2]

ACE_INLINE void OpenDDS::Federator::ManagerImpl::orb ( CORBA::ORB_ptr  value)

Definition at line 47 of file FederatorManagerImpl.inl.

References CORBA::ORB::_duplicate(), OPENDDS_END_VERSIONED_NAMESPACE_DECL, and orb_.

48 {
49  this->orb_ = CORBA::ORB::_duplicate(value);
50 }
CORBA::ORB_var orb_
The ORB in which we are activated.
static CORBA::ORB_ptr _duplicate(CORBA::ORB_ptr orb)

◆ processCreate() [1/5]

void OpenDDS::Federator::ManagerImpl::processCreate ( const OwnerUpdate sample,
const DDS::SampleInfo info 
)
virtual

Null implementation for OwnerUpdate samples.

Implements OpenDDS::Federator::UpdateProcessor< OwnerUpdate >.

Definition at line 579 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), TAO_DDS_DCPSInfo_i::changeOwnership(), OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredOwnerships_, OpenDDS::Federator::OwnerUpdate::domain, info_, LM_DEBUG, OpenDDS::Federator::OwnerUpdate::owner, OpenDDS::Federator::OwnerUpdate::participant, processDeferred(), and OpenDDS::Federator::OwnerUpdate::sender.

Referenced by initializeOwner(), initializeParticipant(), initializePublication(), initializeSubscription(), and initializeTopic().

580 {
582  OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
583  ACE_DEBUG((LM_DEBUG,
584  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( OwnerUpdate): ")
585  ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
586  this->id().id(),
587  sample->domain,
588  std::string(converter).c_str(),
589  sample->sender,
590  sample->owner));
591  }
592 
593  // We could generate an error message here. Instead we let action be irrelevant.
594  if (false == this->info_->changeOwnership(sample->domain,
595  sample->participant,
596  sample->sender,
597  sample->owner)) {
598  {
600  guard,
601  this->deferred_lock_);
602  this->deferredOwnerships_.push_back(*sample);
603  }
604 
606  ACE_DEBUG((LM_DEBUG,
607  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( OwnerUpdate): ")
608  ACE_TEXT("deferred update.\n")));
609  }
610  }
611 
612  this->processDeferred();
613 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
void processDeferred()
Handle any deferred updates that might have become processable.
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_Thread_Mutex deferred_lock_
Protect deferred updates.
std::list< OwnerUpdate > deferredOwnerships_
Deferred ownership updates.
bool changeOwnership(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, long sender, long owner)
assert new ownership for a participant and its contained entities.
Definition: DCPSInfo_i.cpp:152

◆ processCreate() [2/5]

void OpenDDS::Federator::ManagerImpl::processCreate ( const PublicationUpdate sample,
const DDS::SampleInfo info 
)
virtual

Create a proxy for a new publication.

Implements OpenDDS::Federator::UpdateProcessor< PublicationUpdate >.

Definition at line 616 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), TAO_DDS_DCPSInfo_i::add_publication(), OpenDDS::Federator::PublicationUpdate::callback, OpenDDS::Federator::PublicationUpdate::datawriter_qos, OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredPublications_, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::PublicationUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::PublicationUpdate::participant, processDeferred(), OpenDDS::Federator::PublicationUpdate::publisher_qos, OpenDDS::Federator::PublicationUpdate::serialized_type_info, OpenDDS::Federator::PublicationUpdate::topic, OpenDDS::Federator::PublicationUpdate::transport_context, and OpenDDS::Federator::PublicationUpdate::transport_info.

617 {
619  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
620  OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
621  ACE_DEBUG((LM_DEBUG,
622  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( PublicationUpdate): ")
623  ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
624  this->id().id(),
625  sample->domain,
626  std::string(part_converter).c_str(),
627  std::string(pub_converter).c_str()));
628  }
629 
630  if (false == this->info_->add_publication(sample->domain,
631  sample->participant,
632  sample->topic,
633  sample->id,
634  sample->callback,
635  sample->datawriter_qos,
636  sample->transport_info,
637  sample->transport_context,
638  sample->publisher_qos,
639  sample->serialized_type_info,
640  true)) {
641  {
643  guard,
644  this->deferred_lock_);
645  this->deferredPublications_.push_back(*sample);
646  }
647 
649  ACE_DEBUG((LM_DEBUG,
650  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( PublicationUpdate): ")
651  ACE_TEXT("deferred update.\n")));
652  }
653  }
654 
655  this->processDeferred();
656 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
virtual OpenDDS::DCPS::GUID_t add_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, OpenDDS::DCPS::DataWriterRemote_ptr publication, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos, const DDS::OctetSeq &serializedTypeInfo)
Definition: DCPSInfo_i.cpp:374
void processDeferred()
Handle any deferred updates that might have become processable.
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
std::list< PublicationUpdate > deferredPublications_
Deferred publication updates.
ACE_Thread_Mutex deferred_lock_
Protect deferred updates.

◆ processCreate() [3/5]

void OpenDDS::Federator::ManagerImpl::processCreate ( const SubscriptionUpdate sample,
const DDS::SampleInfo info 
)
virtual

Create a proxy for a new subscription.

Implements OpenDDS::Federator::UpdateProcessor< SubscriptionUpdate >.

Definition at line 659 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), TAO_DDS_DCPSInfo_i::add_subscription(), OpenDDS::Federator::SubscriptionUpdate::callback, OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredSubscriptions_, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::expression_params, OpenDDS::Federator::SubscriptionUpdate::filter_class_name, OpenDDS::Federator::SubscriptionUpdate::filter_expression, OpenDDS::Federator::SubscriptionUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, processDeferred(), OpenDDS::Federator::SubscriptionUpdate::serialized_type_info, OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, OpenDDS::Federator::SubscriptionUpdate::topic, OpenDDS::Federator::SubscriptionUpdate::transport_context, and OpenDDS::Federator::SubscriptionUpdate::transport_info.

660 {
662  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
663  OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
664  ACE_DEBUG((LM_DEBUG,
665  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( SubscriptionUpdate): ")
666  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
667  this->id().id(),
668  sample->domain,
669  std::string(part_converter).c_str(),
670  std::string(sub_converter).c_str()));
671  }
672 
673  if (false == this->info_->add_subscription(sample->domain,
674  sample->participant,
675  sample->topic,
676  sample->id,
677  sample->callback,
678  sample->datareader_qos,
679  sample->transport_info,
680  sample->transport_context,
681  sample->subscriber_qos,
682  sample->filter_class_name,
683  sample->filter_expression,
684  sample->expression_params,
685  sample->serialized_type_info,
686  true)) {
687  {
689  guard,
690  this->deferred_lock_);
691  this->deferredSubscriptions_.push_back(*sample);
692  }
693 
695  ACE_DEBUG((LM_DEBUG,
696  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( SubscriptionUpdate): ")
697  ACE_TEXT("deferred update.\n")));
698  }
699  }
700 
701  this->processDeferred();
702 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
virtual OpenDDS::DCPS::GUID_t add_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, OpenDDS::DCPS::DataReaderRemote_ptr subscription, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams, const DDS::OctetSeq &serializedTypeInfo)
Definition: DCPSInfo_i.cpp:681
void processDeferred()
Handle any deferred updates that might have become processable.
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_Thread_Mutex deferred_lock_
Protect deferred updates.
std::list< SubscriptionUpdate > deferredSubscriptions_
Deferred subscription updates.

◆ processCreate() [4/5]

void OpenDDS::Federator::ManagerImpl::processCreate ( const ParticipantUpdate sample,
const DDS::SampleInfo info 
)
virtual

Create a proxy for a new participant.

Implements OpenDDS::Federator::UpdateProcessor< ParticipantUpdate >.

Definition at line 705 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), TAO_DDS_DCPSInfo_i::add_domain_participant(), TAO_DDS_DCPSInfo_i::changeOwnership(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::ParticipantUpdate::id, info_, LM_DEBUG, LM_ERROR, OpenDDS::Federator::ParticipantUpdate::owner, processDeferred(), OpenDDS::Federator::ParticipantUpdate::qos, and OpenDDS::Federator::ParticipantUpdate::sender.

706 {
708  OpenDDS::DCPS::RepoIdConverter converter(sample->id);
709  ACE_DEBUG((LM_DEBUG,
710  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( ParticipantUpdate): ")
711  ACE_TEXT("repo %d - [ domain %d/ participant %C/ owner %d ]\n"),
712  this->id().id(),
713  sample->domain,
714  std::string(converter).c_str(),
715  sample->owner));
716  }
717 
719  sample->domain,
720  sample->id,
721  sample->qos);
722  bool ownershipChanged = this->info_->changeOwnership(
723  sample->domain,
724  sample->id,
725  sample->sender,
726  sample->owner);
727  if (!ownershipChanged) {
728  ACE_ERROR((LM_ERROR,
729  ACE_TEXT("(%P|%t) ERROR: ")
730  ACE_TEXT("OpenDDS::Federator::ManagerImpl::processCreate(), ")
731  ACE_TEXT("Could not change ownership\n")));
732  }
733  this->processDeferred();
734 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
virtual OpenDDS::DCPS::AddDomainStatus add_domain_participant(DDS::DomainId_t domain, const DDS::DomainParticipantQos &qos)
void processDeferred()
Handle any deferred updates that might have become processable.
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
bool changeOwnership(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, long sender, long owner)
assert new ownership for a participant and its contained entities.
Definition: DCPSInfo_i.cpp:152

◆ processCreate() [5/5]

void OpenDDS::Federator::ManagerImpl::processCreate ( const TopicUpdate sample,
const DDS::SampleInfo info 
)
virtual

Create a proxy for a new topic.

Implements OpenDDS::Federator::UpdateProcessor< TopicUpdate >.

Definition at line 737 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), TAO_DDS_DCPSInfo_i::add_topic(), OpenDDS::Federator::TopicUpdate::datatype, OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredTopics_, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::TopicUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::TopicUpdate::participant, processDeferred(), OpenDDS::Federator::TopicUpdate::qos, and OpenDDS::Federator::TopicUpdate::topic.

738 {
740  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
741  OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
742  ACE_DEBUG((LM_DEBUG,
743  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( TopicUpdate): ")
744  ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
745  this->id().id(),
746  sample->domain,
747  std::string(part_converter).c_str(),
748  std::string(topic_converter).c_str()));
749  }
750 
751  if (false == this->info_->add_topic(sample->id,
752  sample->domain,
753  sample->participant,
754  sample->topic,
755  sample->datatype,
756  sample->qos)) {
757  {
759  guard,
760  this->deferred_lock_);
761  this->deferredTopics_.push_back(*sample);
762  }
763 
765  ACE_DEBUG((LM_DEBUG,
766  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( TopicUpdate): ")
767  ACE_TEXT("deferred update.\n")));
768  }
769  }
770 
771  this->processDeferred();
772 }
#define ACE_DEBUG(X)
std::list< TopicUpdate > deferredTopics_
Deferred topic updates.
#define ACE_GUARD(MUTEX, OBJ, LOCK)
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
bool add_topic(const OpenDDS::DCPS::GUID_t &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos)
Add a previously existing topic to the repository.
Definition: DCPSInfo_i.cpp:232
void processDeferred()
Handle any deferred updates that might have become processable.
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_Thread_Mutex deferred_lock_
Protect deferred updates.

◆ processDeferred()

void OpenDDS::Federator::ManagerImpl::processDeferred ( )

Handle any deferred updates that might have become processable.

Definition at line 775 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), TAO_DDS_DCPSInfo_i::add_publication(), TAO_DDS_DCPSInfo_i::add_subscription(), TAO_DDS_DCPSInfo_i::add_topic(), TAO_DDS_DCPSInfo_i::changeOwnership(), OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredOwnerships_, deferredPublications_, deferredSubscriptions_, deferredTopics_, info_, and LM_DEBUG.

Referenced by processCreate().

776 {
778  guard,
779  this->deferred_lock_);
780 
781  {
782  std::list<OwnerUpdate>::iterator current = this->deferredOwnerships_.begin();
783 
784  while (current != this->deferredOwnerships_.end()) {
785  if (this->info_->changeOwnership(current->domain,
786  current->participant,
787  current->sender,
788  current->owner)) {
790  OpenDDS::DCPS::RepoIdConverter converter(current->participant);
791  ACE_DEBUG((LM_DEBUG,
792  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( OwnerUpdate): ")
793  ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
794  this->id().id(),
795  current->domain,
796  std::string(converter).c_str(),
797  current->sender,
798  current->owner));
799  }
800 
801  current = this->deferredOwnerships_.erase(current);
802 
803  } else {
804  ++ current;
805  }
806  }
807  }
808 
809  {
810  std::list<TopicUpdate>::iterator current = this->deferredTopics_.begin();
811 
812  while (current != this->deferredTopics_.end()) {
813  if (true == this->info_->add_topic(current->id,
814  current->domain,
815  current->participant,
816  current->topic,
817  current->datatype,
818  current->qos)) {
820  OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
821  OpenDDS::DCPS::RepoIdConverter topic_converter(current->id);
822  ACE_DEBUG((LM_DEBUG,
823  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( TopicUpdate): ")
824  ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
825  this->id().id(),
826  current->domain,
827  std::string(part_converter).c_str(),
828  std::string(topic_converter).c_str()));
829  }
830 
831  current = this->deferredTopics_.erase(current);
832 
833  } else {
834  ++ current;
835  }
836  }
837  }
838 
839  {
840  std::list<PublicationUpdate>::iterator current = this->deferredPublications_.begin();
841 
842  while (current != this->deferredPublications_.end()) {
843 
844  if (true == this->info_->add_publication(current->domain,
845  current->participant,
846  current->topic,
847  current->id,
848  current->callback,
849  current->datawriter_qos,
850  current->transport_info,
851  current->transport_context,
852  current->publisher_qos,
853  current->serialized_type_info,
854  true)) {
856  OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
857  OpenDDS::DCPS::RepoIdConverter pub_converter(current->id);
858  ACE_DEBUG((LM_DEBUG,
859  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( PublicationUpdate): ")
860  ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
861  this->id().id(),
862  current->domain,
863  std::string(part_converter).c_str(),
864  std::string(pub_converter).c_str()));
865  }
866 
867  current = this->deferredPublications_.erase(current);
868 
869  } else {
870  ++ current;
871  }
872  }
873  }
874 
875  {
876  std::list<SubscriptionUpdate>::iterator current = this->deferredSubscriptions_.begin();
877 
878  while (current != this->deferredSubscriptions_.end()) {
879 
880  if (true == this->info_->add_subscription(current->domain,
881  current->participant,
882  current->topic,
883  current->id,
884  current->callback,
885  current->datareader_qos,
886  current->transport_info,
887  current->transport_context,
888  current->subscriber_qos,
889  current->filter_class_name,
890  current->filter_expression,
891  current->expression_params,
892  current->serialized_type_info,
893  true)) {
895  OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
896  OpenDDS::DCPS::RepoIdConverter sub_converter(current->id);
897  ACE_DEBUG((LM_DEBUG,
898  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( SubscriptionUpdate): ")
899  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
900  this->id().id(),
901  current->domain,
902  std::string(part_converter).c_str(),
903  std::string(sub_converter).c_str()));
904  }
905 
906  current = this->deferredSubscriptions_.erase(current);
907 
908  } else {
909  ++ current;
910  }
911  }
912  }
913 
914 }
#define ACE_DEBUG(X)
std::list< TopicUpdate > deferredTopics_
Deferred topic updates.
#define ACE_GUARD(MUTEX, OBJ, LOCK)
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
virtual OpenDDS::DCPS::GUID_t add_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, OpenDDS::DCPS::DataReaderRemote_ptr subscription, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams, const DDS::OctetSeq &serializedTypeInfo)
Definition: DCPSInfo_i.cpp:681
bool add_topic(const OpenDDS::DCPS::GUID_t &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos)
Add a previously existing topic to the repository.
Definition: DCPSInfo_i.cpp:232
virtual OpenDDS::DCPS::GUID_t add_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, OpenDDS::DCPS::DataWriterRemote_ptr publication, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos, const DDS::OctetSeq &serializedTypeInfo)
Definition: DCPSInfo_i.cpp:374
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
std::list< PublicationUpdate > deferredPublications_
Deferred publication updates.
ACE_Thread_Mutex deferred_lock_
Protect deferred updates.
std::list< OwnerUpdate > deferredOwnerships_
Deferred ownership updates.
bool changeOwnership(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, long sender, long owner)
assert new ownership for a participant and its contained entities.
Definition: DCPSInfo_i.cpp:152
std::list< SubscriptionUpdate > deferredSubscriptions_
Deferred subscription updates.

◆ processDelete() [1/5]

void OpenDDS::Federator::ManagerImpl::processDelete ( const OwnerUpdate sample,
const DDS::SampleInfo info 
)
virtual

Null implementation for OwnerUpdate samples.

Implements OpenDDS::Federator::UpdateProcessor< OwnerUpdate >.

Definition at line 1101 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), TAO_DDS_DCPSInfo_i::changeOwnership(), OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredOwnerships_, OpenDDS::Federator::OwnerUpdate::domain, info_, LM_DEBUG, OpenDDS::Federator::OwnerUpdate::owner, OpenDDS::Federator::OwnerUpdate::participant, and OpenDDS::Federator::OwnerUpdate::sender.

1102 {
1104  OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
1105  ACE_DEBUG((LM_DEBUG,
1106  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( OwnerUpdate): ")
1107  ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
1108  this->id().id(),
1109  sample->domain,
1110  std::string(converter).c_str(),
1111  sample->sender,
1112  sample->owner));
1113  }
1114 
1115  // We could generate an error message here. Instead we let action be irrelevant.
1116  if (false == this->info_->changeOwnership(sample->domain,
1117  sample->participant,
1118  sample->sender,
1119  sample->owner)) {
1120  {
1122  guard,
1123  this->deferred_lock_);
1124  this->deferredOwnerships_.push_back(*sample);
1125  }
1126  ACE_DEBUG((LM_DEBUG,
1127  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( OwnerUpdate): ")
1128  ACE_TEXT("deferred update.\n")));
1129  }
1130 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_Thread_Mutex deferred_lock_
Protect deferred updates.
std::list< OwnerUpdate > deferredOwnerships_
Deferred ownership updates.
bool changeOwnership(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, long sender, long owner)
assert new ownership for a participant and its contained entities.
Definition: DCPSInfo_i.cpp:152

◆ processDelete() [2/5]

void OpenDDS::Federator::ManagerImpl::processDelete ( const PublicationUpdate sample,
const DDS::SampleInfo info 
)
virtual

Delete a proxy for a publication.

Implements OpenDDS::Federator::UpdateProcessor< PublicationUpdate >.

Definition at line 1133 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::PublicationUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::PublicationUpdate::participant, and TAO_DDS_DCPSInfo_i::remove_publication().

1134 {
1136  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
1137  OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
1138  ACE_DEBUG((LM_DEBUG,
1139  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( PublicationUpdate): ")
1140  ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
1141  this->id().id(),
1142  sample->domain,
1143  std::string(part_converter).c_str(),
1144  std::string(pub_converter).c_str()));
1145  }
1146 
1147  try {
1148  this->info_->remove_publication(
1149  sample->domain,
1150  sample->participant,
1151  sample->id);
1152 
1155  ACE_DEBUG((LM_DEBUG,
1156  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( PublicationUpdate): ")
1157  ACE_TEXT("the participant was already removed.\n")));
1158  }
1159  }
1160 }
#define ACE_DEBUG(X)
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
virtual void remove_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &publicationId)
Definition: DCPSInfo_i.cpp:630
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30

◆ processDelete() [3/5]

void OpenDDS::Federator::ManagerImpl::processDelete ( const SubscriptionUpdate sample,
const DDS::SampleInfo info 
)
virtual

Delete a proxy for a subscription.

Implements OpenDDS::Federator::UpdateProcessor< SubscriptionUpdate >.

Definition at line 1163 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, and TAO_DDS_DCPSInfo_i::remove_subscription().

1164 {
1166  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
1167  OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
1168  ACE_DEBUG((LM_DEBUG,
1169  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( SubscriptionUpdate): ")
1170  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
1171  this->id().id(),
1172  sample->domain,
1173  std::string(part_converter).c_str(),
1174  std::string(sub_converter).c_str()));
1175  }
1176 
1177  try {
1178  this->info_->remove_subscription(
1179  sample->domain,
1180  sample->participant,
1181  sample->id);
1182 
1185  ACE_DEBUG((LM_DEBUG,
1186  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( SubscriptionUpdate): ")
1187  ACE_TEXT("the participant was already removed.\n")));
1188  }
1189  }
1190 }
virtual void remove_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &subscriptionId)
Definition: DCPSInfo_i.cpp:962
#define ACE_DEBUG(X)
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30

◆ processDelete() [4/5]

void OpenDDS::Federator::ManagerImpl::processDelete ( const ParticipantUpdate sample,
const DDS::SampleInfo info 
)
virtual

Delete a proxy for a participant.

Implements OpenDDS::Federator::UpdateProcessor< ParticipantUpdate >.

Definition at line 1193 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::ParticipantUpdate::id, info_, LM_DEBUG, and TAO_DDS_DCPSInfo_i::remove_domain_participant().

1194 {
1196  OpenDDS::DCPS::RepoIdConverter converter(sample->id);
1197  ACE_DEBUG((LM_DEBUG,
1198  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( ParticipantUpdate): ")
1199  ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
1200  this->id().id(),
1201  sample->domain,
1202  std::string(converter).c_str()));
1203  }
1204  try {
1206  sample->domain,
1207  sample->id);
1210  ACE_DEBUG((LM_DEBUG,
1211  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( ParticipantUpdate): ")
1212  ACE_TEXT("the participant was already removed.\n")));
1213  }
1214  }
1215 }
#define ACE_DEBUG(X)
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
virtual void remove_domain_participant(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId)

◆ processDelete() [5/5]

void OpenDDS::Federator::ManagerImpl::processDelete ( const TopicUpdate sample,
const DDS::SampleInfo info 
)
virtual

Delete a proxy for a topic.

Implements OpenDDS::Federator::UpdateProcessor< TopicUpdate >.

Definition at line 1218 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::TopicUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::TopicUpdate::participant, and TAO_DDS_DCPSInfo_i::remove_topic().

1219 {
1221  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
1222  OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
1223  ACE_DEBUG((LM_DEBUG,
1224  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
1225  ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
1226  this->id().id(),
1227  sample->domain,
1228  std::string(part_converter).c_str(),
1229  std::string(topic_converter).c_str()));
1230  }
1231 
1232  try {
1233  this->info_->remove_topic(
1234  sample->domain,
1235  sample->participant,
1236  sample->id);
1237 
1240  ACE_DEBUG((LM_DEBUG,
1241  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
1242  ACE_TEXT("the participant was already removed.\n")));
1243  }
1244  } catch (OpenDDS::DCPS::Invalid_Domain&) {
1246  ACE_DEBUG((LM_DEBUG,
1247  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
1248  ACE_TEXT("the domain %d no longer exists.\n"),sample->domain));
1249  }
1250  }
1251 }
#define ACE_DEBUG(X)
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
virtual OpenDDS::DCPS::TopicStatus remove_topic(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId)
Definition: DCPSInfo_i.cpp:325
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30

◆ processUpdateFilterExpressionParams()

void OpenDDS::Federator::ManagerImpl::processUpdateFilterExpressionParams ( const SubscriptionUpdate sample,
const DDS::SampleInfo info 
)
virtual

Update the proxy filter expression params for a subscription.

Reimplemented from OpenDDS::Federator::UpdateProcessor< SubscriptionUpdate >.

Definition at line 1037 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::expression_params, OpenDDS::Federator::SubscriptionUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, and TAO_DDS_DCPSInfo_i::update_subscription_params().

1039 {
1041  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
1042  OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
1043  ACE_DEBUG((LM_DEBUG,
1044  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateFilterExpressionParams(SubscriptionUpdate): ")
1045  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
1046  this->id().id(),
1047  sample->domain,
1048  std::string(part_converter).c_str(),
1049  std::string(sub_converter).c_str()));
1050  }
1051 
1053  sample->domain,
1054  sample->participant,
1055  sample->id,
1056  sample->expression_params);
1057 }
#define ACE_DEBUG(X)
virtual ::CORBA::Boolean update_subscription_params(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &subscriptionId, const DDS::StringSeq &params)
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30

◆ processUpdateQos1() [1/5]

void OpenDDS::Federator::ManagerImpl::processUpdateQos1 ( const OwnerUpdate sample,
const DDS::SampleInfo info 
)
virtual

Process ownership changes.

Implements OpenDDS::Federator::UpdateProcessor< OwnerUpdate >.

Definition at line 917 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), TAO_DDS_DCPSInfo_i::changeOwnership(), OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredOwnerships_, OpenDDS::Federator::OwnerUpdate::domain, info_, LM_DEBUG, OpenDDS::Federator::OwnerUpdate::owner, OpenDDS::Federator::OwnerUpdate::participant, and OpenDDS::Federator::OwnerUpdate::sender.

918 {
920  OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
921  ACE_DEBUG((LM_DEBUG,
922  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( OwnerUpdate): ")
923  ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
924  this->id().id(),
925  sample->domain,
926  std::string(converter).c_str(),
927  sample->sender,
928  sample->owner));
929  }
930 
931  if (false == this->info_->changeOwnership(sample->domain,
932  sample->participant,
933  sample->sender,
934  sample->owner)) {
935  {
937  guard,
938  this->deferred_lock_);
939 
940  this->deferredOwnerships_.push_back(*sample);
941  }
942  ACE_DEBUG((LM_DEBUG,
943  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( OwnerUpdate): ")
944  ACE_TEXT("deferred update.\n")));
945  }
946 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_Thread_Mutex deferred_lock_
Protect deferred updates.
std::list< OwnerUpdate > deferredOwnerships_
Deferred ownership updates.
bool changeOwnership(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, long sender, long owner)
assert new ownership for a participant and its contained entities.
Definition: DCPSInfo_i.cpp:152

◆ processUpdateQos1() [2/5]

void OpenDDS::Federator::ManagerImpl::processUpdateQos1 ( const PublicationUpdate sample,
const DDS::SampleInfo info 
)
virtual

Update the proxy DataWriterQos for a publication.

Implements OpenDDS::Federator::UpdateProcessor< PublicationUpdate >.

Definition at line 949 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::Federator::PublicationUpdate::datawriter_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::PublicationUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::PublicationUpdate::participant, and TAO_DDS_DCPSInfo_i::update_publication_qos().

950 {
952  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
953  OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
954  ACE_DEBUG((LM_DEBUG,
955  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( PublicationUpdate): ")
956  ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
957  this->id().id(),
958  sample->domain,
959  std::string(part_converter).c_str(),
960  std::string(pub_converter).c_str()));
961  }
962 
964  sample->domain,
965  sample->participant,
966  sample->id,
967  sample->datawriter_qos);
968 }
#define ACE_DEBUG(X)
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
virtual CORBA::Boolean update_publication_qos(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &partId, const OpenDDS::DCPS::GUID_t &dwId, const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos)

◆ processUpdateQos1() [3/5]

void OpenDDS::Federator::ManagerImpl::processUpdateQos1 ( const SubscriptionUpdate sample,
const DDS::SampleInfo info 
)
virtual

Update the proxy DataReaderQos for a subscription.

Implements OpenDDS::Federator::UpdateProcessor< SubscriptionUpdate >.

Definition at line 993 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, and TAO_DDS_DCPSInfo_i::update_subscription_qos().

994 {
996  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
997  OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
998  ACE_DEBUG((LM_DEBUG,
999  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( SubscriptionUpdate): ")
1000  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
1001  this->id().id(),
1002  sample->domain,
1003  std::string(part_converter).c_str(),
1004  std::string(sub_converter).c_str()));
1005  }
1006 
1008  sample->domain,
1009  sample->participant,
1010  sample->id,
1011  sample->datareader_qos);
1012 }
#define ACE_DEBUG(X)
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
virtual CORBA::Boolean update_subscription_qos(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &partId, const OpenDDS::DCPS::GUID_t &drId, const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos)

◆ processUpdateQos1() [4/5]

void OpenDDS::Federator::ManagerImpl::processUpdateQos1 ( const ParticipantUpdate sample,
const DDS::SampleInfo info 
)
virtual

Update the proxy ParticipantQos for a participant.

Implements OpenDDS::Federator::UpdateProcessor< ParticipantUpdate >.

Definition at line 1060 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::ParticipantUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::ParticipantUpdate::qos, and TAO_DDS_DCPSInfo_i::update_domain_participant_qos().

1061 {
1063  OpenDDS::DCPS::RepoIdConverter converter(sample->id);
1064  ACE_DEBUG((LM_DEBUG,
1065  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( ParticipantUpdate): ")
1066  ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
1067  this->id().id(),
1068  sample->domain,
1069  std::string(converter).c_str()));
1070  }
1071 
1073  sample->domain,
1074  sample->id,
1075  sample->qos);
1076 }
#define ACE_DEBUG(X)
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
virtual CORBA::Boolean update_domain_participant_qos(DDS::DomainId_t domain, const OpenDDS::DCPS::GUID_t &participantId, const DDS::DomainParticipantQos &qos)
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30

◆ processUpdateQos1() [5/5]

void OpenDDS::Federator::ManagerImpl::processUpdateQos1 ( const TopicUpdate sample,
const DDS::SampleInfo info 
)
virtual

Update the proxy TopicQos for a topic.

Implements OpenDDS::Federator::UpdateProcessor< TopicUpdate >.

Definition at line 1079 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::TopicUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::TopicUpdate::participant, OpenDDS::Federator::TopicUpdate::qos, and TAO_DDS_DCPSInfo_i::update_topic_qos().

1080 {
1082  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
1083  OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
1084  ACE_DEBUG((LM_DEBUG,
1085  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( TopicUpdate): ")
1086  ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
1087  this->id().id(),
1088  sample->domain,
1089  std::string(part_converter).c_str(),
1090  std::string(topic_converter).c_str()));
1091  }
1092 
1093  this->info_->update_topic_qos(
1094  sample->id,
1095  sample->domain,
1096  sample->participant,
1097  sample->qos);
1098 }
#define ACE_DEBUG(X)
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
virtual CORBA::Boolean update_topic_qos(const OpenDDS::DCPS::GUID_t &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const DDS::TopicQos &qos)

◆ processUpdateQos2() [1/2]

void OpenDDS::Federator::ManagerImpl::processUpdateQos2 ( const PublicationUpdate sample,
const DDS::SampleInfo info 
)
virtual

Update the proxy PublisherQos for a publication.

Reimplemented from OpenDDS::Federator::UpdateProcessor< PublicationUpdate >.

Definition at line 971 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::PublicationUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::PublicationUpdate::participant, OpenDDS::Federator::PublicationUpdate::publisher_qos, and TAO_DDS_DCPSInfo_i::update_publication_qos().

972 {
974  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
975  OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
976  ACE_DEBUG((LM_DEBUG,
977  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos2( PublicationUpdate): ")
978  ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
979  this->id().id(),
980  sample->domain,
981  std::string(part_converter).c_str(),
982  std::string(pub_converter).c_str()));
983  }
984 
986  sample->domain,
987  sample->participant,
988  sample->id,
989  sample->publisher_qos);
990 }
#define ACE_DEBUG(X)
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
virtual CORBA::Boolean update_publication_qos(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &partId, const OpenDDS::DCPS::GUID_t &dwId, const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos)

◆ processUpdateQos2() [2/2]

void OpenDDS::Federator::ManagerImpl::processUpdateQos2 ( const SubscriptionUpdate sample,
const DDS::SampleInfo info 
)
virtual

Update the proxy SubscriberQos for a subscription.

Reimplemented from OpenDDS::Federator::UpdateProcessor< SubscriptionUpdate >.

Definition at line 1015 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, and TAO_DDS_DCPSInfo_i::update_subscription_qos().

1016 {
1018  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
1019  OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
1020  ACE_DEBUG((LM_DEBUG,
1021  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos2( SubscriptionUpdate): ")
1022  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
1023  this->id().id(),
1024  sample->domain,
1025  std::string(part_converter).c_str(),
1026  std::string(sub_converter).c_str()));
1027  }
1028 
1030  sample->domain,
1031  sample->participant,
1032  sample->id,
1033  sample->subscriber_qos);
1034 }
#define ACE_DEBUG(X)
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
virtual CORBA::Boolean update_subscription_qos(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &partId, const OpenDDS::DCPS::GUID_t &drId, const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos)

◆ pushState()

void OpenDDS::Federator::ManagerImpl::pushState ( Manager_ptr  peer)

Push our current state to a remote repository.

Definition at line 1254 of file FederatorManagerImpl_updates.cpp.

References OpenDDS::Federator::OwnerUpdate::action, OpenDDS::Federator::TopicUpdate::action, OpenDDS::Federator::ParticipantUpdate::action, OpenDDS::Federator::PublicationUpdate::action, OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::Federator::PublicationUpdate::callback, OpenDDS::Federator::SubscriptionUpdate::callback, config_, OpenDDS::Federator::CreateEntity, OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::Federator::TopicUpdate::datatype, OpenDDS::Federator::PublicationUpdate::datawriter_qos, OpenDDS::Federator::OwnerUpdate::domain, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::domain, TAO_DDS_DCPSInfo_i::domains(), OpenDDS::Federator::SubscriptionUpdate::expression_params, OpenDDS::Federator::Config::federationDomain(), OpenDDS::Federator::SubscriptionUpdate::filter_expression, DCPS_IR_Subscription::get_datareader_qos(), DCPS_IR_Publication::get_datawriter_qos(), DCPS_IR_Subscription::get_expr_params(), DCPS_IR_Subscription::get_filter_expression(), DCPS_IR_Publication::get_id(), DCPS_IR_Subscription::get_id(), DCPS_IR_Publication::get_participant_id(), DCPS_IR_Subscription::get_participant_id(), DCPS_IR_Publication::get_publisher_qos(), DCPS_IR_Subscription::get_subscriber_qos(), DCPS_IR_Publication::get_topic_id(), DCPS_IR_Subscription::get_topic_id(), DCPS_IR_Publication::get_transportLocatorSeq(), DCPS_IR_Subscription::get_transportLocatorSeq(), TAO_DDS_DCPSFederationId::id(), OpenDDS::Federator::TopicUpdate::id, OpenDDS::Federator::ParticipantUpdate::id, id(), OpenDDS::Federator::PublicationUpdate::id, OpenDDS::Federator::SubscriptionUpdate::id, TAO::String_var< charT >::in(), info_, OPENDDS_END_VERSIONED_NAMESPACE_DECL, orb(), TAO_DDS_DCPSInfo_i::orb(), OpenDDS::Federator::OwnerUpdate::owner, OpenDDS::Federator::ParticipantUpdate::owner, OpenDDS::Federator::OwnerUpdate::participant, OpenDDS::Federator::TopicUpdate::participant, OpenDDS::Federator::PublicationUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::PublicationUpdate::publisher_qos, OpenDDS::Federator::TopicUpdate::qos, OpenDDS::Federator::ParticipantUpdate::qos, DCPS_IR_Subscription::reader(), OpenDDS::Federator::OwnerUpdate::sender, OpenDDS::Federator::TopicUpdate::sender, OpenDDS::Federator::ParticipantUpdate::sender, OpenDDS::Federator::PublicationUpdate::sender, OpenDDS::Federator::SubscriptionUpdate::sender, OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, OpenDDS::Federator::TopicUpdate::topic, OpenDDS::Federator::PublicationUpdate::topic, OpenDDS::Federator::SubscriptionUpdate::topic, OpenDDS::Federator::PublicationUpdate::transport_info, OpenDDS::Federator::SubscriptionUpdate::transport_info, and DCPS_IR_Publication::writer().

Referenced by join_federation().

1255 {
1256  // foreach DCPS_IR_Domain
1257  // foreach DCPS_IR_Participant
1258  // peer->initializeParticipant(...)
1259  // peer->initializeOwner(...)
1260  // foreach DCPS_IR_Participant
1261  // foreach DCPS_IR_Topic
1262  // peer->initializeTopic(...)
1263  // foreach DCPS_IR_Publication
1264  // peer->initializePublication(...)
1265  // foreach DCPS_IR_Subscription
1266  // peer->initializeSubscription(...)
1267 
1268  // Process each domain within the repository.
1269  for (DCPS_IR_Domain_Map::const_iterator currentDomain
1270  = this->info_->domains().begin();
1271  currentDomain != this->info_->domains().end();
1272  ++currentDomain) {
1273 
1274  if (currentDomain->second->get_id() == this->config_.federationDomain()) {
1275  // Do not push the Federation domain publications.
1276  //continue;
1277  }
1278 
1279  // Process each participant within the current domain.
1280  for (DCPS_IR_Participant_Map::const_iterator currentParticipant
1281  = currentDomain->second->participants().begin();
1282  currentParticipant != currentDomain->second->participants().end();
1283  ++currentParticipant) {
1284 
1285  if (currentParticipant->second->isBitPublisher() == true) {
1286  // Do not push the built-in topic publications.
1287  continue;
1288  }
1289 
1290  // Initialize the participant on the peer.
1291  ParticipantUpdate participantSample;
1292  participantSample.sender = this->id().id();
1293  participantSample.action = CreateEntity;
1294 
1295  participantSample.owner = currentParticipant->second->owner();
1296  participantSample.domain = currentDomain->second->get_id();
1297  participantSample.id = currentParticipant->second->get_id();
1298  participantSample.qos = *currentParticipant->second->get_qos();
1299 
1300  peer->initializeParticipant(participantSample);
1301 
1302  // Initialize the ownership of the participant on the peer.
1303  OwnerUpdate ownerSample;
1304  ownerSample.sender = this->id().id();
1305  ownerSample.action = CreateEntity;
1306 
1307  ownerSample.domain = currentDomain->second->get_id();
1308  ownerSample.participant = currentParticipant->second->get_id();
1309  ownerSample.owner = currentParticipant->second->owner();
1310 
1311  peer->initializeOwner(ownerSample);
1312  }
1313 
1314  // Process each participant within the current domain.
1315  for (DCPS_IR_Participant_Map::const_iterator currentParticipant
1316  = currentDomain->second->participants().begin();
1317  currentParticipant != currentDomain->second->participants().end();
1318  ++currentParticipant) {
1319 
1320  if (currentParticipant->second->isBitPublisher() == true) {
1321  // Do not push the built-in topic publications.
1322  continue;
1323  }
1324 
1325  // Process each topic within the current particpant.
1326  for (DCPS_IR_Topic_Map::const_iterator currentTopic
1327  = currentParticipant->second->topics().begin();
1328  currentTopic != currentParticipant->second->topics().end();
1329  ++currentTopic) {
1330  TopicUpdate topicSample;
1331  topicSample.sender = this->id().id();
1332  topicSample.action = CreateEntity;
1333 
1334  topicSample.id = currentTopic->second->get_id();
1335  topicSample.domain = currentDomain->second->get_id();
1336  topicSample.participant = currentTopic->second->get_participant_id();
1337  topicSample.topic = currentTopic->second->get_topic_description()->get_name();
1338  topicSample.datatype = currentTopic->second->get_topic_description()->get_dataTypeName();
1339  topicSample.qos = *currentTopic->second->get_topic_qos();
1340 
1341  peer->initializeTopic(topicSample);
1342  }
1343 
1344  // Process each publication within the current particpant.
1345  for (DCPS_IR_Publication_Map::const_iterator currentPublication
1346  = currentParticipant->second->publications().begin();
1347  currentPublication != currentParticipant->second->publications().end();
1348  ++currentPublication) {
1349  PublicationUpdate publicationSample;
1350  publicationSample.sender = this->id().id();
1351  publicationSample.action = CreateEntity;
1352 
1353  DCPS_IR_Publication* p = currentPublication->second.get();
1354  CORBA::ORB_var orb = this->info_->orb();
1355  CORBA::String_var callback = orb->object_to_string(p->writer());
1356 
1357  publicationSample.domain = currentDomain->second->get_id();
1358  publicationSample.participant = p->get_participant_id();
1359  publicationSample.topic = p->get_topic_id();
1360  publicationSample.id = p->get_id();
1361  publicationSample.callback = callback.in();
1362  publicationSample.datawriter_qos = *p->get_datawriter_qos();
1363  publicationSample.publisher_qos = *p->get_publisher_qos();
1364  publicationSample.transport_info = p->get_transportLocatorSeq();
1365 
1366  peer->initializePublication(publicationSample);
1367  }
1368 
1369  // Process each subscription within the current particpant.
1370  for (DCPS_IR_Subscription_Map::const_iterator currentSubscription
1371  = currentParticipant->second->subscriptions().begin();
1372  currentSubscription != currentParticipant->second->subscriptions().end();
1373  ++currentSubscription) {
1374  SubscriptionUpdate subscriptionSample;
1375  subscriptionSample.sender = this->id().id();
1376  subscriptionSample.action = CreateEntity;
1377 
1378  DCPS_IR_Subscription* s = currentSubscription->second.get();
1379  CORBA::ORB_var orb = this->info_->orb();
1380  CORBA::String_var callback = orb->object_to_string(s->reader());
1381 
1382  subscriptionSample.domain = currentDomain->second->get_id();
1383  subscriptionSample.participant = s->get_participant_id();
1384  subscriptionSample.topic = s->get_topic_id();
1385  subscriptionSample.id = s->get_id();
1386  subscriptionSample.callback = callback.in();
1387  subscriptionSample.datareader_qos = *s->get_datareader_qos();
1388  subscriptionSample.subscriber_qos = *s->get_subscriber_qos();
1389  subscriptionSample.transport_info = s->get_transportLocatorSeq();
1390  subscriptionSample.filter_expression = s->get_filter_expression().c_str();
1391  subscriptionSample.expression_params = s->get_expr_params();
1392 
1393  peer->initializeSubscription(subscriptionSample);
1394  }
1395  }
1396  }
1397 }
OpenDDS::DCPS::GUID_t get_participant_id()
void federationDomain(long domain)
Federation Id value.
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
OpenDDS::DCPS::GUID_t get_id()
const TAO_DDS_DCPSFederationId & id() const
Accessors for the federation Id value.
const DCPS_IR_Domain_Map & domains() const
Expose a readable reference of the domain map.
CORBA::ORB_ptr orb()
Expose the ORB.
Definition: DCPSInfo_i.cpp:119
Config & config_
The configuration information for this manager.
OpenDDS::DCPS::GUID_t get_topic_id()
CORBA::ORB_ptr orb()
Accessors for the ORB.
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq() const
std::string get_filter_expression() const
DDS::PublisherQos * get_publisher_qos()
OpenDDS::DCPS::GUID_t get_id()
OpenDDS::DCPS::DataReaderRemote_ptr reader()
OpenDDS::DCPS::DataWriterRemote_ptr writer()
OpenDDS::DCPS::GUID_t get_participant_id()
const DDS::SubscriberQos * get_subscriber_qos()
Representative of a Subscription.
OpenDDS::DCPS::GUID_t get_topic_id()
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq() const
const character_type * in(void) const
Representative of a Publication.
DDS::StringSeq get_expr_params() const
DDS::DataWriterQos * get_datawriter_qos()
const DDS::DataReaderQos * get_datareader_qos()
void id(RepoKey fedId)

◆ repository()

OpenDDS::DCPS::DCPSInfo_ptr OpenDDS::Federator::ManagerImpl::repository ( void  )
virtual

Definition at line 881 of file FederatorManagerImpl.cpp.

References ACE_DEBUG, ACE_TEXT(), config_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::Config::federationDomain(), CORBA::is_nil(), OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, localRepo_, OpenDDS::DCPS::static_rchandle_cast(), and TheServiceParticipant.

882 {
884  ACE_DEBUG((LM_DEBUG,
885  ACE_TEXT("(%P|%t) ManagerImpl::repository()\n")));
886  }
887 
889  = TheServiceParticipant->get_discovery(
890  this->config_.federationDomain());
891  OpenDDS::DCPS::DCPSInfo_var repo;
892  if (!disco.is_nil()) {
894  DCPS::static_rchandle_cast<DCPS::InfoRepoDiscovery>(disco);
895  repo = irDisco->get_dcps_info();
896  }
897 
898  if (CORBA::is_nil(repo.in())) {
899  return OpenDDS::DCPS::DCPSInfo::_duplicate(this->localRepo_.in());
900 
901  } else {
902  return OpenDDS::DCPS::DCPSInfo::_duplicate(repo.in());
903  }
904 }
#define ACE_DEBUG(X)
void federationDomain(long domain)
Federation Id value.
Config & config_
The configuration information for this manager.
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
RcHandle< T > static_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:202
#define TheServiceParticipant
Boolean is_nil(T x)
OpenDDS::DCPS::DCPSInfo_var localRepo_
Remotely callable reference to the local repository.

◆ requestImage()

void OpenDDS::Federator::ManagerImpl::requestImage ( )
virtual

Request an image refresh to be sent to the specified callback (asynchronously).

Implements Update::Updater.

Definition at line 28 of file FederatorManagerImpl_updates.cpp.

29 {
30  /* This method intentionally left unimplemented. */
31 }

◆ shutdown()

void OpenDDS::Federator::ManagerImpl::shutdown ( void  )
virtual

Definition at line 1120 of file FederatorManagerImpl.cpp.

References federated_, info_, and TAO_DDS_DCPSInfo_i::shutdown().

1122 {
1123  // Prevent the removal of this repository from the federation during
1124  // shutdown processing.
1125  this->federated_ = false;
1126 
1127  // Shutdown the process via the repository object.
1128  this->info_->shutdown();
1129 }
virtual void shutdown()
Cause the entire repository to exit.
Definition: DCPSInfo_i.cpp:113
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.

◆ unregisterCallback()

void OpenDDS::Federator::ManagerImpl::unregisterCallback ( )
virtual

Definition at line 22 of file FederatorManagerImpl_updates.cpp.

23 {
24  /* This method intentionally left unimplemented. */
25 }

◆ update() [1/7]

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::DomainParticipantQos qos 
)
virtual

Propagate updated Qos parameters for an entity.

Implements Update::Updater.

Definition at line 353 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::Federator::ParticipantUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, DDS::HANDLE_NIL, TAO_DDS_DCPSFederationId::id(), OpenDDS::Federator::ParticipantUpdate::id, id(), CORBA::is_nil(), LM_DEBUG, participantWriter_, OpenDDS::Federator::ParticipantUpdate::qos, OpenDDS::Federator::ParticipantUpdate::sender, and OpenDDS::Federator::UpdateQosValue1.

354 {
355  if (CORBA::is_nil(this->participantWriter_.in())) {
356  // Decline to publish data until we can.
357  return;
358  }
359 
360  ParticipantUpdate sample = ParticipantUpdate();
361  sample.sender = this->id().id();
362  sample.action = UpdateQosValue1;
363 
364  sample.domain = id.domain;
365  sample.id = id.id;
366  sample.qos = qos;
367 
369  OpenDDS::DCPS::RepoIdConverter converter(sample.id);
370  ACE_DEBUG((LM_DEBUG,
371  ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( ParticipantUpdate): ")
372  ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
373  this->id().id(),
374  sample.domain,
375  std::string(converter).c_str()));
376  }
377 
378  this->participantWriter_->write(sample, DDS::HANDLE_NIL);
379 }
#define ACE_DEBUG(X)
const InstanceHandle_t HANDLE_NIL
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
const TAO_DDS_DCPSFederationId & id() const
Accessors for the federation Id value.
ParticipantUpdateDataWriter_var participantWriter_
ParticipantUpdate writer.
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
Boolean is_nil(T x)
void id(RepoKey fedId)

◆ update() [2/7]

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::TopicQos qos 
)
virtual

Propagate updated Qos parameters for an entity.

Implements Update::Updater.

Definition at line 382 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::Federator::TopicUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::TopicUpdate::domain, DDS::HANDLE_NIL, TAO_DDS_DCPSFederationId::id(), OpenDDS::Federator::TopicUpdate::id, id(), CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::TopicUpdate::participant, OpenDDS::Federator::TopicUpdate::qos, OpenDDS::Federator::TopicUpdate::sender, topicWriter_, and OpenDDS::Federator::UpdateQosValue1.

383 {
384  if (CORBA::is_nil(this->topicWriter_.in())) {
385  // Decline to publish data until we can.
386  return;
387  }
388 
389  TopicUpdate sample = TopicUpdate();
390  sample.sender = this->id().id();
391  sample.action = UpdateQosValue1;
392 
393  sample.id = id.id;
394  sample.domain = id.domain;
395  sample.participant = id.participant;
396  sample.qos = qos;
397 
399  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
400  OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
401  ACE_DEBUG((LM_DEBUG,
402  ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( TopicUpdate): ")
403  ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
404  this->id().id(),
405  sample.domain,
406  std::string(part_converter).c_str(),
407  std::string(topic_converter).c_str()));
408  }
409 
410  this->topicWriter_->write(sample, DDS::HANDLE_NIL);
411 }
#define ACE_DEBUG(X)
const InstanceHandle_t HANDLE_NIL
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
const TAO_DDS_DCPSFederationId & id() const
Accessors for the federation Id value.
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
TopicUpdateDataWriter_var topicWriter_
TopicUpdate writer.
Boolean is_nil(T x)
void id(RepoKey fedId)

◆ update() [3/7]

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::DataWriterQos qos 
)
virtual

Propagate updated Qos parameters for an entity.

Implements Update::Updater.

Definition at line 414 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::Federator::PublicationUpdate::action, OpenDDS::Federator::PublicationUpdate::datawriter_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, DDS::HANDLE_NIL, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::PublicationUpdate::id, CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::PublicationUpdate::participant, publicationWriter_, OpenDDS::Federator::PublicationUpdate::sender, and OpenDDS::Federator::UpdateQosValue1.

415 {
416  if (CORBA::is_nil(this->publicationWriter_.in())) {
417  // Decline to publish data until we can.
418  return;
419  }
420 
421  PublicationUpdate sample = PublicationUpdate();
422  sample.sender = this->id().id();
423  sample.action = UpdateQosValue1;
424 
425  sample.domain = id.domain;
426  sample.participant = id.participant;
427  sample.id = id.id;
428  sample.datawriter_qos = qos;
429 
431  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
432  OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
433  ACE_DEBUG((LM_DEBUG,
434  ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( WriterUpdate): ")
435  ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
436  this->id().id(),
437  sample.domain,
438  std::string(part_converter).c_str(),
439  std::string(pub_converter).c_str()));
440  }
441 
442  this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
443 }
#define ACE_DEBUG(X)
const InstanceHandle_t HANDLE_NIL
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
const TAO_DDS_DCPSFederationId & id() const
Accessors for the federation Id value.
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
PublicationUpdateDataWriter_var publicationWriter_
PublicationUpdate writer.
Boolean is_nil(T x)
void id(RepoKey fedId)

◆ update() [4/7]

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::PublisherQos qos 
)
virtual

Propagate updated Qos parameters for an entity.

Implements Update::Updater.

Definition at line 446 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::Federator::PublicationUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, DDS::HANDLE_NIL, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::PublicationUpdate::id, CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::PublicationUpdate::participant, publicationWriter_, OpenDDS::Federator::PublicationUpdate::publisher_qos, OpenDDS::Federator::PublicationUpdate::sender, and OpenDDS::Federator::UpdateQosValue2.

447 {
448  if (CORBA::is_nil(this->publicationWriter_.in())) {
449  // Decline to publish data until we can.
450  return;
451  }
452 
453  PublicationUpdate sample = PublicationUpdate();
454  sample.sender = this->id().id();
455  sample.action = UpdateQosValue2;
456 
457  sample.domain = id.domain;
458  sample.participant = id.participant;
459  sample.id = id.id;
460  sample.publisher_qos = qos;
461 
463  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
464  OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
465  ACE_DEBUG((LM_DEBUG,
466  ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( PublisherUpdate): ")
467  ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
468  this->id().id(),
469  sample.domain,
470  std::string(part_converter).c_str(),
471  std::string(pub_converter).c_str()));
472  }
473 
474  this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
475 }
#define ACE_DEBUG(X)
const InstanceHandle_t HANDLE_NIL
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
const TAO_DDS_DCPSFederationId & id() const
Accessors for the federation Id value.
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
PublicationUpdateDataWriter_var publicationWriter_
PublicationUpdate writer.
Boolean is_nil(T x)
void id(RepoKey fedId)

◆ update() [5/7]

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::DataReaderQos qos 
)
virtual

Propagate updated Qos parameters for an entity.

Implements Update::Updater.

Definition at line 478 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, DDS::HANDLE_NIL, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::SubscriptionUpdate::id, CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::sender, subscriptionWriter_, and OpenDDS::Federator::UpdateQosValue1.

479 {
480  if (CORBA::is_nil(this->subscriptionWriter_.in())) {
481  // Decline to publish data until we can.
482  return;
483  }
484 
485  SubscriptionUpdate sample = SubscriptionUpdate();
486  sample.sender = this->id().id();
487  sample.action = UpdateQosValue1;
488 
489  sample.domain = id.domain;
490  sample.participant = id.participant;
491  sample.id = id.id;
492  sample.datareader_qos = qos;
493 
495  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
496  OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
497  ACE_DEBUG((LM_DEBUG,
498  ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( ReaderUpdate): ")
499  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
500  this->id().id(),
501  sample.domain,
502  std::string(part_converter).c_str(),
503  std::string(sub_converter).c_str()));
504  }
505 
506  this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
507 }
#define ACE_DEBUG(X)
const InstanceHandle_t HANDLE_NIL
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
const TAO_DDS_DCPSFederationId & id() const
Accessors for the federation Id value.
SubscriptionUpdateDataWriter_var subscriptionWriter_
SubscriptionUpdate writer.
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
Boolean is_nil(T x)
void id(RepoKey fedId)

◆ update() [6/7]

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::SubscriberQos qos 
)
virtual

Propagate updated Qos parameters for an entity.

Implements Update::Updater.

Definition at line 541 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, DDS::HANDLE_NIL, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::SubscriptionUpdate::id, CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::sender, OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, subscriptionWriter_, and OpenDDS::Federator::UpdateQosValue2.

542 {
543  if (CORBA::is_nil(this->subscriptionWriter_.in())) {
544  // Decline to publish data until we can.
545  return;
546  }
547 
548  SubscriptionUpdate sample = SubscriptionUpdate();
549  sample.sender = this->id().id();
550  sample.action = UpdateQosValue2;
551 
552  sample.domain = id.domain;
553  sample.participant = id.participant;
554  sample.id = id.id;
555  sample.subscriber_qos = qos;
556 
558  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
559  OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
560  ACE_DEBUG((LM_DEBUG,
561  ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( SubscriberUpdate): ")
562  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
563  this->id().id(),
564  sample.domain,
565  std::string(part_converter).c_str(),
566  std::string(sub_converter).c_str()));
567  }
568 
569  this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
570 }
#define ACE_DEBUG(X)
const InstanceHandle_t HANDLE_NIL
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
const TAO_DDS_DCPSFederationId & id() const
Accessors for the federation Id value.
SubscriptionUpdateDataWriter_var subscriptionWriter_
SubscriptionUpdate writer.
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
Boolean is_nil(T x)
void id(RepoKey fedId)

◆ update() [7/7]

void OpenDDS::Federator::ManagerImpl::update ( const Update::IdPath id,
const DDS::StringSeq exprParams 
)
virtual

Propagate updated Qos parameters for an entity.

Implements Update::Updater.

Definition at line 510 of file FederatorManagerImpl_updates.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::expression_params, DDS::HANDLE_NIL, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::SubscriptionUpdate::id, CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::sender, subscriptionWriter_, and OpenDDS::Federator::UpdateFilterExpressionParams.

511 {
512  if (CORBA::is_nil(this->subscriptionWriter_.in())) {
513  // Decline to publish data until we can.
514  return;
515  }
516 
517  SubscriptionUpdate sample = SubscriptionUpdate();
518  sample.sender = this->id().id();
519  sample.action = UpdateFilterExpressionParams;
520  sample.domain = id.domain;
521  sample.participant = id.participant;
522  sample.id = id.id;
523  sample.expression_params = params;
524 
526  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
527  OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
528  ACE_DEBUG((LM_DEBUG,
529  ACE_TEXT("(%P|%t) Federator::ManagerImpl::update(FilterParams): ")
530  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
531  this->id().id(),
532  sample.domain,
533  std::string(part_converter).c_str(),
534  std::string(sub_converter).c_str()));
535  }
536 
537  this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
538 }
#define ACE_DEBUG(X)
const InstanceHandle_t HANDLE_NIL
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
const TAO_DDS_DCPSFederationId & id() const
Accessors for the federation Id value.
SubscriptionUpdateDataWriter_var subscriptionWriter_
SubscriptionUpdate writer.
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
Boolean is_nil(T x)
void id(RepoKey fedId)

Member Data Documentation

◆ config_

Config& OpenDDS::Federator::ManagerImpl::config_
private

The configuration information for this manager.

Definition at line 231 of file FederatorManagerImpl.h.

Referenced by destroy(), id(), initialize(), join_federation(), leave_federation(), pushState(), and repository().

◆ deferred_lock_

ACE_Thread_Mutex OpenDDS::Federator::ManagerImpl::deferred_lock_
private

Protect deferred updates.

Definition at line 294 of file FederatorManagerImpl.h.

Referenced by processCreate(), processDeferred(), processDelete(), and processUpdateQos1().

◆ deferredOwnerships_

std::list<OwnerUpdate> OpenDDS::Federator::ManagerImpl::deferredOwnerships_
private

Deferred ownership updates.

Definition at line 279 of file FederatorManagerImpl.h.

Referenced by processCreate(), processDeferred(), processDelete(), and processUpdateQos1().

◆ deferredPublications_

std::list<PublicationUpdate> OpenDDS::Federator::ManagerImpl::deferredPublications_
private

Deferred publication updates.

Definition at line 285 of file FederatorManagerImpl.h.

Referenced by processCreate(), and processDeferred().

◆ deferredSubscriptions_

std::list<SubscriptionUpdate> OpenDDS::Federator::ManagerImpl::deferredSubscriptions_
private

Deferred subscription updates.

Definition at line 288 of file FederatorManagerImpl.h.

Referenced by processCreate(), and processDeferred().

◆ deferredTopics_

std::list<TopicUpdate> OpenDDS::Federator::ManagerImpl::deferredTopics_
private

Deferred topic updates.

Definition at line 282 of file FederatorManagerImpl.h.

Referenced by processCreate(), and processDeferred().

◆ federated_

bool OpenDDS::Federator::ManagerImpl::federated_
private

Flag indicating that we are actively participating in a federation of repositories.

Definition at line 219 of file FederatorManagerImpl.h.

Referenced by finalize(), join_federation(), and shutdown().

◆ federationParticipant_

DDS::DomainParticipant_var OpenDDS::Federator::ManagerImpl::federationParticipant_
private

local DomainParticipant

Definition at line 246 of file FederatorManagerImpl.h.

Referenced by finalize(), and initialize().

◆ info_

TAO_DDS_DCPSInfo_i* OpenDDS::Federator::ManagerImpl::info_
private

◆ joiner_

RepoKey OpenDDS::Federator::ManagerImpl::joiner_
private

Simple recursion avoidance during the join operations.

Definition at line 212 of file FederatorManagerImpl.h.

Referenced by join_federation().

◆ joining_

ACE_Condition<ACE_SYNCH_MUTEX> OpenDDS::Federator::ManagerImpl::joining_
private

Condition used to gate joining activities.

Definition at line 209 of file FederatorManagerImpl.h.

Referenced by join_federation().

◆ joinRepo_

RepoKey OpenDDS::Federator::ManagerImpl::joinRepo_
private

Repository to which we joined.

Definition at line 215 of file FederatorManagerImpl.h.

Referenced by finalize(), and join_federation().

◆ localRepo_

OpenDDS::DCPS::DCPSInfo_var OpenDDS::Federator::ManagerImpl::localRepo_
private

Remotely callable reference to the local repository.

Definition at line 237 of file FederatorManagerImpl.h.

Referenced by localRepo(), and repository().

◆ lock_

ACE_SYNCH_MUTEX OpenDDS::Federator::ManagerImpl::lock_
private

Critical section MUTEX.

Definition at line 206 of file FederatorManagerImpl.h.

Referenced by join_federation().

◆ multicastEnabled_

bool OpenDDS::Federator::ManagerImpl::multicastEnabled_
private

Is multicast enabled?

Definition at line 291 of file FederatorManagerImpl.h.

Referenced by initialize(), and ManagerImpl().

◆ multicastResponder_

InfoRepoMulticastResponder OpenDDS::Federator::ManagerImpl::multicastResponder_
private

Multicast responder.

Definition at line 243 of file FederatorManagerImpl.h.

Referenced by finalize(), and initialize().

◆ orb_

CORBA::ORB_var OpenDDS::Federator::ManagerImpl::orb_
private

The ORB in which we are activated.

Definition at line 240 of file FederatorManagerImpl.h.

Referenced by finalize(), initialize(), and orb().

◆ ownerListener_

UpdateListener<OwnerUpdate, OwnerUpdateDataReader> OpenDDS::Federator::ManagerImpl::ownerListener_
private

TopicUpdate listener.

Definition at line 249 of file FederatorManagerImpl.h.

Referenced by finalize(), and initialize().

◆ ownerWriter_

OwnerUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::ownerWriter_
private

TopicUpdate writer.

Definition at line 264 of file FederatorManagerImpl.h.

Referenced by create(), and initialize().

◆ participantListener_

UpdateListener<ParticipantUpdate, ParticipantUpdateDataReader> OpenDDS::Federator::ManagerImpl::participantListener_
private

ParticipantUpdate listener.

Definition at line 255 of file FederatorManagerImpl.h.

Referenced by finalize(), and initialize().

◆ participantWriter_

ParticipantUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::participantWriter_
private

ParticipantUpdate writer.

Definition at line 270 of file FederatorManagerImpl.h.

Referenced by create(), destroy(), initialize(), join_federation(), and update().

◆ peers_

IdToManagerMap OpenDDS::Federator::ManagerImpl::peers_
private

The peer with which we have federated.

Definition at line 225 of file FederatorManagerImpl.h.

Referenced by finalize(), join_federation(), and leave_federation().

◆ publicationListener_

UpdateListener<PublicationUpdate, PublicationUpdateDataReader> OpenDDS::Federator::ManagerImpl::publicationListener_
private

PublicationUpdate listener.

Definition at line 258 of file FederatorManagerImpl.h.

Referenced by finalize(), and initialize().

◆ publicationWriter_

PublicationUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::publicationWriter_
private

PublicationUpdate writer.

Definition at line 273 of file FederatorManagerImpl.h.

Referenced by create(), destroy(), initialize(), and update().

◆ sequence_

OpenDDS::DCPS::SequenceNumber OpenDDS::Federator::ManagerImpl::sequence_
private

The packet sequence number for data that we publish.

Definition at line 228 of file FederatorManagerImpl.h.

◆ subscriptionListener_

UpdateListener<SubscriptionUpdate, SubscriptionUpdateDataReader> OpenDDS::Federator::ManagerImpl::subscriptionListener_
private

SubscriptionUpdate listener.

Definition at line 261 of file FederatorManagerImpl.h.

Referenced by finalize(), and initialize().

◆ subscriptionWriter_

SubscriptionUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::subscriptionWriter_
private

SubscriptionUpdate writer.

Definition at line 276 of file FederatorManagerImpl.h.

Referenced by create(), destroy(), initialize(), and update().

◆ topicListener_

UpdateListener<TopicUpdate, TopicUpdateDataReader> OpenDDS::Federator::ManagerImpl::topicListener_
private

TopicUpdate listener.

Definition at line 252 of file FederatorManagerImpl.h.

Referenced by finalize(), and initialize().

◆ topicWriter_

TopicUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::topicWriter_
private

TopicUpdate writer.

Definition at line 267 of file FederatorManagerImpl.h.

Referenced by create(), destroy(), initialize(), and update().


The documentation for this class was generated from the following files: