12 #include "FederatorTypeSupportC.h" 13 #include "FederatorTypeSupportImpl.h" 33 #if !defined (__ACE_INLINE__) 43 : joining_(this->
lock_),
49 ownerListener_(*this),
50 topicListener_(*this),
51 participantListener_(*this),
52 publicationListener_(*this),
53 subscriptionListener_(*this),
54 multicastEnabled_(false)
58 ACE_TEXT(
"(%P|%t) Federator::ManagerImpl::ManagerImpl()\n")));
76 ACE_TEXT(
"(%P|%t) Federator::ManagerImpl::~ManagerImpl()\n")));
85 ACE_TEXT(
"(%P|%t) Federation::ManagerImpl::initialize()\n")));
101 DDS::DomainParticipantListener::_nil(),
105 ACE_TEXT(
"(%P|%t) ERROR: create_participant failed for ")
106 ACE_TEXT(
"repository %d in federation domain %d.\n"),
116 OwnerUpdateTypeSupportImpl::_var_type ownerUpdate =
new OwnerUpdateTypeSupportImpl();
119 this->federationParticipant_.in(),
122 ACE_TEXT(
"(%P|%t) ERROR: Unable to install ")
123 ACE_TEXT(
"OwnerUpdate type support for repository %d.\n"),
128 ParticipantUpdateTypeSupportImpl::_var_type participantUpdate =
new ParticipantUpdateTypeSupportImpl();
131 this->federationParticipant_.in(),
134 ACE_TEXT(
"(%P|%t) ERROR: Unable to install ")
135 ACE_TEXT(
"ParticipantUpdate type support for repository %d.\n"),
140 TopicUpdateTypeSupportImpl::_var_type topicUpdate =
new TopicUpdateTypeSupportImpl();
143 this->federationParticipant_.in(),
146 ACE_TEXT(
"(%P|%t) ERROR: Unable to install ")
147 ACE_TEXT(
"TopicUpdate type support for repository %d.\n"),
152 PublicationUpdateTypeSupportImpl::_var_type publicationUpdate =
new PublicationUpdateTypeSupportImpl();
155 this->federationParticipant_.in(),
158 ACE_TEXT(
"(%P|%t) ERROR: Unable to install ")
159 ACE_TEXT(
"PublicationUpdate type support for repository %d.\n"),
164 SubscriptionUpdateTypeSupportImpl::_var_type subscriptionUpdate =
new SubscriptionUpdateTypeSupportImpl();
167 this->federationParticipant_.in(),
170 ACE_TEXT(
"(%P|%t) ERROR: Unable to install ")
171 ACE_TEXT(
"SubscriptionUpdate type support for repository %d.\n"),
179 std::string config_name =
181 + std::string(
"FederationBITTransportConfig");
186 + std::string(
"FederationBITTCPTransportInst");
196 DDS::Subscriber_var subscriber
199 DDS::SubscriberListener::_nil(),
204 ACE_TEXT(
"(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
205 ACE_TEXT(
"failed to create subscriber for repository %d\n"),
211 ACE_TEXT(
"(%P|%t) Federator::ManagerImpl::initialize() - ")
212 ACE_TEXT(
"created federation subscriber for repository %d\n"),
224 ACE_TEXT(
"(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
225 ACE_TEXT(
"failed to bind transport config to federation subscriber.\n")));
233 DDS::Publisher_var publisher
236 DDS::PublisherListener::_nil(),
241 ACE_TEXT(
"(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
242 ACE_TEXT(
"failed to create publisher for repository %d\n"),
248 ACE_TEXT(
"(%P|%t) Federator::ManagerImpl::initialize() - ")
249 ACE_TEXT(
"created federation publisher for repository %d\n"),
261 ACE_TEXT(
"(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
262 ACE_TEXT(
"failed to bind transport config to federation publisher.\n")));
269 DDS::Topic_var topic;
270 DDS::TopicDescription_var description;
271 DDS::DataReader_var dataReader;
272 DDS::DataWriter_var dataWriter;
275 subscriber->get_default_datareader_qos(readerQos);
284 publisher->get_default_datawriter_qos(writerQos);
308 DDS::TopicListener::_nil(),
311 dataWriter = publisher->create_datawriter(
314 DDS::DataWriterListener::_nil(),
319 ACE_TEXT(
"(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
320 ACE_TEXT(
"failed to create OwnerUpdate writer for repository %d\n"),
325 this->
ownerWriter_ = OwnerUpdateDataWriter::_narrow(dataWriter.in());
329 ACE_TEXT(
"(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
330 ACE_TEXT(
"failed to extract typed OwnerUpdate writer.\n")));
339 ACE_TEXT(
"(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
340 ACE_TEXT(
"unable to extract typed OwnerUpdate writer.\n")));
345 ACE_TEXT(
"(%P|%t) Federator::ManagerImpl::initialize() - ")
346 ACE_TEXT(
"created federation OwnerUpdate writer %C for repository %d\n"),
347 std::string(converter).c_str(),
353 dataReader = subscriber->create_datareader(
361 ACE_TEXT(
"(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
362 ACE_TEXT(
"failed to create OwnerUpdate reader for repository %d\n"),
372 ACE_TEXT(
"(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
373 ACE_TEXT(
"unable to extract typed OwnerUpdate reader.\n")));
378 ACE_TEXT(
"(%P|%t) Federator::ManagerImpl::initialize() - ")
379 ACE_TEXT(
"created federation OwnerUpdate reader %C for repository %d\n"),
380 std::string(converter).c_str(),
389 DDS::TopicListener::_nil(),
391 dataWriter = publisher->create_datawriter(
394 DDS::DataWriterListener::_nil(),
399 ACE_TEXT(
"(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
400 ACE_TEXT(
"failed to create TopicUpdate writer for repository %d\n"),
406 = TopicUpdateDataWriter::_narrow(dataWriter.in());
410 ACE_TEXT(
"(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
411 ACE_TEXT(
"failed to extract typed TopicUpdate writer.\n")));
420 ACE_TEXT(
"(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
421 ACE_TEXT(
"unable to extract typed TopicUpdate writer.\n")));
426 ACE_TEXT(
"(%P|%t) Federator::ManagerImpl::initialize() - ")
427 ACE_TEXT(
"created federation TopicUpdate writer %C for repository %d\n"),
428 std::string(converter).c_str(),
434 dataReader = subscriber->create_datareader(
442 ACE_TEXT(
"(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
443 ACE_TEXT(
"failed to create TopicUpdate reader for repository %d\n"),
453 ACE_TEXT(
"(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
454 ACE_TEXT(
"unable to extract typed TopicUpdate reader.\n")));
459 ACE_TEXT(
"(%P|%t) Federator::ManagerImpl::initialize() - ")
460 ACE_TEXT(
"created federation TopicUpdate reader %C for repository %d\n"),
461 std::string(converter).c_str(),
470 DDS::TopicListener::_nil(),
472 dataWriter = publisher->create_datawriter(
475 DDS::DataWriterListener::_nil(),
480 ACE_TEXT(
"(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
481 ACE_TEXT(
"failed to create ParticipantUpdate writer for repository %d\n"),
487 = ParticipantUpdateDataWriter::_narrow(dataWriter.in());
491 ACE_TEXT(
"(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
492 ACE_TEXT(
"failed to extract typed ParticipantUpdate writer.\n")));
501 ACE_TEXT(
"(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
502 ACE_TEXT(
"unable to extract typed ParticipantUpdate writer.\n")));
507 ACE_TEXT(
"(%P|%t) Federator::ManagerImpl::initialize() - ")
508 ACE_TEXT(
"created federation ParticipantUpdate writer %C for repository %d\n"),
509 std::string(converter).c_str(),
515 dataReader = subscriber->create_datareader(
523 ACE_TEXT(
"(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
524 ACE_TEXT(
"failed to create ParticipantUpdate reader for repository %d\n"),
534 ACE_TEXT(
"(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
535 ACE_TEXT(
"unable to extract typed ParticipantUpdate reader.\n")));
540 ACE_TEXT(
"(%P|%t) Federator::ManagerImpl::initialize() - ")
541 ACE_TEXT(
"created federation ParticipantUpdate reader %C for repository %d\n"),
542 std::string(converter).c_str(),
551 DDS::TopicListener::_nil(),
553 dataWriter = publisher->create_datawriter(
556 DDS::DataWriterListener::_nil(),
561 ACE_TEXT(
"(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
562 ACE_TEXT(
"failed to create PublicationUpdate writer for repository %d\n"),
568 = PublicationUpdateDataWriter::_narrow(dataWriter.in());
572 ACE_TEXT(
"(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
573 ACE_TEXT(
"failed to extract typed PublicationUpdate writer.\n")));
582 ACE_TEXT(
"(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
583 ACE_TEXT(
"unable to extract typed PublicationUpdate writer.\n")));
588 ACE_TEXT(
"(%P|%t) Federator::ManagerImpl::initialize() - ")
589 ACE_TEXT(
"created federation PublicationUpdate writer %C for repository %d\n"),
590 std::string(converter).c_str(),
596 dataReader = subscriber->create_datareader(
604 ACE_TEXT(
"(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
605 ACE_TEXT(
"failed to create PublicationUpdate reader for repository %d\n"),
615 ACE_TEXT(
"(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
616 ACE_TEXT(
"unable to extract typed PublicationUpdate reader.\n")));
621 ACE_TEXT(
"(%P|%t) Federator::ManagerImpl::initialize() - ")
622 ACE_TEXT(
"created federation PublicationUpdate reader %C for repository %d\n"),
623 std::string(converter).c_str(),
632 DDS::TopicListener::_nil(),
634 dataWriter = publisher->create_datawriter(
637 DDS::DataWriterListener::_nil(),
642 ACE_TEXT(
"(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
643 ACE_TEXT(
"failed to create SubscriptionUpdate writer for repository %d\n"),
649 = SubscriptionUpdateDataWriter::_narrow(dataWriter.in());
653 ACE_TEXT(
"(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
654 ACE_TEXT(
"failed to extract typed SubscriptionUpdate writer.\n")));
663 ACE_TEXT(
"(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
664 ACE_TEXT(
"unable to extract typed SubscriptionUpdate writer.\n")));
669 ACE_TEXT(
"(%P|%t) Federator::ManagerImpl::initialize() - ")
670 ACE_TEXT(
"created federation SubscriptionUpdate writer %C for repository %d\n"),
671 std::string(converter).c_str(),
677 dataReader = subscriber->create_datareader(
685 ACE_TEXT(
"(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
686 ACE_TEXT(
"failed to create SubscriptionUpdate reader for repository %d\n"),
696 ACE_TEXT(
"(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
697 ACE_TEXT(
"unable to extract typed SubscriptionUpdate reader.\n")));
702 ACE_TEXT(
"(%P|%t) Federator::ManagerImpl::initialize() - ")
703 ACE_TEXT(
"created federation SubscriptionUpdate reader %C for repository %d\n"),
704 std::string(converter).c_str(),
710 #if defined (ACE_HAS_IP_MULTICAST) 720 ACE_CString mde(this->
orb_->orb_core()->orb_params()->mcast_discovery_endpoint());
727 const char *port_number =
ACE_OS::getenv(
"OpenDDSFederationPort");
729 if (port_number != 0) {
739 if (mde.length() != 0) {
742 mde.c_str()) == -1) {
744 ACE_TEXT(
"(%P|%t) ERROR: Unable to initialize ")
745 ACE_TEXT(
"the multicast responder for repository %d.\n"),
754 #if defined (ACE_HAS_IPV6) 761 ACE_TEXT(
"(%P|%t) ERROR: Unable to initialize ")
762 ACE_TEXT(
"the multicast responder for repository %d.\n"),
772 ACE_TEXT(
"(%P|%t) ERROR: Unable to register event handler ")
780 ACE_TEXT(
"(%P|%t) Federator::ManagerImpl::initialize() - ")
781 ACE_TEXT(
"multicast server setup is complete.\n")));
795 ACE_TEXT(
"(%P|%t) Federator::ManagerImpl::finalize()\n")));
813 if (where == this->
peers_.end()) {
815 ACE_TEXT(
"(%P|%t) Federator::Manager::finalize: ")
816 ACE_TEXT(
"repository %d - all attachment to federation left.\n"),
822 ACE_TEXT(
"(%P|%t) ERROR: Federator::Manager::finalize: ")
823 ACE_TEXT(
"repository %d not currently attached to a federation.\n"),
827 where->second->leave_federation(this->
id().
id());
834 ACE_TEXT(
"ERROR: Federator::ManagerImpl::finalize() - ")
835 ACE_TEXT(
"unable to leave remote federation "));
841 this->
orb_->orb_core()->reactor()->remove_handler(
850 if (entities_error) {
852 "unable to release resources for repository %d: %C\n",
861 "unable to release the participant for repository %d: %C\n",
874 ACE_TEXT(
"(%P|%t) ManagerImpl::federation_id()\n")));
877 return this->
id().
id();
880 OpenDDS::DCPS::DCPSInfo_ptr
885 ACE_TEXT(
"(%P|%t) ManagerImpl::repository()\n")));
891 OpenDDS::DCPS::DCPSInfo_var repo;
895 repo = irDisco->get_dcps_info();
899 return OpenDDS::DCPS::DCPSInfo::_duplicate(this->
localRepo_.in());
902 return OpenDDS::DCPS::DCPSInfo::_duplicate(repo.in());
911 ACE_TEXT(
"(%P|%t) ManagerImpl::discover_federation( %C)\n"),
927 ACE_TEXT(
"(%P|%t) ManagerImpl::join_federation( peer, %d)\n"),
935 remote = peer->federation_id();
939 ACE_TEXT(
"(%P|%t) Federator::ManagerImpl::join_federation() - ")
940 ACE_TEXT(
"repo id %d entered from repository with id %d.\n"),
947 ACE_TEXT(
"ERROR: Federator::ManagerImpl::join_federation() - ")
948 ACE_TEXT(
"unable to obtain remote federation Id value: "));
956 ACE_TEXT(
"(%P|%t) Federator::ManagerImpl::join_federation() - ")
957 ACE_TEXT(
"repo id %d leaving after reentry from repository with id %d.\n"),
962 return this->_this();
974 return this->_this();
999 OpenDDS::DCPS::DCPSInfo_var remoteRepo = peer->repository();
1005 ACE_TEXT(
"(%P|%t) FederatorManagerImpl::join_federation() - ")
1006 ACE_TEXT(
"id %d obtained reference to id %d:\n")
1010 remoteRepoIor.
in()));
1014 std::ostringstream oss;
1016 std::string key_string = oss.str();
1022 "ERROR: Federator::ManagerImpl::join_federation() - Unable to join with remote: ");
1029 Manager_var
self = this->_this();
1030 Manager_var remoteManager
1035 = OpenDDS::Federator::Manager::_duplicate(remoteManager.in());
1045 ACE_TEXT(
"(%P|%t) Federator::ManagerImpl::join_federation() - ")
1046 ACE_TEXT(
"repo id %d pushing state to repository with id %d.\n"),
1055 "ERROR: Federator::ManagerImpl::join_federation() - unsuccessful call to remote->join: ");
1070 ACE_TEXT(
"(%P|%t) Federator::ManagerImpl::join_federation() - ")
1071 ACE_TEXT(
"repo id %d joined to repository with id %d.\n"),
1079 return this->_this();
1088 ACE_TEXT(
"(%P|%t) ManagerImpl::leave_federation( %d)\n"),
1093 IdToManagerMap::iterator where = this->
peers_.find(
id);
1095 if (where != this->
peers_.end()) {
1096 this->
peers_.erase(where);
1106 ACE_TEXT(
"(%P|%t) ManagerImpl::leave_federation( %d) complete.\n"),
bool multicastEnabled_
Is multicast enabled?
RepoKey joiner_
Simple recursion avoidance during the join operations.
#define PARTICIPANT_QOS_DEFAULT
const string PUBLICATIONUPDATETOPICNAME
const string TOPICUPDATETOPICNAME
const string OWNERUPDATETYPENAME
virtual void initializeSubscription(const OpenDDS::Federator::SubscriptionUpdate &data)
UpdateListener< SubscriptionUpdate, SubscriptionUpdateDataReader > subscriptionListener_
SubscriptionUpdate listener.
ReliabilityQosPolicy reliability
virtual void shutdown()
Cause the entire repository to exit.
void federationDomain(long domain)
Federation Id value.
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
Duration_t max_blocking_time
InfoRepoMulticastResponder multicastResponder_
Multicast responder.
DurabilityQosPolicy durability
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
const TAO_DDS_DCPSFederationId & id() const
Accessors for the federation Id value.
HistoryQosPolicyKind kind
CORBA::ORB_var orb_
The ORB in which we are activated.
virtual void initializePublication(const OpenDDS::Federator::PublicationUpdate &data)
ParticipantUpdateDataWriter_var participantWriter_
ParticipantUpdate writer.
IdToManagerMap peers_
The peer with which we have federated.
virtual OpenDDS::DCPS::DCPSInfo_ptr repository()
virtual void initializeOwner(const OpenDDS::Federator::OwnerUpdate &data)
Config & config_
The configuration information for this manager.
SubscriptionUpdateDataWriter_var subscriptionWriter_
SubscriptionUpdate writer.
char * getenv(const char *symbol)
ACE_Guard< ACE_Thread_Mutex > lock_
const string SUBSCRIPTIONUPDATETOPICNAME
OwnerUpdateDataWriter_var ownerWriter_
TopicUpdate writer.
void bind_config(const OPENDDS_STRING &name, DDS::Entity_ptr entity)
const DDS::StatusMask DEFAULT_STATUS_MASK
Discovery Strategy class that implements InfoRepo discovery.
CORBA::ORB_ptr orb()
Accessors for the ORB.
const RepoKey NIL_REPOSITORY
int register_handler(ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask)
int wait(const ACE_Time_Value *abstime)
virtual void initializeParticipant(const OpenDDS::Federator::ParticipantUpdate &data)
ACE_Condition< ACE_SYNCH_MUTEX > joining_
Condition used to gate joining activities.
Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.
const string SUBSCRIPTIONUPDATETYPENAME
ReliabilityQosPolicyKind kind
DurabilityQosPolicyKind kind
DurabilityQosPolicy durability
UpdateListener< OwnerUpdate, OwnerUpdateDataReader > ownerListener_
TopicUpdate listener.
RepoKey joinRepo_
Repository to which we joined.
bool remove_by_owner(DDS::DomainId_t domain, long owner)
virtual Manager_ptr join_federation(Manager_ptr peer, FederationDomain federation)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void initialize()
Establish the update publications and subscriptions.
virtual void initializeTopic(const OpenDDS::Federator::TopicUpdate &data)
virtual void leave_and_shutdown()
Implements the DDS::DataReader interface.
const string PUBLICATIONUPDATETYPENAME
UpdateListener< PublicationUpdate, PublicationUpdateDataReader > publicationListener_
PublicationUpdate listener.
DDS::DomainId_t FederationDomain
ManagerImpl(Config &config)
virtual CORBA::Boolean discover_federation(const char *ior)
UpdateListener< ParticipantUpdate, ParticipantUpdateDataReader > participantListener_
ParticipantUpdate listener.
#define PUBLISHER_QOS_DEFAULT
TransportInst_rch create_inst(const OPENDDS_STRING &name, const OPENDDS_STRING &transport_type)
TransportConfig_rch create_config(const OPENDDS_STRING &name)
int init(CORBA::ORB_ptr orb, u_short port, const char *mcast_addr)
Initialization method.
virtual void leave_federation(RepoKey id)
void processCreate(const OwnerUpdate *sample, const DDS::SampleInfo *info)
Null implementation for OwnerUpdate samples.
virtual RepoKey federation_id()
ACE_SYNCH_MUTEX lock_
Critical section MUTEX.
#define SUBSCRIBER_QOS_DEFAULT
#define ACE_DEFAULT_MULTICASTV6_ADDR
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
ReliabilityQosPolicy reliability
const string PARTICIPANTUPDATETOPICNAME
const char * retcode_to_string(DDS::ReturnCode_t value)
RcHandle< T > static_rchandle_cast(const RcHandle< U > &h)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void finalize()
Release resources gracefully.
UpdateListener< TopicUpdate, TopicUpdateDataReader > topicListener_
TopicUpdate listener.
static const char DEFAULT_INST_PREFIX[]
static TransportRegistry * instance()
Return a singleton instance of this class.
const ReturnCode_t RETCODE_OK
PublicationUpdateDataWriter_var publicationWriter_
PublicationUpdate writer.
#define TheParticipantFactory
DDS::DomainParticipant_var federationParticipant_
local DomainParticipant
const character_type * in(void) const
const string OWNERUPDATETOPICNAME
#define ACE_DEFAULT_MULTICAST_ADDR
TopicUpdateDataWriter_var topicWriter_
TopicUpdate writer.
#define TheServiceParticipant
The Internal API and Implementation of OpenDDS.
void _tao_print_exception(const char *info, FILE *f=stdout) const
void pushState(Manager_ptr peer)
Push our current state to a remote repository.
const string PARTICIPANTUPDATETYPENAME
OpenDDS::DCPS::DCPSInfo_var localRepo_
Remotely callable reference to the local repository.
#define TOPIC_QOS_DEFAULT
const string TOPICUPDATETYPENAME