OpenDDS  Snapshot(2023/03/10-19:29)
Classes | Public Member Functions | Private Attributes | List of all members
TAO_DDS_DCPSInfo_i Class Reference

Implementation of the DCPSInfo. More...

#include <DCPSInfo_i.h>

Inheritance diagram for TAO_DDS_DCPSInfo_i:
Inheritance graph
[legend]
Collaboration diagram for TAO_DDS_DCPSInfo_i:
Collaboration graph
[legend]

Classes

struct  BIT_Cleanup_Handler
 

Public Member Functions

 TAO_DDS_DCPSInfo_i (CORBA::ORB_ptr orb, bool reincarnate, ShutdownInterface *shutdown, const TAO_DDS_DCPSFederationId &federation)
 
virtual ~TAO_DDS_DCPSInfo_i ()
 
virtual int handle_timeout (const ACE_Time_Value &now, const void *arg)
 
virtual CORBA::Boolean attach_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId)
 
virtual OpenDDS::DCPS::TopicStatus assert_topic (OpenDDS::DCPS::GUID_t_out topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos, bool hasDcpsKey)
 
bool add_topic (const OpenDDS::DCPS::GUID_t &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos)
 Add a previously existing topic to the repository. More...
 
virtual OpenDDS::DCPS::TopicStatus find_topic (DDS::DomainId_t domainId, const char *topicName, CORBA::String_out dataTypeName, DDS::TopicQos_out qos, OpenDDS::DCPS::GUID_t_out topicId)
 
virtual OpenDDS::DCPS::TopicStatus remove_topic (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId)
 
virtual OpenDDS::DCPS::GUID_t add_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, OpenDDS::DCPS::DataWriterRemote_ptr publication, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos, const DDS::OctetSeq &serializedTypeInfo)
 
bool add_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, const OpenDDS::DCPS::GUID_t &pubId, const char *pub_str, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, ACE_CDR::ULong transportContext, const DDS::PublisherQos &publisherQos, const DDS::OctetSeq &serializedTypeInfo, bool associate=false)
 Add a previously existing publication to the repository. More...
 
virtual void remove_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &publicationId)
 
virtual OpenDDS::DCPS::GUID_t add_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, OpenDDS::DCPS::DataReaderRemote_ptr subscription, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams, const DDS::OctetSeq &serializedTypeInfo)
 
bool add_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, const OpenDDS::DCPS::GUID_t &subId, const char *sub_str, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, ACE_CDR::ULong transportContext, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams, const DDS::OctetSeq &serializedTypeInfo, bool associate=false)
 Add a previously existing subscription to the repository. More...
 
virtual void remove_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &subscriptionId)
 
virtual OpenDDS::DCPS::AddDomainStatus add_domain_participant (DDS::DomainId_t domain, const DDS::DomainParticipantQos &qos)
 
bool add_domain_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const DDS::DomainParticipantQos &qos)
 Add a previously existing participant to the repository. More...
 
virtual void remove_domain_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId)
 
bool remove_by_owner (DDS::DomainId_t domain, long owner)
 
virtual void disassociate_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &local_id, const OpenDDS::DCPS::GUID_t &remote_id)
 
virtual void disassociate_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &local_id, const OpenDDS::DCPS::GUID_t &remote_id)
 
virtual void disassociate_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &local_id, const OpenDDS::DCPS::GUID_t &remote_id)
 
virtual void ignore_domain_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &myParticipantId, const OpenDDS::DCPS::GUID_t &ignoreId)
 
virtual void ignore_topic (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &myParticipantId, const OpenDDS::DCPS::GUID_t &ignoreId)
 
virtual void ignore_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &myParticipantId, const OpenDDS::DCPS::GUID_t &ignoreId)
 
virtual void ignore_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &myParticipantId, const OpenDDS::DCPS::GUID_t &ignoreId)
 
virtual CORBA::Boolean update_publication_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &partId, const OpenDDS::DCPS::GUID_t &dwId, const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos)
 
void update_publication_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &partId, const OpenDDS::DCPS::GUID_t &dwId, const DDS::DataWriterQos &qos)
 Entry for federation updates of DataWriterQos values. More...
 
void update_publication_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &partId, const OpenDDS::DCPS::GUID_t &dwId, const DDS::PublisherQos &qos)
 Entry for federation updates of PublisherQos values. More...
 
virtual CORBA::Boolean update_subscription_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &partId, const OpenDDS::DCPS::GUID_t &drId, const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos)
 
void update_subscription_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &partId, const OpenDDS::DCPS::GUID_t &drId, const DDS::DataReaderQos &qos)
 Entry for federation updates of DataReaderQos values. More...
 
void update_subscription_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &partId, const OpenDDS::DCPS::GUID_t &drId, const DDS::SubscriberQos &qos)
 Entry for federation updates of SubscriberQos values. More...
 
virtual ::CORBA::Boolean update_subscription_params (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &subscriptionId, const DDS::StringSeq &params)
 
virtual CORBA::Boolean update_topic_qos (const OpenDDS::DCPS::GUID_t &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const DDS::TopicQos &qos)
 
virtual CORBA::Boolean update_domain_participant_qos (DDS::DomainId_t domain, const OpenDDS::DCPS::GUID_t &participantId, const DDS::DomainParticipantQos &qos)
 
virtual void shutdown ()
 Cause the entire repository to exit. More...
 
virtual char * dump_to_string ()
 Dump the Repos state to string. More...
 
bool changeOwnership (DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, long sender, long owner)
 Assert new ownership for a participant and its contained entities. More...
 
int init_transport (int listen_address_given, const char *listen_str)
 
bool receive_image (const Update::UImage &image)
 
void add (Update::Updater *updater)
 Add an additional Updater interface. More...
 
DCPS_IR_Domain * domain (DDS::DomainId_t domain)
 Convert a domain Id into a reference to a DCPS_IR_Domain object. More...
 
const DCPS_IR_Domain_Map & domains () const
 Expose a readable reference of the domain map. More...
 
CORBA::ORB_ptr orb ()
 Expose the ORB. More...
 
bool init_persistence ()
 
bool init_reassociation (const ACE_Time_Value &delay)
 
bool init_dispatchChecking (const ACE_Time_Value &delay)
 
void finalize ()
 Cleanup state for shutdown. More...
 
void cleanup_all_built_in_topics ()
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_input (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exception (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactor * reactor (void) const
 
virtual ACE_Reactor_Timer_Interface * reactor_timer_interface (void) const
 
virtual Reference_Count add_reference (void)
 
virtual Reference_Count remove_reference (void)
 
Reference_Counting_Policy & reference_counting_policy (void)
 

Private Attributes

DCPS_IR_Domain_Map domains_
 
CORBA::ORB_var orb_
 
CORBA::ORB_var dispatchingOrb_
 
const TAO_DDS_DCPSFederationId & federation_
 
OpenDDS::DCPS::RepoIdGenerator participantIdGenerator_
 
Update::Manager * um_
 
bool reincarnate_
 
ShutdownInterface * shutdown_
 Interface to effect shutdown of the process. More...
 
ACE_Recursive_Thread_Mutex lock_
 
long reassociate_timer_id_
 
long dispatch_check_timer_id_
 
bool in_cleanup_all_built_in_topics_
 

Additional Inherited Members

- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_Count > Atomic_Reference_Count
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

Implementation of the DCPSInfo.

This is the Information Repository object. Clients of the system will use the CORBA reference of this object.

Definition at line 53 of file DCPSInfo_i.h.

Constructor & Destructor Documentation

◆ TAO_DDS_DCPSInfo_i()

TAO_DDS_DCPSInfo_i::TAO_DDS_DCPSInfo_i ( CORBA::ORB_ptr  orb,
bool  reincarnate,
ShutdownInterface * shutdown,
const TAO_DDS_DCPSFederationId & federation 
)

Definition at line 42 of file DCPSInfo_i.cpp.

References _duplicate(), dispatchingOrb_, CORBA::ORB_init(), and TheServiceParticipant.

47  , federation_(federation)
48  , participantIdGenerator_(federation.id())
49  , um_(0)
50  , reincarnate_(reincarnate)
51  , shutdown_(shutdown)
54 #ifndef DDS_HAS_MINIMUM_BIT
56 #endif
57 {
58  if (!TheServiceParticipant->use_bidir_giop()) {
59  int argc = 0;
60  char** no_argv = 0;
61  dispatchingOrb_ = CORBA::ORB_init(argc, no_argv, "dispatchingOnly");
62  }
63 }
Update::Manager * um_
Definition: DCPSInfo_i.h:421
const TAO_DDS_DCPSFederationId & federation_
Definition: DCPSInfo_i.h:418
long dispatch_check_timer_id_
Definition: DCPSInfo_i.h:430
static CORBA::ORB_ptr _duplicate(CORBA::ORB_ptr orb)
ShutdownInterface * shutdown_
Interface to effect shutdown of the process.
Definition: DCPSInfo_i.h:425
ORB_ptr ORB_init(int &argc, char *argv[], const char *orb_name=0)
OpenDDS::DCPS::RepoIdGenerator participantIdGenerator_
Definition: DCPSInfo_i.h:419
long reassociate_timer_id_
Definition: DCPSInfo_i.h:429
void id(RepoKey fedId)
bool in_cleanup_all_built_in_topics_
Definition: DCPSInfo_i.h:450
CORBA::ORB_var dispatchingOrb_
Definition: DCPSInfo_i.h:416
#define TheServiceParticipant
CORBA::ORB_var orb_
Definition: DCPSInfo_i.h:415

◆ ~TAO_DDS_DCPSInfo_i()

TAO_DDS_DCPSInfo_i::~TAO_DDS_DCPSInfo_i ( )
virtual

Definition at line 65 of file DCPSInfo_i.cpp.

66 {
67 }

Member Function Documentation

◆ add()

void TAO_DDS_DCPSInfo_i::add ( Update::Updater * updater)

Add an additional Updater interface.

Definition at line 2403 of file DCPSInfo_i.cpp.

References Update::Manager::add(), and um_.

2404 {
2405  if (this->um_) {
2406  this->um_->add(updater);
2407  }
2408 }
Update::Manager * um_
Definition: DCPSInfo_i.h:421
void add(TAO_DDS_DCPSInfo_i *info)

◆ add_domain_participant() [1/2]

OpenDDS::DCPS::AddDomainStatus TAO_DDS_DCPSInfo_i::add_domain_participant ( DDS::DomainId_t  domain,
const DDS::DomainParticipantQos & qos 
)
virtual

Definition at line 1010 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domain(), OpenDDS::DCPS::AddDomainStatus::federated, federation_, OpenDDS::DCPS::RcHandle< T >::get(), DCPS_IR_Domain::get_next_participant_id(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::AddDomainStatus::id, LM_DEBUG, lock_, TAO_DDS_DCPSFederationId::overridden(), OpenDDS::DCPS::RepoIdConverter::participantId(), DCPS_IR_Domain::participants(), TheServiceParticipant, um_, and value.

Referenced by Update::Manager::add(), OpenDDS::Federator::ManagerImpl::processCreate(), and receive_image().

1013 {
1014  // A value to return.
1017  value.federated = this->federation_.overridden();
1018 
1019  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, value);
1020 
1021  // Grab the domain.
1022  DCPS_IR_Domain* domainPtr = this->domain(domain);
1023 
1024  if (0 == domainPtr) {
1026  }
1027 
1028  // Obtain a shiny new GUID value.
1029  OpenDDS::DCPS::GUID_t participantId = domainPtr->get_next_participant_id();
1030 
1031  // Determine if this is the 'special' repository internal participant
1032  // that publishes the built-in topics for a domain.
1033  bool isBitPart = domainPtr->participants().empty() && TheServiceParticipant->get_BIT();
1034 
1035  DCPS_IR_Participant_rch participant =
1036  OpenDDS::DCPS::make_rch<DCPS_IR_Participant>(
1037  this->federation_,
1038  participantId,
1039  domainPtr,
1040  qos, um_, isBitPart);
1041 
1042  // We created the participant, now we can return the Id value (eventually).
1043  value.id = participantId;
1044 
1045  if (isBitPart) {
1046  participant->isBitPublisher() = true;
1047 
1049  OpenDDS::DCPS::RepoIdConverter converter(participantId);
1050  ACE_DEBUG((LM_DEBUG,
1051  ACE_TEXT("(%P|%t) (GUID_t)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1052  ACE_TEXT("participant %C in domain %d is BIT publisher for this domain.\n"),
1053  std::string(converter).c_str(),
1054  domain));
1055  }
1056  }
1057 
1058  // Assume responsibility for writing back to the participant.
1059  participant->takeOwnership();
1060 
1061  int status = domainPtr->add_participant(participant);
1062 
1063  if (0 != status) {
1064  // Adding the participant failed return the invalid
1065  // participant Id number.
1066  participantId = OpenDDS::DCPS::GUID_UNKNOWN;
1067 
1068  } else if (this->um_) {
1069  OpenDDS::DCPS::RepoIdConverter converter(participantId);
1070  if (participant->isBitPublisher() == false) {
1071  // Push this participant to interested observers.
1072  Update::UParticipant updateParticipant(
1073  domain,
1074  participant->owner(),
1075  participantId,
1076  const_cast<DDS::DomainParticipantQos &>(qos));
1077  this->um_->create(updateParticipant);
1078 
1080  ACE_DEBUG((LM_DEBUG,
1081  ACE_TEXT("(%P|%t) (GUID_t)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1082  ACE_TEXT("pushing creation of participant %C in domain %d.\n"),
1083  std::string(converter).c_str(),
1084  domain));
1085  }
1086  }
1087 
1088  // Update what the last participant id was
1089  um_->updateLastPartId(converter.participantId());
1090  }
1091 
1093  OpenDDS::DCPS::RepoIdConverter converter(participantId);
1094  ACE_DEBUG((LM_DEBUG,
1095  ACE_TEXT("(%P|%t) (GUID_t)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1096  ACE_TEXT("domain %d loaded participant %C at 0x%x.\n"),
1097  domain,
1098  std::string(converter).c_str(),
1099  participant.get()));
1100  }
1101  return value;
1102 }
DCPS_IR_Domain * domain(DDS::DomainId_t domain)
Convert a domain Id into a reference to a DCPS_IR_Domain object.
#define ACE_DEBUG(X)
Update::Manager * um_
Definition: DCPSInfo_i.h:421
const LogLevel::Value value
Definition: debug.cpp:61
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
OpenDDS::DCPS::GUID_t get_next_participant_id()
Next Entity Id value in sequence.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const TAO_DDS_DCPSFederationId & federation_
Definition: DCPSInfo_i.h:418
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define TheServiceParticipant
Representation of a Domain in the system.
const DCPS_IR_Participant_Map & participants() const
Expose a readable reference to the participant map.

◆ add_domain_participant() [2/2]

bool TAO_DDS_DCPSInfo_i::add_domain_participant ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t & participantId,
const DDS::DomainParticipantQos & qos 
)

Add a previously existing participant to the repository.

Parameters
domainId: the Domain in which the Participant is contained.
participantId: the GUID Id value to use for the Participant.
qos: the QoS value of the Participant.

Adds a Participant to the repository using a specified Participant GUID Id value. If the ParticipantId indicates that this Participant was created within this repository (the federation Id is the current repository's federation Id), this method will ensure that any subsequent calls to add a Participant and obtain a newly generated Id value will return an Id value greater than the Id value of the current one.

Definition at line 1105 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domain(), federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), OpenDDS::DCPS::RcHandle< T >::in(), LM_DEBUG, LM_ERROR, LM_WARNING, lock_, DCPS_IR_Domain::participant(), OpenDDS::DCPS::RepoIdConverter::participantId(), DCPS_IR_Domain::participants(), TheServiceParticipant, and um_.

1108 {
1109  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
1110 
1111  // Grab the domain.
1112  DCPS_IR_Domain* domainPtr = this->domain(domainId);
1113 
1114  if (0 == domainPtr) {
1116  ACE_DEBUG((LM_WARNING,
1117  ACE_TEXT("(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1118  ACE_TEXT("invalid domain Id: %d\n"),
1119  domainId));
1120  }
1121 
1122  return false;
1123  }
1124 
1125  // Prepare to manipulate the participant's Id value.
1126  OpenDDS::DCPS::RepoIdConverter converter(participantId);
1127 
1128  // Determine if this is the 'special' repository internal participant
1129  // that publishes the built-in topics for a domain.
1130  bool isBitPart = domainPtr->participants().empty() && TheServiceParticipant->get_BIT();
1131 
1132  // Grab the participant.
1133  DCPS_IR_Participant* partPtr = domainPtr->participant(participantId);
1134 
1135  if (0 != partPtr) {
1137  ACE_ERROR((LM_ERROR,
1138  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1139  ACE_TEXT("participant id %C already exists.\n"),
1140  std::string(converter).c_str()));
1141  }
1142 
1143  return false;
1144  }
1145 
1146  DCPS_IR_Participant_rch participant =
1147  OpenDDS::DCPS::make_rch<DCPS_IR_Participant>(this->federation_,
1148  participantId,
1149  domainPtr,
1150  qos, um_, isBitPart);
1151 
1152  switch (domainPtr->add_participant(participant)) {
1153  case -1: {
1154  ACE_ERROR((LM_ERROR,
1155  ACE_TEXT("(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1156  ACE_TEXT("failed to load participant %C in domain %d.\n"),
1157  std::string(converter).c_str(),
1158  domainId));
1159  }
1160  return false;
1161 
1162  case 1:
1163 
1165  ACE_DEBUG((LM_WARNING,
1166  ACE_TEXT("(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1167  ACE_TEXT("attempt to load duplicate participant %C in domain %d.\n"),
1168  std::string(converter).c_str(),
1169  domainId));
1170  }
1171 
1172  return false;
1173 
1174  case 0:
1175  default:
1176  break;
1177  }
1178 
1179  // See if we are adding a participant that was created within this
1180  // repository or a different repository.
1181  if (converter.federationId() == this->federation_.id()) {
1182  // Ensure the participant GUID values do not conflict.
1183  domainPtr->last_participant_key(converter.participantId());
1184 
1186  ACE_DEBUG((LM_DEBUG,
1187  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1188  ACE_TEXT("Adjusting highest participant Id value to at least %d.\n"),
1189  converter.participantId()));
1190  }
1191  }
1192 
1194  ACE_DEBUG((LM_DEBUG,
1195  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1196  ACE_TEXT("loaded participant %C at 0x%x in domain %d.\n"),
1197  std::string(converter).c_str(),
1198  participant.in(),
1199  domainId));
1200  }
1201 
1202  return true;
1203 }
DCPS_IR_Domain * domain(DDS::DomainId_t domain)
Convert a domain Id into a reference to a DCPS_IR_Domain object.
#define ACE_DEBUG(X)
Update::Manager * um_
Definition: DCPSInfo_i.h:421
#define ACE_ERROR(X)
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const TAO_DDS_DCPSFederationId & federation_
Definition: DCPSInfo_i.h:418
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define TheServiceParticipant
Representation of a Domain in the system.
const DCPS_IR_Participant_Map & participants() const
Expose a readable reference to the participant map.
DCPS_IR_Participant * participant(const OpenDDS::DCPS::GUID_t &id) const
Find the participant with the id.

◆ add_publication() [1/2]

OpenDDS::DCPS::GUID_t TAO_DDS_DCPSInfo_i::add_publication ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t & participantId,
const OpenDDS::DCPS::GUID_t & topicId,
OpenDDS::DCPS::DataWriterRemote_ptr  publication,
const DDS::DataWriterQos & qos,
const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
const DDS::PublisherQos & publisherQos,
const DDS::OctetSeq & serializedTypeInfo 
)
virtual

Definition at line 374 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), DCPS_IR_Participant::add_publication(), DCPS_IR_Topic::add_publication_reference(), Update::Manager::create(), Update::DataWriter, OpenDDS::DCPS::DCPS_debug_level, dispatchingOrb_, domains_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), DCPS_IR_Participant::get_next_publication_id(), OpenDDS::DCPS::GUID_UNKNOWN, TAO::String_var< charT >::in(), CORBA::is_nil(), DCPS_IR_Participant::isBitPublisher(), LM_DEBUG, LM_WARNING, lock_, OpenDDS::DCPS::move(), orb_, DCPS_IR_Participant::remove_publication(), OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL::transportContextDefault, and um_.

Referenced by Update::Manager::add(), OpenDDS::Federator::ManagerImpl::processCreate(), OpenDDS::Federator::ManagerImpl::processDeferred(), and receive_image().

383 {
384  if (CORBA::is_nil(publication)) {
386  ACE_DEBUG((LM_WARNING,
387  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
388  ACE_TEXT("invalid publication reference.\n")));
389  }
391  }
392 
394 
395  // Grab the domain.
396  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
397 
398  if (where == this->domains_.end()) {
400  }
401 
402  // Grab the participant.
403  DCPS_IR_Participant* partPtr
404  = where->second->participant(participantId);
405 
406  if (0 == partPtr) {
408  }
409 
410  DCPS_IR_Topic* topic = where->second->find_topic(topicId);
411 
412  if (topic == 0) {
414  }
415 
416  // Get a Id for the Writer, make it a builtin kind if this is for a BIT
418  OpenDDS::DCPS::RepoIdConverter(topicId).isBuiltinDomainEntity());
419 
420  OpenDDS::DCPS::DataWriterRemote_var dispatchingPublication =
421  OpenDDS::DCPS::DataWriterRemote::_duplicate(publication);
422 
423  if (dispatchingOrb_) {
424  // Remarshall the remote reference onto the dispatching orb.
425  CORBA::String_var pubStr = orb_->object_to_string(dispatchingPublication);
426  CORBA::Object_var pubObj = dispatchingOrb_->string_to_object(pubStr);
427  if (CORBA::is_nil(pubObj)) {
429  ACE_DEBUG((LM_WARNING,
430  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
431  ACE_TEXT("failure marshalling publication on dispatching orb.\n")));
432  }
434  }
435 
436  dispatchingPublication = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(pubObj);
437  }
438 
441  pubId,
442  partPtr,
443  topic,
444  dispatchingPublication.in(),
445  qos,
446  transInfo,
448  publisherQos,
449  serializedTypeInfo));
450 
451  DCPS_IR_Publication* pub = pubPtr.get();
452  if (partPtr->add_publication(OpenDDS::DCPS::move(pubPtr)) != 0) {
453  // failed to add. we are responsible for the memory.
455  } else if (topic->add_publication_reference(pub) != 0) {
456  // Failed to add to the topic
457  // so remove from participant and fail.
458  partPtr->remove_publication(pubId);
460  }
461 
462  if (this->um_ && (partPtr->isBitPublisher() == false)) {
463  CORBA::String_var callback = orb_->object_to_string(publication);
465 
466  Update::UWActor actor(domainId, pubId, topicId, participantId, Update::DataWriter
467  , callback.in()
468  , const_cast<DDS::PublisherQos&>(publisherQos)
469  , const_cast<DDS::DataWriterQos&>(qos)
470  , const_cast<OpenDDS::DCPS::TransportLocatorSeq&>(transInfo)
472  , const_cast<DDS::OctetSeq&>(serializedTypeInfo));
473  this->um_->create(actor);
474 
476  OpenDDS::DCPS::RepoIdConverter converter(pubId);
477  ACE_DEBUG((LM_DEBUG,
478  ACE_TEXT("(%P|%t) (GUID_t)TAO_DDS_DCPSInfo_i::add_publication: ")
479  ACE_TEXT("pushing creation of publication %C in domain %d.\n"),
480  std::string(converter).c_str(),
481  domainId));
482  }
483  }
484 
485  where->second->remove_dead_participants();
486  return pubId;
487 }
#define ACE_DEBUG(X)
Update::Manager * um_
Definition: DCPSInfo_i.h:421
int add_publication(OpenDDS::DCPS::unique_ptr< DCPS_IR_Publication > pub)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
const ACE_CDR::ULong transportContextDefault
Definition: DCPSInfo_i.cpp:38
Representative of a Publication.
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
sequence< TransportLocator > TransportLocatorSeq
int remove_publication(OpenDDS::DCPS::GUID_t pubId)
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of a Topic.
Definition: DCPS_IR_Topic.h:43
Representative of the Domain Participant.
int add_publication_reference(DCPS_IR_Publication *publication, bool associate=true)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void create(const UType &info)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
CORBA::ORB_var dispatchingOrb_
Definition: DCPSInfo_i.h:416
const character_type * in(void) const
CORBA::ORB_var orb_
Definition: DCPSInfo_i.h:415
Boolean is_nil(T x)
OpenDDS::DCPS::GUID_t get_next_publication_id(bool builtin)

◆ add_publication() [2/2]

bool TAO_DDS_DCPSInfo_i::add_publication ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t & participantId,
const OpenDDS::DCPS::GUID_t & topicId,
const OpenDDS::DCPS::GUID_t & pubId,
const char *  pub_str,
const DDS::DataWriterQos & qos,
const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
ACE_CDR::ULong  transportContext,
const DDS::PublisherQos & publisherQos,
const DDS::OctetSeq & serializedTypeInfo,
bool  associate = false 
)

Add a previously existing publication to the repository.

Parameters
domainId: the Domain in which the Publication is contained.
participantId: the Participant in which the Publication is contained.
topicId: the Topic of the Publication.
pubId: the GUID Id value to use for the Publication.
pub_str: stringified publication callback to DataWriter.
qos: the QoS value of the DataWriter.
transInfo: the transport information for the Publication.
publisherQos: the QoS value of the Publisher.
associate: indicate whether to create new associations.

Adds a Publication to the repository using a specified Publication GUID Id value. If the PublicationId indicates that this Publication was created within this repository (its federation Id is the current repository's federation Id), this method will ensure that any subsequent calls to add a Publication and obtain a newly generated Id value will return an Id value greater than the Id value of the current one.

TODO: Check if this is already stored. If so, just clear the callback IOR.

Definition at line 490 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), DCPS_IR_Participant::add_publication(), DCPS_IR_Topic::add_publication_reference(), OpenDDS::DCPS::DCPS_debug_level, dispatchingOrb_, domains_, OpenDDS::DCPS::GuidConverter::entityKey(), federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), TAO_DDS_DCPSFederationId::id(), TAO_Pseudo_Var_T< Object >::in(), CORBA::is_nil(), DCPS_IR_Participant::last_publication_key(), LM_ERROR, LM_WARNING, lock_, OpenDDS::DCPS::move(), orb_, and DCPS_IR_Participant::remove_publication().

501 {
502  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
503 
504  // Grab the domain.
505  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
506 
507  if (where == this->domains_.end()) {
509  ACE_DEBUG((LM_WARNING,
510  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
511  ACE_TEXT("invalid domain %d.\n"),
512  domainId));
513  }
514 
515  return false;
516  }
517 
518  // Grab the participant.
519  DCPS_IR_Participant* partPtr
520  = where->second->participant(participantId);
521 
522  if (0 == partPtr) {
524  OpenDDS::DCPS::RepoIdConverter converter(pubId);
525  ACE_DEBUG((LM_WARNING,
526  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
527  ACE_TEXT("invalid participant %C in domain %d.\n"),
528  std::string(converter).c_str(),
529  domainId));
530  }
531 
532  return false;
533  }
534 
535  DCPS_IR_Topic* topic = where->second->find_topic(topicId);
536 
537  if (topic == 0) {
538  OpenDDS::DCPS::RepoIdConverter converter(topicId);
539  ACE_DEBUG((LM_WARNING,
540  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
541  ACE_TEXT("invalid topic %C in domain %d.\n"),
542  std::string(converter).c_str(),
543  domainId));
544  return false;
545  }
546 
547  /// @TODO: Check if this is already stored. If so, just clear the callback IOR.
548 
549  CORBA::Object_var obj = (dispatchingOrb_ ? dispatchingOrb_ : orb_)->string_to_object(pub_str);
550  if (CORBA::is_nil(obj.in())) {
552  ACE_DEBUG((LM_WARNING,
553  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
554  ACE_TEXT("failure converting string %C to objref\n"),
555  pub_str));
556  }
557  return false;
558  }
559 
560  OpenDDS::DCPS::DataWriterRemote_var publication = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(obj.in());
561 
564  pubId,
565  partPtr,
566  topic,
567  publication.in(),
568  qos,
569  transInfo,
570  transportContext,
571  publisherQos,
572  serializedTypeInfo));
573 
574  DCPS_IR_Publication* pub = pubPtr.get();
575  switch (partPtr->add_publication(move(pubPtr))) {
576  case -1: {
577  OpenDDS::DCPS::RepoIdConverter converter(pubId);
578  ACE_ERROR((LM_ERROR,
579  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ")
580  ACE_TEXT("failed to add publication to participant %C.\n"),
581  std::string(converter).c_str()));
582  return false;
583  }
584 
585  case 1:
586  return false;
587  case 0:
588  default:
589  break;
590  }
591 
592  switch (topic->add_publication_reference(pub, associate)) {
593  case -1: {
594  OpenDDS::DCPS::RepoIdConverter converter(pubId);
595  ACE_ERROR((LM_ERROR,
596  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ")
597  ACE_TEXT("failed to add publication to participant %C topic list.\n"),
598  std::string(converter).c_str()));
599 
600  // Remove the publication.
601  partPtr->remove_publication(pubId);
602 
603  }
604  return false;
605 
606  case 1: // This is actually a really really bad place to jump to.
607  // This means that we successfully added the new publication
608  // to the participant (it had not been inserted before) but
609  // that we are adding a duplicate publication to the topic
610  // list - which should never ever be able to happen.
611  return false;
612 
613  case 0:
614  default:
615  break;
616  }
617 
618  OpenDDS::DCPS::RepoIdConverter converter(pubId);
619 
620  // See if we are adding a publication that was created within this
621  // repository or a different repository.
622  if (converter.federationId() == federation_.id()) {
623  // Ensure the publication GUID_t values do not conflict.
624  partPtr->last_publication_key(converter.entityKey());
625  }
626 
627  return true;
628 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
int add_publication(OpenDDS::DCPS::unique_ptr< DCPS_IR_Publication > pub)
Representative of a Publication.
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
int remove_publication(OpenDDS::DCPS::GUID_t pubId)
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of a Topic.
Definition: DCPS_IR_Topic.h:43
Representative of the Domain Participant.
int add_publication_reference(DCPS_IR_Publication *publication, bool associate=true)
void last_publication_key(long key)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const TAO_DDS_DCPSFederationId & federation_
Definition: DCPSInfo_i.h:418
_in_type in(void) const
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
void id(RepoKey fedId)
CORBA::ORB_var dispatchingOrb_
Definition: DCPSInfo_i.h:416
CORBA::ORB_var orb_
Definition: DCPSInfo_i.h:415
Boolean is_nil(T x)

◆ add_subscription() [1/2]

OpenDDS::DCPS::GUID_t TAO_DDS_DCPSInfo_i::add_subscription ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t participantId,
const OpenDDS::DCPS::GUID_t topicId,
OpenDDS::DCPS::DataReaderRemote_ptr  subscription,
const DDS::DataReaderQos qos,
const OpenDDS::DCPS::TransportLocatorSeq transInfo,
const DDS::SubscriberQos subscriberQos,
const char *  filterClassName,
const char *  filterExpression,
const DDS::StringSeq exprParams,
const DDS::OctetSeq serializedTypeInfo 
)
virtual

Definition at line 681 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), DCPS_IR_Participant::add_subscription(), DCPS_IR_Topic::add_subscription_reference(), Update::Manager::create(), Update::DataReader, OpenDDS::DCPS::DCPS_debug_level, dispatchingOrb_, domains_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), DCPS_IR_Participant::get_next_subscription_id(), OpenDDS::DCPS::GUID_UNKNOWN, TAO::String_var< charT >::in(), TAO_Pseudo_Var_T< Object >::in(), CORBA::is_nil(), DCPS_IR_Participant::isBitPublisher(), LM_DEBUG, LM_ERROR, LM_WARNING, lock_, OpenDDS::DCPS::move(), orb_, DCPS_IR_Domain::participant(), DCPS_IR_Domain::remove_dead_participants(), DCPS_IR_Participant::remove_subscription(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL::transportContextDefault, and um_.

Referenced by Update::Manager::add(), OpenDDS::Federator::ManagerImpl::processCreate(), OpenDDS::Federator::ManagerImpl::processDeferred(), and receive_image().

693 {
694  if (CORBA::is_nil(subscription)) {
696  ACE_DEBUG((LM_WARNING,
697  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
698  ACE_TEXT("invalid subscription reference.\n")));
699  }
701  }
702 
703  DCPS_IR_Domain* domainPtr;
704  DCPS_IR_Participant* partPtr;
705  DCPS_IR_Topic* topic;
706  OpenDDS::DCPS::GUID_t subId;
708  {
710 
711  // Grab the domain.
712  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
713 
714  if (where == this->domains_.end()) {
716  }
717 
718  // Grab the domain and participant.
719  domainPtr = where->second.get();
720  partPtr = domainPtr->participant(participantId);
721 
722  if (0 == partPtr) {
724  }
725 
726  topic = where->second->find_topic(topicId);
727 
728  if (topic == 0) {
730  }
731 
732  // Get a Id for the Reader, make it a builtin kind if this is for a BIT
733  subId = partPtr->get_next_subscription_id(
734  OpenDDS::DCPS::RepoIdConverter(topicId).isBuiltinDomainEntity());
735 
736  OpenDDS::DCPS::DataReaderRemote_var dispatchingSubscription (
737  OpenDDS::DCPS::DataReaderRemote::_duplicate(subscription));
738 
739  if (dispatchingOrb_) {
740  // Remarshall the remote reference onto the dispatching orb.
741  CORBA::String_var subStr = orb_->object_to_string(dispatchingSubscription);
742  CORBA::Object_var subObj = dispatchingOrb_->string_to_object(subStr);
743  if (CORBA::is_nil(subObj.in())) {
745  ACE_DEBUG((LM_WARNING,
746  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
747  ACE_TEXT("failure marshalling subscription on dispatching orb.\n")));
748  }
750  }
751  dispatchingSubscription = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(subObj);
752  }
753 
754  subPtr.reset(
756  subId,
757  partPtr,
758  topic,
759  dispatchingSubscription.in(),
760  qos,
761  transInfo,
763  subscriberQos,
764  filterClassName,
765  filterExpression,
766  exprParams,
767  serializedTypeInfo));
768 
769  // Release lock
770  }
771 
772  DCPS_IR_Subscription* sub = subPtr.get();
773  if (partPtr->add_subscription(move(subPtr)) != 0) {
774  // failed to add. we are responsible for the memory.
776  } else if (topic->add_subscription_reference(sub) != 0) {
777  ACE_ERROR((LM_ERROR, ACE_TEXT("Failed to add subscription to topic list.\n")));
778  // No associations were made so remove and fail.
779  partPtr->remove_subscription(subId);
781  }
782 
783  if (this->um_ && (partPtr->isBitPublisher() == false)) {
784  CORBA::String_var callback = orb_->object_to_string(subscription);
785  Update::ContentSubscriptionInfo csi(filterClassName, filterExpression, exprParams);
786 
787  Update::URActor actor(domainId, subId, topicId, participantId, Update::DataReader
788  , callback.in()
789  , const_cast<DDS::SubscriberQos&>(subscriberQos)
790  , const_cast<DDS::DataReaderQos&>(qos)
791  , const_cast<OpenDDS::DCPS::TransportLocatorSeq&>(transInfo)
793  , const_cast<DDS::OctetSeq&>(serializedTypeInfo));
794 
795  this->um_->create(actor);
796 
798  OpenDDS::DCPS::RepoIdConverter converter(subId);
799  ACE_DEBUG((LM_DEBUG,
800  ACE_TEXT("(%P|%t) (GUID_t)TAO_DDS_DCPSInfo_i::add_subscription: ")
801  ACE_TEXT("pushing creation of subscription %C in domain %d.\n"),
802  std::string(converter).c_str(),
803  domainId));
804  }
805  }
806 
807  domainPtr->remove_dead_participants();
808 
809  return subId;
810 }
#define ACE_DEBUG(X)
Update::Manager * um_
Definition: DCPSInfo_i.h:421
#define ACE_ERROR(X)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
const ACE_CDR::ULong transportContextDefault
Definition: DCPSInfo_i.cpp:38
int add_subscription(OpenDDS::DCPS::unique_ptr< DCPS_IR_Subscription > sub)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
sequence< TransportLocator > TransportLocatorSeq
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of a Topic.
Definition: DCPS_IR_Topic.h:43
Representative of the Domain Participant.
Representative of a Subscription.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void remove_dead_participants(bool part_of_cleanup=false)
Remove any participants currently marked as dead.
int add_subscription_reference(DCPS_IR_Subscription *subscription, bool associate=true)
void create(const UType &info)
_in_type in(void) const
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
CORBA::ORB_var dispatchingOrb_
Definition: DCPSInfo_i.h:416
const character_type * in(void) const
CORBA::ORB_var orb_
Definition: DCPSInfo_i.h:415
Representation of a Domain in the system.
Boolean is_nil(T x)
int remove_subscription(OpenDDS::DCPS::GUID_t subId)
OpenDDS::DCPS::GUID_t get_next_subscription_id(bool builtin)
DCPS_IR_Participant * participant(const OpenDDS::DCPS::GUID_t &id) const
Find the participant with the id.

◆ add_subscription() [2/2]

bool TAO_DDS_DCPSInfo_i::add_subscription ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t participantId,
const OpenDDS::DCPS::GUID_t topicId,
const OpenDDS::DCPS::GUID_t subId,
const char *  sub_str,
const DDS::DataReaderQos qos,
const OpenDDS::DCPS::TransportLocatorSeq transInfo,
ACE_CDR::ULong  transportContext,
const DDS::SubscriberQos subscriberQos,
const char *  filterClassName,
const char *  filterExpression,
const DDS::StringSeq exprParams,
const DDS::OctetSeq serializedTypeInfo,
bool  associate = false 
)

Add a previously existing subscription to the repository.

Parameters
domainId: the Domain in which the Subscription is contained.
participantId: the Participant in which the Subscription is contained.
topicId: the Topic of the Subscription.
subId: the GUID Id value to use for the Subscription.
sub_str: stringified subscription callback to DataReader.
qos: the QoS value of the DataReader.
transInfo: the transport information for the Subscription.
subscriberQos: the QoS value of the Subscriber.
associate: indicate whether to create new associations.

Adds a Subscription to the repository using a specified Subscription GUID Id value. If the SubscriptionId indicates that this Subscription was created within this repository (its federation Id is the current repository's federation Id), this method will ensure that any subsequent calls to add a Subscription and obtain a newly generated Id value will return an Id value greater than the Id value of the current one.

Definition at line 813 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), DCPS_IR_Participant::add_subscription(), DCPS_IR_Topic::add_subscription_reference(), OpenDDS::DCPS::DCPS_debug_level, dispatchingOrb_, domains_, OpenDDS::DCPS::GuidConverter::entityKey(), federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), TAO_DDS_DCPSFederationId::id(), TAO_Pseudo_Var_T< Object >::in(), CORBA::is_nil(), DCPS_IR_Participant::last_subscription_key(), LM_ERROR, LM_WARNING, lock_, OpenDDS::DCPS::move(), orb_, and DCPS_IR_Participant::remove_subscription().

828 {
829  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
830 
831  // Grab the domain.
832  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
833 
834  if (where == this->domains_.end()) {
836  ACE_DEBUG((LM_WARNING,
837  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
838  ACE_TEXT("invalid domain %d.\n"),
839  domainId));
840  }
841 
842  return false;
843  }
844 
845  // Grab the participant.
846  DCPS_IR_Participant* partPtr
847  = where->second->participant(participantId);
848 
849  if (0 == partPtr) {
851  OpenDDS::DCPS::RepoIdConverter converter(participantId);
852  ACE_DEBUG((LM_WARNING,
853  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
854  ACE_TEXT("invalid participant %C in domain %d.\n"),
855  std::string(converter).c_str(),
856  domainId));
857  }
858 
859  return false;
860  }
861 
862  DCPS_IR_Topic* topic = where->second->find_topic(topicId);
863 
864  if (topic == 0) {
866  OpenDDS::DCPS::RepoIdConverter converter(topicId);
867  ACE_DEBUG((LM_WARNING,
868  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
869  ACE_TEXT("invalid topic %C in domain %d.\n"),
870  std::string(converter).c_str(),
871  domainId));
872  }
873 
874  return false;
875  }
876 
877  CORBA::Object_var obj = (dispatchingOrb_ ? dispatchingOrb_ : orb_) ->string_to_object(sub_str);
878  if (CORBA::is_nil(obj.in())) {
880  ACE_DEBUG((LM_WARNING,
881  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
882  ACE_TEXT("failure converting string %C to objref\n"),
883  sub_str));
884  }
885  return false;
886  }
887 
888  OpenDDS::DCPS::DataReaderRemote_var subscription = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(obj.in());
889 
892  subId,
893  partPtr,
894  topic,
895  subscription.in(),
896  qos,
897  transInfo,
898  transportContext,
899  subscriberQos,
900  filterClassName,
901  filterExpression,
902  exprParams,
903  serializedTypeInfo));
904 
905  DCPS_IR_Subscription* sub = subPtr.get();
906  switch (partPtr->add_subscription(OpenDDS::DCPS::move(subPtr))) {
907  case -1: {
908  OpenDDS::DCPS::RepoIdConverter converter(subId);
909  ACE_ERROR((LM_ERROR,
910  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ")
911  ACE_TEXT("failed to add subscription to participant %C.\n"),
912  std::string(converter).c_str()));
913  return false;
914  }
915 
916  case 1:
917  return false;
918 
919  case 0:
920  default:
921  break;
922  }
923 
924  switch (topic->add_subscription_reference(sub, associate)) {
925  case -1: {
926  OpenDDS::DCPS::RepoIdConverter converter(subId);
927  ACE_ERROR((LM_ERROR,
928  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ")
929  ACE_TEXT("failed to add subscription to participant %C topic list.\n"),
930  std::string(converter).c_str()));
931 
932  // Remove the subscription.
933  partPtr->remove_subscription(subId);
934 
935  }
936  return false;
937 
938  case 1: // This is actually a really really bad place to jump to.
939  // This means that we successfully added the new subscription
940  // to the participant (it had not been inserted before) but
941  // that we are adding a duplicate subscription to the topic
942  // list - which should never ever be able to happen.
943  return false;
944 
945  case 0:
946  default:
947  break;
948  }
949 
950  OpenDDS::DCPS::RepoIdConverter converter(subId);
951 
952  // See if we are adding a subscription that was created within this
953  // repository or a different repository.
954  if (converter.federationId() == federation_.id()) {
955  // Ensure the subscription GUID_t values do not conflict.
956  partPtr->last_subscription_key(converter.entityKey());
957  }
958 
959  return true;
960 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
int add_subscription(OpenDDS::DCPS::unique_ptr< DCPS_IR_Subscription > sub)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of a Topic.
Definition: DCPS_IR_Topic.h:43
Representative of the Domain Participant.
Representative of a Subscription.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
int add_subscription_reference(DCPS_IR_Subscription *subscription, bool associate=true)
const TAO_DDS_DCPSFederationId & federation_
Definition: DCPSInfo_i.h:418
_in_type in(void) const
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
void last_subscription_key(long key)
void id(RepoKey fedId)
CORBA::ORB_var dispatchingOrb_
Definition: DCPSInfo_i.h:416
CORBA::ORB_var orb_
Definition: DCPSInfo_i.h:415
Boolean is_nil(T x)
int remove_subscription(OpenDDS::DCPS::GUID_t subId)

◆ add_topic()

bool TAO_DDS_DCPSInfo_i::add_topic ( const OpenDDS::DCPS::GUID_t topicId,
DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t participantId,
const char *  topicName,
const char *  dataTypeName,
const DDS::TopicQos qos 
)

Add a previously existing topic to the repository.

Parameters
topicId: the Topic Entity GUID Id to use.
domainId: the Domain in which the Topic is contained.
participantId: the Participant in which the Topic is contained.
topicName: the name of the Topic.
dataTypeName: the name of the data type.
qos: the QoS value to use for the Topic.

Adds a Topic Entity to the repository using a specified TopicId value. If the TopicId indicates that this Topic was created within this repository (its federation Id is the current repository's federation Id), this method will ensure that any subsequent calls to add a Topic and obtain a newly generated Id value will return an Id value greater than the Id value of the current one.

Definition at line 232 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::CREATED, OpenDDS::DCPS::DCPS_debug_level, domains_, OpenDDS::DCPS::GuidConverter::entityKey(), federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), TAO_DDS_DCPSFederationId::id(), DCPS_IR_Participant::last_topic_key(), LM_WARNING, and lock_.

Referenced by Update::Manager::add(), OpenDDS::Federator::ManagerImpl::processCreate(), OpenDDS::Federator::ManagerImpl::processDeferred(), and receive_image().

238 {
239  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
240 
241  // Grab the domain.
242  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
243 
244  if (where == this->domains_.end()) {
246  ACE_DEBUG((LM_WARNING,
247  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ")
248  ACE_TEXT("invalid domain %d.\n"),
249  domainId));
250  }
251 
252  return false;
253  }
254 
255  // Grab the participant.
256  DCPS_IR_Participant* participantPtr
257  = where->second->participant(participantId);
258 
259  if (0 == participantPtr) {
261  OpenDDS::DCPS::RepoIdConverter converter(participantId);
262  ACE_DEBUG((LM_WARNING,
263  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ")
264  ACE_TEXT("invalid participant %C.\n"),
265  std::string(converter).c_str()));
266  }
267 
268  return false;
269  }
270 
271  OpenDDS::DCPS::TopicStatus topicStatus
272  = where->second->force_add_topic(topicId, topicName, dataTypeName,
273  qos, participantPtr);
274 
275  if (topicStatus != OpenDDS::DCPS::CREATED) {
276  return false;
277  }
278 
279  OpenDDS::DCPS::RepoIdConverter converter(topicId);
280 
281  // See if we are adding a topic that was created within this
282  // repository or a different repository.
283  if (converter.federationId() == federation_.id()) {
284  // Ensure the topic GUID_t values do not conflict.
285  participantPtr->last_topic_key(converter.entityKey());
286  }
287 
288  return true;
289 }
#define ACE_DEBUG(X)
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const TAO_DDS_DCPSFederationId & federation_
Definition: DCPSInfo_i.h:418
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
void id(RepoKey fedId)

◆ assert_topic()

OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::assert_topic ( OpenDDS::DCPS::GUID_t_out  topicId,
DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t participantId,
const char *  topicName,
const char *  dataTypeName,
const DDS::TopicQos qos,
bool  hasDcpsKey 
)
virtual

Definition at line 180 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), Update::Manager::create(), OpenDDS::DCPS::DCPS_debug_level, domains_, OpenDDS::DCPS::INTERNAL_ERROR, DCPS_IR_Participant::isBitPublisher(), LM_DEBUG, lock_, and um_.

188 {
190  // Grab the domain.
191  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
192 
193  if (where == this->domains_.end()) {
195  }
196 
197  // Grab the participant.
198  DCPS_IR_Participant* participantPtr
199  = where->second->participant(participantId);
200 
201  if (0 == participantPtr) {
203  }
204 
205  OpenDDS::DCPS::TopicStatus topicStatus
206  = where->second->add_topic(
207  topicId,
208  topicName,
209  dataTypeName,
210  qos,
211  participantPtr);
212 
213  if (this->um_ && (participantPtr->isBitPublisher() == false)) {
214  Update::UTopic topic(domainId, topicId, participantId
215  , topicName, dataTypeName
216  , const_cast<DDS::TopicQos &>(qos));
217  this->um_->create(topic);
218 
220  OpenDDS::DCPS::RepoIdConverter converter(topicId);
221  ACE_DEBUG((LM_DEBUG,
222  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::assert_topic: ")
223  ACE_TEXT("pushing creation of topic %C in domain %d.\n"),
224  std::string(converter).c_str(),
225  domainId));
226  }
227  }
228  return topicStatus;
229 }
#define ACE_DEBUG(X)
Update::Manager * um_
Definition: DCPSInfo_i.h:421
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void create(const UType &info)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414

◆ attach_participant()

CORBA::Boolean TAO_DDS_DCPSInfo_i::attach_participant ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t participantId 
)
virtual

Definition at line 124 of file DCPSInfo_i.cpp.

References ACE_GUARD_RETURN, domains_, lock_, and DCPS_IR_Participant::takeOwnership().

127 {
129 
130  // Grab the domain.
131  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
132 
133  if (where == this->domains_.end()) {
135  }
136 
137  // Grab the participant.
138  DCPS_IR_Participant* participant
139  = where->second->participant(participantId);
140 
141  if (0 == participant) {
143  }
144 
145  // Establish ownership within the local repository.
146  participant->takeOwnership();
147 
148  return false;
149 }
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void takeOwnership()
Take local ownership of this participant and publish an update.
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414

◆ changeOwnership()

bool TAO_DDS_DCPSInfo_i::changeOwnership ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t participantId,
long  sender,
long  owner 
)

Assert new ownership for a participant and its contained entities.

Parameters
domainId: the domain in which the participant resides.
participantId: the participant to be owned.
sender: the repository sending the update data.
owner: the repository which is to make callbacks for entities within the participant.
Returns
boolean indicating that ownership has been assigned.

This establishes owner as the new owner of the participant. Ownership consists of calling back to the reader and writer remote interfaces when associations are established and removed from a publication or subscription. Owner may be the special value of OWNER_NONE to indicate that the previous owner is no longer available to make callbacks and the application has not indicated which repository is to replace it in this capacity.

The sender of the update is included so that the participant can check that transitions to OWNER_NONE are only honored when initiated by the current owner of the participant.

A return value of false indicates that the ownership was specified for a domain or participant which could not be found.

Definition at line 152 of file DCPSInfo_i.cpp.

References ACE_GUARD_RETURN, DCPS_IR_Participant::changeOwner(), domains_, and lock_.

Referenced by OpenDDS::Federator::ManagerImpl::processCreate(), OpenDDS::Federator::ManagerImpl::processDeferred(), OpenDDS::Federator::ManagerImpl::processDelete(), and OpenDDS::Federator::ManagerImpl::processUpdateQos1().

157 {
158  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
159 
160  // Grab the domain.
161  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
162 
163  if (where == this->domains_.end()) {
164  return false;
165  }
166 
167  // Grab the participant.
168  DCPS_IR_Participant* participant
169  = where->second->participant(participantId);
170 
171  if (0 == participant) {
172  return false;
173  }
174 
175  // Establish the ownership.
176  participant->changeOwner(sender, owner);
177  return true;
178 }
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
void changeOwner(long sender, long owner)
Process an incoming update that changes ownership.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414

◆ cleanup_all_built_in_topics()

void TAO_DDS_DCPSInfo_i::cleanup_all_built_in_topics ( )

Definition at line 2497 of file DCPSInfo_i.cpp.

References ACE_GUARD, copy(), DCPS_IR_Domain_Map, domains_, in_cleanup_all_built_in_topics_, lock_, and OPENDDS_END_VERSIONED_NAMESPACE_DECL.

Referenced by InfoRepo::finalize(), and InfoRepo::handle_exception().

2498 {
2499 #ifndef DDS_HAS_MINIMUM_BIT
2501  {
2503  if (domains_.empty() || in_cleanup_all_built_in_topics_) {
2504  return;
2505  }
2506  copy = domains_;
2508  }
2509 
2510  for (DCPS_IR_Domain_Map::iterator it = copy.begin(); it != copy.end(); ++it) {
2511  it->second->cleanup_built_in_topics();
2512  }
2513 
2514  {
2517  copy.clear();
2518  domains_.clear();
2519  }
2520 #endif
2521 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL typedef std::map< DDS::DomainId_t, OpenDDS::DCPS::RcHandle< DCPS_IR_Domain > > DCPS_IR_Domain_Map
Definition: DCPSInfo_i.h:36
T * copy(T const &st)
bool in_cleanup_all_built_in_topics_
Definition: DCPSInfo_i.h:450

◆ disassociate_participant()

void TAO_DDS_DCPSInfo_i::disassociate_participant ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t local_id,
const OpenDDS::DCPS::GUID_t remote_id 
)
virtual

Definition at line 1330 of file DCPSInfo_i.cpp.

References ACE_GUARD, domains_, lock_, DCPS_IR_Participant::publications(), and DCPS_IR_Participant::subscriptions().

1334 {
1336 
1337  DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
1338  if (it == this->domains_.end()) {
1340  }
1341 
1342  DCPS_IR_Participant* participant = it->second->participant(local_id);
1343  if (participant == 0) {
1345  }
1346 
1347  // Disassociate from participant temporarily:
1348  const DCPS_IR_Subscription_Map& subscriptions = participant->subscriptions();
1349  for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin());
1350  sub != subscriptions.end(); ++sub) {
1351  sub->second->disassociate_participant(remote_id, true);
1352  }
1353 
1354  const DCPS_IR_Publication_Map& publications = participant->publications();
1355  for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin());
1356  pub != publications.end(); ++pub) {
1357  pub->second->disassociate_participant(remote_id, true);
1358  }
1359 
1360  it->second->remove_dead_participants();
1361 }
const DCPS_IR_Subscription_Map & subscriptions() const
Expose a readable reference to the subscription map.
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
std::map< OpenDDS::DCPS::GUID_t, OpenDDS::DCPS::container_supported_unique_ptr< DCPS_IR_Subscription >, OpenDDS::DCPS::GUID_tKeyLessThan > DCPS_IR_Subscription_Map
Representative of the Domain Participant.
const DCPS_IR_Publication_Map & publications() const
Expose a readable reference to the publication map.
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
std::map< OpenDDS::DCPS::GUID_t, OpenDDS::DCPS::container_supported_unique_ptr< DCPS_IR_Publication >, OpenDDS::DCPS::GUID_tKeyLessThan > DCPS_IR_Publication_Map

◆ disassociate_publication()

void TAO_DDS_DCPSInfo_i::disassociate_publication ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t participantId,
const OpenDDS::DCPS::GUID_t local_id,
const OpenDDS::DCPS::GUID_t remote_id 
)
virtual

Definition at line 1406 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Publication::disassociate_subscription(), domains_, DCPS_IR_Participant::find_publication_reference(), LM_ERROR, LM_INFO, and lock_.

1411 {
1413 
1414  DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
1415  if (it == this->domains_.end()) {
1417  }
1418 
1419  DCPS_IR_Participant* participant = it->second->participant(participantId);
1420  if (participant == 0) {
1422  }
1423 
1425  ACE_DEBUG((LM_INFO, "(%P|%t) disassociating publication\n"));
1426  }
1427 
1428  DCPS_IR_Publication* publication;
1429  if (participant->find_publication_reference(local_id, publication)
1430  != 0 || publication == 0) {
1431  OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
1432  OpenDDS::DCPS::RepoIdConverter pub_converter(local_id);
1433  ACE_ERROR((LM_ERROR,
1434  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_publication: ")
1435  ACE_TEXT("participant %C could not find publication %C.\n"),
1436  std::string(part_converter).c_str(),
1437  std::string(pub_converter).c_str()));
1439  }
1440 
1441  // Disassociate from subscription temporarily:
1442  publication->disassociate_subscription(remote_id, true);
1443 
1444  it->second->remove_dead_participants();
1445 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
Representative of a Publication.
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
int find_publication_reference(OpenDDS::DCPS::GUID_t pubId, DCPS_IR_Publication *&pub)
Return the publication object.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
void disassociate_subscription(OpenDDS::DCPS::GUID_t id, bool reassociate=false)
Remove any subscriptions with the id.

◆ disassociate_subscription()

void TAO_DDS_DCPSInfo_i::disassociate_subscription ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t participantId,
const OpenDDS::DCPS::GUID_t local_id,
const OpenDDS::DCPS::GUID_t remote_id 
)
virtual

Definition at line 1364 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Subscription::disassociate_publication(), domains_, DCPS_IR_Participant::find_subscription_reference(), LM_ERROR, LM_INFO, and lock_.

1369 {
1371 
1372  DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
1373  if (it == this->domains_.end()) {
1375  }
1376 
1377  DCPS_IR_Participant* participant = it->second->participant(participantId);
1378  if (participant == 0) {
1380  }
1381 
1383  ACE_DEBUG((LM_INFO, "(%P|%t) disassociating subscription\n"));
1384  }
1385 
1386  DCPS_IR_Subscription* subscription;
1387  if (participant->find_subscription_reference(local_id, subscription)
1388  != 0 || subscription == 0) {
1389  OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
1390  OpenDDS::DCPS::RepoIdConverter sub_converter(local_id);
1391  ACE_ERROR((LM_ERROR,
1392  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_subscription: ")
1393  ACE_TEXT("participant %C could not find subscription %C.\n"),
1394  std::string(part_converter).c_str(),
1395  std::string(sub_converter).c_str()));
1397  }
1398 
1399  // Disassociate from publication temporarily:
1400  subscription->disassociate_publication(remote_id, true);
1401 
1402  it->second->remove_dead_participants();
1403 }
void disassociate_publication(OpenDDS::DCPS::GUID_t id, bool reassociate=false)
Remove any publications with the id.
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
int find_subscription_reference(OpenDDS::DCPS::GUID_t subId, DCPS_IR_Subscription *&sub)
Return the subscription object.
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
Representative of a Subscription.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414

◆ domain()

DCPS_IR_Domain * TAO_DDS_DCPSInfo_i::domain ( DDS::DomainId_t  domain)

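Convert a domain Id into a pointer to the corresponding DCPS_IR_Domain object, inserting a new domain into the map if it is not already present. Returns 0 on failure (for example, ANY_DOMAIN is not supported for operations).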

Definition at line 2129 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::Service_Participant::ANY_DOMAIN, OpenDDS::DCPS::DCPS_debug_level, domains_, federation_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), DCPS_IR_Domain::init_built_in_topics(), LM_DEBUG, LM_ERROR, OpenDDS::DCPS::move(), TAO_DDS_DCPSFederationId::overridden(), participantIdGenerator_, reincarnate_, TheServiceParticipant, and DCPS_IR_Domain::useBIT().

Referenced by add_domain_participant(), receive_image(), remove_by_owner(), and update_subscription_params().

2130 {
2132  ACE_ERROR((LM_ERROR,
2133  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ")
2134  ACE_TEXT("ANY_DOMAIN not supported for operations.\n")));
2135  return 0;
2136  }
2137 
2138  // Check if the domain is already in the map.
2139  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domain);
2140 
2141  if (where == this->domains_.end()) {
2142  // We will attempt to insert a new domain, go ahead and allocate it.
2145 
2146  DCPS_IR_Domain* domainPtr = domain_uptr.get();
2147 
2148  // We need to insert the domain into the map at this time since it
2149  // might be looked up during the init_built_in_topics() call.
2150  this->domains_.insert(
2151  where,
2152  DCPS_IR_Domain_Map::value_type(domain, OpenDDS::DCPS::move(domain_uptr)));
2153 
2154 #ifndef DDS_HAS_MINIMUM_BIT
2155  if (TheServiceParticipant->get_BIT() && !domainPtr->useBIT() &&
2157  ) {
2158  ACE_ERROR((LM_ERROR,
2159  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ")
2160  ACE_TEXT("failed to initialize the Built-In Topics ")
2161  ACE_TEXT("when loading domain %d.\n"),
2162  domain));
2163  this->domains_.erase(domain);
2164  return 0;
2165  }
2166 #endif
2167 
2169  ACE_DEBUG((LM_DEBUG,
2170  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::domain: ")
2171  ACE_TEXT("successfully loaded domain %d at %x.\n"),
2172  domain,
2173  domainPtr));
2174  }
2175  return domainPtr;
2176 
2177  } else {
2178  return where->second.get();
2179  }
2180 }
DCPS_IR_Domain * domain(DDS::DomainId_t domain)
Convert a domain Id into a reference to a DCPS_IR_Domain object.
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
bool useBIT() const
const TAO_DDS_DCPSFederationId & federation_
Definition: DCPSInfo_i.h:418
int init_built_in_topics(bool federated, bool persistent)
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
OpenDDS::DCPS::RepoIdGenerator participantIdGenerator_
Definition: DCPSInfo_i.h:419
#define TheServiceParticipant
Representation of a Domain in the system.

◆ domains()

const DCPS_IR_Domain_Map & TAO_DDS_DCPSInfo_i::domains ( ) const

Expose a readable reference to the domain map.

Definition at line 2473 of file DCPSInfo_i.cpp.

References domains_.

Referenced by OpenDDS::Federator::ManagerImpl::pushState().

2474 {
2475  return this->domains_;
2476 }
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414

◆ dump_to_string()

char * TAO_DDS_DCPSInfo_i::dump_to_string ( )
virtual

Dump the repository's state to a string.

Definition at line 2480 of file DCPSInfo_i.cpp.

References domains_, and CORBA::string_dup().

2481 {
2482  std::string dump;
2483 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
2484  std::string indent (" ");
2485 
2486  for (DCPS_IR_Domain_Map::const_iterator dm = domains_.begin();
2487  dm != domains_.end();
2488  dm++)
2489  {
2490  dump += dm->second->dump_to_string(indent, 0);
2491  }
2492 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
2493  return CORBA::string_dup(dump.c_str());
2494 
2495 }
char * string_dup(const char *)
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414

◆ finalize()

void TAO_DDS_DCPSInfo_i::finalize ( void  )

Cleanup state for shutdown.

Definition at line 2455 of file DCPSInfo_i.cpp.

References ACE_Reactor::cancel_timer(), DCPS_IR_Domain_Map, dispatch_check_timer_id_, orb_, ACE_Event_Handler::reactor(), and reassociate_timer_id_.

Referenced by InfoRepo::finalize(), and InfoRepo::handle_exception().

2456 {
2457  if (reassociate_timer_id_ != -1) {
2458  ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
2459 
2460  reactor->cancel_timer(this->reassociate_timer_id_);
2461  this->reassociate_timer_id_ = -1;
2462  }
2463 
2464  if (dispatch_check_timer_id_ != -1) {
2465  ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
2466 
2467  reactor->cancel_timer(this->dispatch_check_timer_id_);
2468  this->dispatch_check_timer_id_ = -1;
2469  }
2470 }
long dispatch_check_timer_id_
Definition: DCPSInfo_i.h:430
virtual ACE_Reactor * reactor(void) const
virtual int cancel_timer(long timer_id, const void **arg=0, int dont_call_handle_close=1)
long reassociate_timer_id_
Definition: DCPSInfo_i.h:429
CORBA::ORB_var orb_
Definition: DCPSInfo_i.h:415

◆ find_topic()

OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::find_topic ( DDS::DomainId_t  domainId,
const char *  topicName,
CORBA::String_out  dataTypeName,
DDS::TopicQos_out  qos,
OpenDDS::DCPS::GUID_t_out  topicId 
)
virtual

Definition at line 291 of file DCPSInfo_i.cpp.

References ACE_GUARD_RETURN, domains_, OpenDDS::DCPS::FOUND, DCPS_IR_Topic_Description::get_dataTypeName(), DCPS_IR_Topic::get_id(), DCPS_IR_Topic::get_topic_description(), DCPS_IR_Topic::get_topic_qos(), OpenDDS::DCPS::INTERNAL_ERROR, lock_, OpenDDS::DCPS::NOT_FOUND, and Update::TopicQos.

297 {
299 
300  // Grab the domain.
301  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
302 
303  if (where == this->domains_.end()) {
305  }
306 
308 
309  DCPS_IR_Topic* topic = 0;
310  qos = new DDS::TopicQos;
311 
312  status = where->second->find_topic(topicName, topic);
313 
314  if (0 != topic) {
315  status = OpenDDS::DCPS::FOUND;
316  const DCPS_IR_Topic_Description* desc = topic->get_topic_description();
317  dataTypeName = desc->get_dataTypeName();
318  *qos = *(topic->get_topic_qos());
319  topicId = topic->get_id();
320  }
321 
322  return status;
323 }
OpenDDS::DCPS::GUID_t get_id() const
DDS::TopicQos * get_topic_qos()
DCPS_IR_Topic_Description * get_topic_description()
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of a Topic.
Definition: DCPS_IR_Topic.h:43
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
Representative of a Topic Description.
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414

◆ handle_timeout()

int TAO_DDS_DCPSInfo_i::handle_timeout ( const ACE_Time_Value now,
const void *  arg 
)
virtual

Reimplemented from ACE_Event_Handler.

Definition at line 70 of file DCPSInfo_i.cpp.

References ACE_GUARD_RETURN, dispatchingOrb_, domains_, and lock_.

72 {
74 
75  if (arg == this) {
76  if (dispatchingOrb_) {
77  if (dispatchingOrb_->work_pending()) {
78  // Ten microseconds
79  ACE_Time_Value smallval(0, 10);
80  dispatchingOrb_->perform_work(smallval);
81  }
82  }
83  } else {
84  // NOTE: This is a purposefully naive approach to addressing defunct
85  // associations. In the future, it may be worthwhile to introduce a
86  // callback model to fix the heinous runtime cost below:
87  for (DCPS_IR_Domain_Map::const_iterator dom(this->domains_.begin());
88  dom != this->domains_.end(); ++dom) {
89 
90  const DCPS_IR_Participant_Map& participants(dom->second->participants());
91  for (DCPS_IR_Participant_Map::const_iterator part(participants.begin());
92  part != participants.end(); ++part) {
93 
94  const DCPS_IR_Subscription_Map& subscriptions(part->second->subscriptions());
95  for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin());
96  sub != subscriptions.end(); ++sub) {
97  sub->second->reevaluate_defunct_associations();
98  }
99 
100  const DCPS_IR_Publication_Map& publications(part->second->publications());
101  for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin());
102  pub != publications.end(); ++pub) {
103  pub->second->reevaluate_defunct_associations();
104  }
105  }
106  }
107  }
108 
109  return 0;
110 }
std::map< OpenDDS::DCPS::GUID_t, DCPS_IR_Participant_rch, OpenDDS::DCPS::GUID_tKeyLessThan > DCPS_IR_Participant_Map
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
std::map< OpenDDS::DCPS::GUID_t, OpenDDS::DCPS::container_supported_unique_ptr< DCPS_IR_Subscription >, OpenDDS::DCPS::GUID_tKeyLessThan > DCPS_IR_Subscription_Map
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
std::map< OpenDDS::DCPS::GUID_t, OpenDDS::DCPS::container_supported_unique_ptr< DCPS_IR_Publication >, OpenDDS::DCPS::GUID_tKeyLessThan > DCPS_IR_Publication_Map
CORBA::ORB_var dispatchingOrb_
Definition: DCPSInfo_i.h:416

◆ ignore_domain_participant()

void TAO_DDS_DCPSInfo_i::ignore_domain_participant ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t myParticipantId,
const OpenDDS::DCPS::GUID_t ignoreId 
)
virtual

Definition at line 1552 of file DCPSInfo_i.cpp.

References ACE_GUARD, domains_, DCPS_IR_Participant::ignore_participant(), and lock_.

1556 {
1558 
1559  // Grab the domain.
1560  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1561 
1562  if (where == this->domains_.end()) {
1564  }
1565 
1566  // Grab the participant.
1567  DCPS_IR_Participant* partPtr
1568  = where->second->participant(myParticipantId);
1569 
1570  if (0 == partPtr) {
1572  }
1573 
1574  partPtr->ignore_participant(ignoreId);
1575 
1576  where->second->remove_dead_participants();
1577 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
void ignore_participant(OpenDDS::DCPS::GUID_t id)
Ignore the participant with the id.

◆ ignore_publication()

void TAO_DDS_DCPSInfo_i::ignore_publication ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t myParticipantId,
const OpenDDS::DCPS::GUID_t ignoreId 
)
virtual

Definition at line 1633 of file DCPSInfo_i.cpp.

References ACE_GUARD, domains_, DCPS_IR_Participant::ignore_publication(), and lock_.

1637 {
1639 
1640  // Grab the domain.
1641  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1642 
1643  if (where == this->domains_.end()) {
1645  }
1646 
1647  // Grab the participant.
1648  DCPS_IR_Participant* partPtr
1649  = where->second->participant(myParticipantId);
1650 
1651  if (0 == partPtr) {
1653  }
1654 
1655  partPtr->ignore_publication(ignoreId);
1656 
1657  where->second->remove_dead_participants();
1658 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
void ignore_publication(OpenDDS::DCPS::GUID_t id)
Ignore the publication with the id.
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414

◆ ignore_subscription()

void TAO_DDS_DCPSInfo_i::ignore_subscription ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t myParticipantId,
const OpenDDS::DCPS::GUID_t ignoreId 
)
virtual

Definition at line 1606 of file DCPSInfo_i.cpp.

References ACE_GUARD, domains_, DCPS_IR_Participant::ignore_subscription(), and lock_.

1610 {
1612 
1613  // Grab the domain.
1614  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1615 
1616  if (where == this->domains_.end()) {
1618  }
1619 
1620  // Grab the participant.
1621  DCPS_IR_Participant* partPtr
1622  = where->second->participant(myParticipantId);
1623 
1624  if (0 == partPtr) {
1626  }
1627 
1628  partPtr->ignore_subscription(ignoreId);
1629 
1630  where->second->remove_dead_participants();
1631 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void ignore_subscription(OpenDDS::DCPS::GUID_t id)
Ignore the subscription with the id.
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414

◆ ignore_topic()

void TAO_DDS_DCPSInfo_i::ignore_topic ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t myParticipantId,
const OpenDDS::DCPS::GUID_t ignoreId 
)
virtual

Definition at line 1579 of file DCPSInfo_i.cpp.

References ACE_GUARD, domains_, DCPS_IR_Participant::ignore_topic(), and lock_.

1583 {
1585 
1586  // Grab the domain.
1587  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1588 
1589  if (where == this->domains_.end()) {
1591  }
1592 
1593  // Grab the participant.
1594  DCPS_IR_Participant* partPtr
1595  = where->second->participant(myParticipantId);
1596 
1597  if (0 == partPtr) {
1599  }
1600 
1601  partPtr->ignore_topic(ignoreId);
1602 
1603  where->second->remove_dead_participants();
1604 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
void ignore_topic(OpenDDS::DCPS::GUID_t id)
Ignore the topic with the id.

◆ init_dispatchChecking()

bool TAO_DDS_DCPSInfo_i::init_dispatchChecking ( const ACE_Time_Value delay)

Definition at line 2444 of file DCPSInfo_i.cpp.

References dispatch_check_timer_id_, orb_, ACE_Event_Handler::reactor(), and ACE_Reactor::schedule_timer().

2445 {
2446  if (this->dispatch_check_timer_id_ != -1) return false; // already scheduled
2447 
2448  ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
2449 
2450  this->dispatch_check_timer_id_ = reactor->schedule_timer(this, this, delay, delay);
2451  return this->dispatch_check_timer_id_ != -1;
2452 }
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)
long dispatch_check_timer_id_
Definition: DCPSInfo_i.h:430
virtual ACE_Reactor * reactor(void) const
CORBA::ORB_var orb_
Definition: DCPSInfo_i.h:415

◆ init_persistence()

bool TAO_DDS_DCPSInfo_i::init_persistence ( )

Definition at line 2411 of file DCPSInfo_i.cpp.

References ACE_ERROR_RETURN, ACE_TEXT(), Update::Manager::add(), ACE_Dynamic_Service< class >::instance(), LM_ERROR, reincarnate_, Update::Manager::requestImage(), and um_.

2412 {
2414  ("UpdateManagerSvc");
2415 
2416  if (um_ != 0) {
2417  um_->add(this);
2418 
2419  // Request persistent image.
2420  if (reincarnate_) {
2421  um_->requestImage();
2422  }
2423 
2424  } else {
2425  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("TAO_DDS_DCPSInfo_i> Failed to discover ")
2426  ACE_TEXT("UpdateManagerSvc.\n")), false);
2427  }
2428 
2429  return true;
2430 }
Update::Manager * um_
Definition: DCPSInfo_i.h:421
static TYPE * instance(const ACE_TCHAR *name)
void add(TAO_DDS_DCPSInfo_i *info)
ACE_TEXT("TCP_Factory")
void requestImage()
Request the persistent image.
#define ACE_ERROR_RETURN(X, Y)

◆ init_reassociation()

bool TAO_DDS_DCPSInfo_i::init_reassociation ( const ACE_Time_Value delay)

Definition at line 2433 of file DCPSInfo_i.cpp.

References orb_, ACE_Event_Handler::reactor(), reassociate_timer_id_, and ACE_Reactor::schedule_timer().

2434 {
2435  if (this->reassociate_timer_id_ != -1) return false; // already scheduled
2436 
2437  ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
2438 
2439  this->reassociate_timer_id_ = reactor->schedule_timer(this, 0, delay, delay);
2440  return this->reassociate_timer_id_ != -1;
2441 }
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)
virtual ACE_Reactor * reactor(void) const
long reassociate_timer_id_
Definition: DCPSInfo_i.h:429
CORBA::ORB_var orb_
Definition: DCPSInfo_i.h:415

◆ init_transport()

int TAO_DDS_DCPSInfo_i::init_transport ( int  listen_address_given,
const char *  listen_str 
)

Initialize the transport for the Built-In Topics. Returns 0 (zero) on success, or 1 if an exception is caught while configuring the transport.

Definition at line 2182 of file DCPSInfo_i.cpp.

References ACE_TEXT(), OpenDDS::DCPS::TransportRegistry::create_config(), OpenDDS::DCPS::TransportRegistry::create_inst(), ACE_Service_Config::current(), OpenDDS::DCPS::TransportInst::datalink_release_delay_, OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX, OpenDDS::DCPS::dynamic_rchandle_cast(), find(), OpenDDS::DCPS::TransportRegistry::instance(), OpenDDS::DCPS::TransportConfig::instances_, and ACE_Service_Config::process_directive().

2184 {
2185  int status = 0;
2186 
2187 #ifndef DDS_HAS_MINIMUM_BIT
2188  try {
2189 
2190 #ifndef ACE_AS_STATIC_LIBS
2191  if (ACE_Service_Config::current()->find(ACE_TEXT("OpenDDS_Tcp"))
2192  < 0 /* not found (-1) or suspended (-2) */) {
2193  static const ACE_TCHAR directive[] =
2194  ACE_TEXT("dynamic OpenDDS_Tcp Service_Object * ")
2195  ACE_TEXT("OpenDDS_Tcp:_make_TcpLoader()");
2196  ACE_Service_Config::process_directive(directive);
2197  }
2198 #endif
2199 
2200  const std::string config_name =
2202  + std::string("InfoRepoBITTransportConfig");
2205 
2206  const std::string inst_name =
2208  + std::string("InfoRepoBITTCPTransportInst");
2211  "tcp");
2212  config->instances_.push_back(inst);
2213 
2214  OpenDDS::DCPS::TcpInst_rch tcp_inst =
2216  inst->datalink_release_delay_ = 0;
2217 
2218  tcp_inst->conn_retry_attempts_ = 0;
2219 
2220  if (listen_address_given) {
2221  tcp_inst->local_address(listen_str);
2222  }
2223 
2224  } catch (...) {
2225  // TransportRegistry is extremely varied in the exceptions that
2226  // it throws on failure; do not allow exceptions to bubble up
2227  // beyond this point.
2228  status = 1;
2229  }
2230 #else
2231  ACE_UNUSED_ARG(listen_address_given);
2232  ACE_UNUSED_ARG(listen_str);
2233 #endif
2234 
2235  return status;
2236 }
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
static const char DEFAULT_INST_PREFIX[]
TransportInst_rch create_inst(const OPENDDS_STRING &name, const OPENDDS_STRING &transport_type)
char ACE_TCHAR
TransportConfig_rch create_config(const OPENDDS_STRING &name)
static ACE_Service_Gestalt * current(void)
int find(const PG_Property_Set &decoder, const ACE_CString &key, TYPE &value)
ACE_TEXT("TCP_Factory")
static TransportRegistry * instance()
Return a singleton instance of this class.

◆ orb()

CORBA::ORB_ptr TAO_DDS_DCPSInfo_i::orb ( void  )

Expose the ORB.

Definition at line 119 of file DCPSInfo_i.cpp.

References CORBA::ORB::_duplicate(), TAO_Pseudo_Var_T< ORB >::in(), and orb_.

Referenced by OpenDDS::Federator::ManagerImpl::pushState().

120 {
121  return CORBA::ORB::_duplicate(this->orb_.in());
122 }
static CORBA::ORB_ptr _duplicate(CORBA::ORB_ptr orb)
_in_type in(void) const
CORBA::ORB_var orb_
Definition: DCPSInfo_i.h:415

◆ receive_image()

bool TAO_DDS_DCPSInfo_i::receive_image ( const Update::UImage image)

Definition at line 2239 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::actorId, Update::ImageData< T, P, A, W >::actors, add_domain_participant(), add_publication(), add_subscription(), add_topic(), Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::callback, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::contentSubscriptionProfile, Update::TopicStrt< Q, S >::dataType, OpenDDS::DCPS::DCPS_debug_level, domain(), Update::TopicStrt< Q, S >::domainId, Update::ParticipantStrt< Q >::domainId, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::domainId, domains_, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::drdwQos, OpenDDS::DCPS::RepoIdGenerator::last(), Update::ImageData< T, P, A, W >::lastPartId, LM_DEBUG, LM_ERROR, LM_WARNING, Update::TopicStrt< Q, S >::name, Update::TopicStrt< Q, S >::participantId, Update::ParticipantStrt< Q >::participantId, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::participantId, participantIdGenerator_, Update::ParticipantStrt< Q >::participantQos, Update::ImageData< T, P, A, W >::participants, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::pubsubQos, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::serializedTypeInfo, TheServiceParticipant, Update::TopicStrt< Q, S >::topicId, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::topicId, Update::TopicStrt< Q, S >::topicQos, Update::ImageData< T, P, A, W >::topics, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::transportContext, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::transportInterfaceInfo, and Update::ImageData< T, P, A, W >::wActors.

Referenced by Update::Manager::pushImage().

2240 {
2242  ACE_DEBUG((LM_DEBUG,
2243  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
2244  ACE_TEXT("processing persistent data.\n")));
2245  }
2246 
2247  // Initialize builtin topics first so that they always have the same IDs
2248 #ifndef DDS_HAS_MINIMUM_BIT
2249  if (TheServiceParticipant->get_BIT()) {
2250  for (Update::UImage::ParticipantSeq::const_iterator
2251  iter = image.participants.begin();
2252  iter != image.participants.end(); iter++) {
2253  const Update::UParticipant* part = *iter;
2254  if (!domain(part->domainId)) {
2256  ACE_DEBUG((LM_WARNING,
2257  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i::receive_image: ")
2258  ACE_TEXT("invalid domain Id: %d\n"),
2259  part->domainId));
2260  }
2261  return false;
2262  }
2263  }
2264  }
2265 #endif
2266 
2267  // Ensure that new non-BIT participants do not reuse an id
2269 
2270  for (Update::UImage::ParticipantSeq::const_iterator
2271  iter = image.participants.begin();
2272  iter != image.participants.end(); iter++) {
2273  const Update::UParticipant* part = *iter;
2274 
2275  if (!this->add_domain_participant(part->domainId, part->participantId
2276  , part->participantQos)) {
2278  ACE_ERROR((LM_ERROR,
2279  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
2280  ACE_TEXT("failed to add participant %C to domain %d.\n"),
2281  std::string(converter).c_str(),
2282  part->domainId));
2283  return false;
2284 
2285  } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
2287  ACE_DEBUG((LM_DEBUG,
2288  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
2289  ACE_TEXT("added participant %C to domain %d.\n"),
2290  std::string(converter).c_str(),
2291  part->domainId));
2292  }
2293  }
2294 
2295  for (Update::UImage::TopicSeq::const_iterator iter = image.topics.begin();
2296  iter != image.topics.end(); iter++) {
2297  const Update::UTopic* topic = *iter;
2298 
2299  if (!this->add_topic(topic->topicId, topic->domainId
2300  , topic->participantId, topic->name.c_str()
2301  , topic->dataType.c_str(), topic->topicQos)) {
2302  OpenDDS::DCPS::RepoIdConverter topic_converter(topic->topicId);
2303  OpenDDS::DCPS::RepoIdConverter part_converter(topic->participantId);
2304  ACE_ERROR((LM_ERROR,
2305  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
2306  ACE_TEXT("failed to add topic %C to participant %C.\n"),
2307  std::string(topic_converter).c_str(),
2308  std::string(part_converter).c_str()));
2309  return false;
2310 
2311  } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
2312  OpenDDS::DCPS::RepoIdConverter topic_converter(topic->topicId);
2313  OpenDDS::DCPS::RepoIdConverter part_converter(topic->participantId);
2314  ACE_DEBUG((LM_DEBUG,
2315  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
2316  ACE_TEXT("added topic %C to participant %C.\n"),
2317  std::string(topic_converter).c_str(),
2318  std::string(part_converter).c_str()));
2319  }
2320  }
2321 
2322  for (Update::UImage::ReaderSeq::const_iterator iter = image.actors.begin();
2323  iter != image.actors.end(); iter++) {
2324  const Update::URActor* sub = *iter;
2325  // no reason to associate, there are no publishers yet to associate with
2326  if (!this->add_subscription(sub->domainId, sub->participantId
2327  , sub->topicId, sub->actorId
2328  , sub->callback.c_str(), sub->drdwQos
2329  , sub->transportInterfaceInfo
2330  , sub->transportContext
2331  , sub->pubsubQos
2332  , sub->contentSubscriptionProfile.filterClassName
2333  , sub->contentSubscriptionProfile.filterExpr
2334  , sub->contentSubscriptionProfile.exprParams
2335  , sub->serializedTypeInfo)) {
2336  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->actorId);
2337  OpenDDS::DCPS::RepoIdConverter part_converter(sub->participantId);
2338  ACE_ERROR((LM_ERROR,
2339  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
2340  ACE_TEXT("failed to add subscription %C to participant %C.\n"),
2341  std::string(sub_converter).c_str(),
2342  std::string(part_converter).c_str()));
2343  return false;
2344 
2345  } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
2346  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->actorId);
2347  OpenDDS::DCPS::RepoIdConverter part_converter(sub->participantId);
2348  ACE_DEBUG((LM_DEBUG,
2349  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
2350  ACE_TEXT("added subscription %C to participant %C.\n"),
2351  std::string(sub_converter).c_str(),
2352  std::string(part_converter).c_str()));
2353  }
2354  }
2355 
2356  for (Update::UImage::WriterSeq::const_iterator iter = image.wActors.begin();
2357  iter != image.wActors.end(); iter++) {
2358  const Update::UWActor* pub = *iter;
2359 
2360  // try to associate with any persisted subscriptions to track any expected
2361  // existing associations
2362  if (!this->add_publication(pub->domainId, pub->participantId
2363  , pub->topicId, pub->actorId
2364  , pub->callback.c_str() , pub->drdwQos
2366  , pub->pubsubQos
2367  , pub->serializedTypeInfo
2368  , true)) {
2369  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->actorId);
2370  OpenDDS::DCPS::RepoIdConverter part_converter(pub->participantId);
2371  ACE_ERROR((LM_ERROR,
2372  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
2373  ACE_TEXT("failed to add publication %C to participant %C.\n"),
2374  std::string(pub_converter).c_str(),
2375  std::string(part_converter).c_str()));
2376  return false;
2377 
2378  } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
2379  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->actorId);
2380  OpenDDS::DCPS::RepoIdConverter part_converter(pub->participantId);
2381  ACE_DEBUG((LM_DEBUG,
2382  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
2383  ACE_TEXT("added publication %C to participant %C.\n"),
2384  std::string(pub_converter).c_str(),
2385  std::string(part_converter).c_str()));
2386  }
2387  }
2388 
2389 #ifndef DDS_HAS_MINIMUM_BIT
2390  if (TheServiceParticipant->get_BIT()) {
2391  for (DCPS_IR_Domain_Map::const_iterator currentDomain = domains_.begin();
2392  currentDomain != domains_.end();
2393  ++currentDomain) {
2394  currentDomain->second->reassociate_built_in_topic_pubs();
2395  }
2396  }
2397 #endif
2398 
2399  return true;
2400 }
DCPS_IR_Domain * domain(DDS::DomainId_t domain)
Convert a domain Id into a reference to a DCPS_IR_Domain object.
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
ACE_CDR::ULong transportContext
virtual OpenDDS::DCPS::GUID_t add_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, OpenDDS::DCPS::DataWriterRemote_ptr publication, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos, const DDS::OctetSeq &serializedTypeInfo)
Definition: DCPSInfo_i.cpp:374
DomainIdType domainId
ParticipantSeq participants
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DomainIdType domainId
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
virtual OpenDDS::DCPS::AddDomainStatus add_domain_participant(DDS::DomainId_t domain, const DDS::DomainParticipantQos &qos)
virtual OpenDDS::DCPS::GUID_t add_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, OpenDDS::DCPS::DataReaderRemote_ptr subscription, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams, const DDS::OctetSeq &serializedTypeInfo)
Definition: DCPSInfo_i.cpp:681
OpenDDS::DCPS::RepoIdGenerator participantIdGenerator_
Definition: DCPSInfo_i.h:419
#define TheServiceParticipant
PartIdType lastPartId
What the last participant id is/was.
bool add_topic(const OpenDDS::DCPS::GUID_t &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos)
Add a previously existing topic to the repository.
Definition: DCPSInfo_i.cpp:232

◆ remove_by_owner()

bool TAO_DDS_DCPSInfo_i::remove_by_owner ( DDS::DomainId_t  domain,
long  owner 
)

Definition at line 1206 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domain(), domains_, LM_DEBUG, lock_, DCPS_IR_Participant::publications(), remove_domain_participant(), DCPS_IR_Participant::remove_publication(), DCPS_IR_Participant::remove_subscription(), DCPS_IR_Participant::remove_topic_reference(), DCPS_IR_Participant::subscriptions(), and DCPS_IR_Participant::topics().

Referenced by OpenDDS::Federator::ManagerImpl::leave_federation().

1209 {
// Removes every participant in `domain` whose owner() equals `owner`,
// together with that participant's subscriptions, publications and topic
// references. Returns false when the domain is not in domains_ or when any
// individual removal returns non-zero; false is also the value handed to
// ACE_GUARD_RETURN as its RETURN argument. Otherwise returns true.
// NOTE(review): this listing is an extract; listing lines 1230, 1254, 1279
// and 1304 are absent (each leaves an unmatched closing brace below).
// Presumably they are DCPS_debug_level checks, which appear in the References
// list for this member -- confirm against DCPSInfo_i.cpp.
1210  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
1211 
1212  // Grab the domain.
1213  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domain);
1214 
1215  if (where == this->domains_.end()) {
1216  return false;
1217  }
1218 
// Ids are copied into a vector first and the removals happen in a second
// loop. Presumably this avoids modifying the participant map while it is
// being iterated -- TODO confirm.
1219  std::vector<OpenDDS::DCPS::GUID_t> candidates;
1220 
1221  for (DCPS_IR_Participant_Map::const_iterator
1222  current = where->second->participants().begin();
1223  current != where->second->participants().end();
1224  ++current) {
1225  if (current->second->owner() == owner) {
1226  candidates.push_back(current->second->get_id());
1227  }
1228  }
1229 
1231  ACE_DEBUG((LM_DEBUG,
1232  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
1233  ACE_TEXT("%d participants to remove from domain %d.\n"),
1234  candidates.size(),
1235  domain));
1236  }
1237 
// Stays true unless one of the remove_* calls below reports failure.
1238  bool status = true;
1239 
1240  for (unsigned int index = 0; index < candidates.size(); ++index) {
1241  DCPS_IR_Participant* participant
1242  = where->second->participant(candidates[index]);
1243  if (participant) {
// keylist is reused for subscription, publication and topic ids in turn;
// it is cleared before the second and third use.
1244  std::vector<OpenDDS::DCPS::GUID_t> keylist;
1245 
1246  // Remove Subscriptions
1247  for (DCPS_IR_Subscription_Map::const_iterator
1248  current = participant->subscriptions().begin();
1249  current != participant->subscriptions().end();
1250  ++current) {
1251  keylist.push_back(current->second->get_id());
1252  }
1253 
1255  OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
1256  ACE_DEBUG((LM_DEBUG,
1257  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
1258  ACE_TEXT("%d subscriptions to remove from participant %C.\n"),
1259  keylist.size(),
1260  std::string(converter).c_str()));
1261  }
1262 
1263  for (unsigned int key = 0; key < keylist.size(); ++key) {
1264  if (participant->remove_subscription(keylist[key]) != 0) {
1265  status = false;
1266  }
1267  }
1268 
1269  // Remove Publications
1270  keylist.clear();
1271 
1272  for (DCPS_IR_Publication_Map::const_iterator
1273  current = participant->publications().begin();
1274  current != participant->publications().end();
1275  ++current) {
1276  keylist.push_back(current->second->get_id());
1277  }
1278 
1280  OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
1281  ACE_DEBUG((LM_DEBUG,
1282  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
1283  ACE_TEXT("%d publications to remove from participant %C.\n"),
1284  keylist.size(),
1285  std::string(converter).c_str()));
1286  }
1287 
1288  for (unsigned int key = 0; key < keylist.size(); ++key) {
1289  if (participant->remove_publication(keylist[key]) != 0) {
1290  status = false;
1291  }
1292  }
1293 
1294  // Remove Topics
1295  keylist.clear();
1296 
1297  for (DCPS_IR_Topic_Map::const_iterator
1298  current = participant->topics().begin();
1299  current != participant->topics().end();
1300  ++current) {
1301  keylist.push_back(current->second->get_id());
1302  }
1303 
1305  OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
1306  ACE_DEBUG((LM_DEBUG,
1307  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
1308  ACE_TEXT("%d topics to remove from participant %C.\n"),
1309  keylist.size(),
1310  std::string(converter).c_str()));
1311  }
1312 
1313  for (unsigned int key = 0; key < keylist.size(); ++key) {
// remove_topic_reference() takes the topic pointer by reference; the value
// it hands back is not used here.
1314  DCPS_IR_Topic* discard;
1315 
1316  if (participant->remove_topic_reference(keylist[key], discard) != 0) {
1317  status = false;
1318  }
1319  }
1320  }
1321 
// This call sits outside the `if (participant)` block, so it runs for every
// candidate id. remove_domain_participant() returns void, so its outcome is
// not folded into `status`.
1322  // Remove Participant
1323  this->remove_domain_participant(domain, candidates[ index]);
1324  }
1325 
1326  return status;
1327 }
DCPS_IR_Domain * domain(DDS::DomainId_t domain)
Convert a domain Id into a reference to a DCPS_IR_Domain object.
const DCPS_IR_Subscription_Map & subscriptions() const
Expose a readable reference to the subscription map.
#define ACE_DEBUG(X)
int remove_topic_reference(OpenDDS::DCPS::GUID_t topicId, DCPS_IR_Topic *&topic)
const DCPS_IR_Topic_Map & topics() const
Expose a readable reference to the topic map.
sequence< octet > key
virtual void remove_domain_participant(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId)
int remove_publication(OpenDDS::DCPS::GUID_t pubId)
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of a Topic.
Definition: DCPS_IR_Topic.h:43
Representative of the Domain Participant.
const DCPS_IR_Publication_Map & publications() const
Expose a readable reference to the publication map.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
int remove_subscription(OpenDDS::DCPS::GUID_t subId)

◆ remove_domain_participant()

void TAO_DDS_DCPSInfo_i::remove_domain_participant ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t & participantId 
)
virtual

Definition at line 1447 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), TAO_DDS_DCPSInfo_i::BIT_Cleanup_Handler::cv_, OpenDDS::DCPS::CvStatus_NoTimeout, OpenDDS::DCPS::DCPS_debug_level, Update::Manager::destroy(), domains_, TAO_DDS_DCPSInfo_i::BIT_Cleanup_Handler::done_, ACE_Event_Handler_var::handler(), in_cleanup_all_built_in_topics_, LM_DEBUG, LM_ERROR, lock_, Update::Participant, TheServiceParticipant, um_, and OpenDDS::DCPS::ConditionVariable< Mutex >::wait().

Referenced by OpenDDS::Federator::ManagerImpl::processDelete(), and remove_by_owner().

1450 {
// Removes participant `participantId` from domain `domainId`, notifies the
// update manager (um_) when this repository owns the participant and it is
// not the BIT publisher, and then either erases the now-empty domain or
// schedules cleanup of the remaining BIT-publishing participant.
// NOTE(review): this listing is an extract; listing lines 1451, 1457, 1468,
// 1480, 1491 and 1524-1525 are absent. Presumably 1451 is the ACE_GUARD on
// lock_ (both appear in the References list), 1457/1468/1480 are early
// `return;` statements, and 1491 is a DCPS_debug_level check -- confirm
// against DCPSInfo_i.cpp.
1452 
1453  // Grab the domain.
1454  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1455 
1456  if (where == this->domains_.end()) {
1458  }
1459 
1460  DCPS_IR_Participant_rch participant = where->second->participant_rch(participantId);
1461  if (!participant) {
1462  OpenDDS::DCPS::RepoIdConverter converter(participantId);
1463  ACE_ERROR((LM_ERROR,
1464  ACE_TEXT("(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::remove_domain_participant: ")
1465  ACE_TEXT("failed to locate participant %C in domain %d.\n"),
1466  std::string(converter).c_str(),
1467  domainId));
1469  }
1470 
1471  // Determine if we should propagate this event; we need to cache this
1472  // result as the participant will be gone by the time we use the result.
1473  bool sendUpdate = participant->isOwner() && !participant->isBitPublisher();
1474 
1475  CORBA::Boolean dont_notify_lost = 0;
1476  int status = where->second->remove_participant(participantId, dont_notify_lost);
1477 
1478  if (0 != status) {
1479  // Removing the participant failed
1481  }
1482 
1483  // Update any concerned observers that the participant was destroyed.
1484  if (this->um_ && sendUpdate) {
1485  Update::IdPath path(
1486  where->second->get_id(),
1487  participantId,
1488  participantId);
1489  this->um_->destroy(path, Update::Participant);
1490 
1492  OpenDDS::DCPS::RepoIdConverter converter(participantId);
1493  ACE_DEBUG((LM_DEBUG,
1494  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_domain_participant: ")
1495  ACE_TEXT("pushing deletion of participant %C in domain %d.\n"),
1496  std::string(converter).c_str(),
1497  domainId));
1498  }
1499  }
1500 
1501  if (where->second->participants().empty()
1502 #ifndef DDS_HAS_MINIMUM_BIT
1503  && !(participant->isOwner() && participant->isBitPublisher() && in_cleanup_all_built_in_topics_)
1504  // If this is false, we're running as part of cleanup_all_built_in_topics
1505  // and we can't remove the domain because we would invalidate the iterator
1506  // we're using in cleanup_all_built_in_topics. cleanup_all_built_in_topics
1507  // will clear the domains once it's done.
1508 #endif
1509  ) {
1510  domains_.erase(where);
1511  }
1512 #ifndef DDS_HAS_MINIMUM_BIT
1513  else if (where->second->useBIT() &&
1514  where->second->participants().size() == 1) {
1515  // The only participant left is the one we created to publish BITs.
1516  // It can be removed now since no user participants exist in this domain,
1517  // but it has to be removed on the Service Participant's reactor thread
1518  // in order to make the locking work properly in delete_participant().
1519  BIT_Cleanup_Handler* eh_impl = new BIT_Cleanup_Handler(this, domainId);
1520  const ACE_Event_Handler_var eh = eh_impl;
1521  TheServiceParticipant->reactor()->notify(eh.handler());
1522 
// NOTE(review): `status` below is compared with CvStatus_NoTimeout and
// assigned from cv_.wait(), whereas the `status` declared at 1476 is an int.
// The missing lines 1524-1525 presumably declare a new `status` (and whatever
// lock cv_ needs) in this scope -- confirm against DCPSInfo_i.cpp.
1523  // Wait for that to be finished
1526  OpenDDS::DCPS::ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
1527  while (status == CvStatus_NoTimeout && !eh_impl->done_) {
1528  status = eh_impl->cv_.wait(thread_status_manager);
1529  }
1530  }
1531 #endif
1532 }
#define ACE_DEBUG(X)
Update::Manager * um_
Definition: DCPSInfo_i.h:421
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
ACE_Event_Handler * handler(void) const
ACE_CDR::Boolean Boolean
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
void destroy(const IdPath &id, ItemType type, ActorType actor=DataWriter)
The wait has returned because it was woken up.
bool in_cleanup_all_built_in_topics_
Definition: DCPSInfo_i.h:450
#define TheServiceParticipant

◆ remove_publication()

void TAO_DDS_DCPSInfo_i::remove_publication ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t participantId,
const OpenDDS::DCPS::GUID_t publicationId 
)
virtual

Definition at line 630 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), Update::Actor, Update::DataWriter, OpenDDS::DCPS::DCPS_debug_level, Update::Manager::destroy(), domains_, in_cleanup_all_built_in_topics_, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), LM_DEBUG, lock_, DCPS_IR_Participant::remove_publication(), and um_.

Referenced by OpenDDS::Federator::ManagerImpl::processDelete().

634 {
// Removes publication `publicationId` from participant `participantId` in
// domain `domainId`, asks the domain to remove dead participants, and pushes
// the deletion through um_ when this repository owns the participant and it
// is not the BIT publisher.
// NOTE(review): this listing is an extract; listing lines 635, 641, 647, 654,
// 661, 668 and 670 are absent. Presumably 635 is the ACE_GUARD on lock_,
// 641/647/661 are early `return;` statements, 654 supplies
// in_cleanup_all_built_in_topics_ as the non-minimum-BIT value of in_cleanup,
// 668 is the um_->destroy() call and 670 is a DCPS_debug_level check (all of
// these names are in the References list) -- confirm against DCPSInfo_i.cpp.
636 
637  // Grab the domain.
638  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
639 
640  if (where == this->domains_.end()) {
642  }
643 
644  // Grab the participant.
645  DCPS_IR_Participant* const partPtr = where->second->participant(participantId);
646  if (!partPtr) {
648  }
649 
650  const bool in_cleanup =
651 #ifdef DDS_HAS_MINIMUM_BIT
652  false;
653 #else
655 #endif
656 
// remove_dead_participants() is invoked on both the failure path and the
// success path.
657  if (partPtr->remove_publication(publicationId) != 0) {
658  where->second->remove_dead_participants(in_cleanup);
659 
660  // throw exception because the publication was not removed!
662  }
663 
664  where->second->remove_dead_participants(in_cleanup);
665 
// NOTE(review): partPtr is dereferenced here after remove_dead_participants()
// has run; confirm that call cannot destroy this participant.
666  if (um_ && partPtr->isOwner() && !partPtr->isBitPublisher()) {
667  Update::IdPath path(domainId, participantId, publicationId);
669 
671  OpenDDS::DCPS::RepoIdConverter converter(publicationId);
672  ACE_DEBUG((LM_DEBUG,
673  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_publication: ")
674  ACE_TEXT("pushing deletion of publication %C in domain %d.\n"),
675  std::string(converter).c_str(),
676  domainId));
677  }
678  }
679 }
#define ACE_DEBUG(X)
Update::Manager * um_
Definition: DCPSInfo_i.h:421
#define ACE_GUARD(MUTEX, OBJ, LOCK)
int remove_publication(OpenDDS::DCPS::GUID_t pubId)
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
void destroy(const IdPath &id, ItemType type, ActorType actor=DataWriter)
bool in_cleanup_all_built_in_topics_
Definition: DCPSInfo_i.h:450
bool isOwner() const
Indication of whether the current repository is the owner of this participant.

◆ remove_subscription()

void TAO_DDS_DCPSInfo_i::remove_subscription ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t participantId,
const OpenDDS::DCPS::GUID_t subscriptionId 
)
virtual

Definition at line 962 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), Update::Actor, Update::DataReader, OpenDDS::DCPS::DCPS_debug_level, Update::Manager::destroy(), domains_, in_cleanup_all_built_in_topics_, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), LM_DEBUG, lock_, DCPS_IR_Participant::remove_subscription(), and um_.

Referenced by OpenDDS::Federator::ManagerImpl::processDelete().

966 {
// Removes subscription `subscriptionId` from participant `participantId` in
// domain `domainId`, asks the domain to remove dead participants, and pushes
// the deletion through um_ when this repository owns the participant and it
// is not the BIT publisher.
// NOTE(review): this listing is an extract; listing lines 967, 973, 979, 984,
// 991, 997 and 999 are absent. Presumably 967 is the ACE_GUARD on lock_,
// 973/979/984 are early `return;` statements, 991 passes
// in_cleanup_all_built_in_topics_ in the non-minimum-BIT build, 997 is the
// um_->destroy() call and 999 is a DCPS_debug_level check (all of these names
// are in the References list) -- confirm against DCPSInfo_i.cpp.
968 
969  // Grab the domain.
970  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
971 
972  if (where == this->domains_.end()) {
974  }
975 
976  // Grab the participant.
977  DCPS_IR_Participant* const partPtr = where->second->participant(participantId);
978  if (!partPtr) {
980  }
981 
982  if (partPtr->remove_subscription(subscriptionId) != 0) {
983  // throw exception because the subscription was not removed!
985  }
986 
// The argument to remove_dead_participants() is selected at compile time:
// `false` when DDS_HAS_MINIMUM_BIT is defined, otherwise the expression on
// the missing listing line 991.
987  where->second->remove_dead_participants(
988 #ifdef DDS_HAS_MINIMUM_BIT
989  false
990 #else
992 #endif
993  );
994 
// NOTE(review): partPtr is dereferenced here after remove_dead_participants()
// has run; confirm that call cannot destroy this participant.
995  if (um_ && partPtr->isOwner() && !partPtr->isBitPublisher()) {
996  Update::IdPath path(domainId, participantId, subscriptionId);
998 
1000  OpenDDS::DCPS::RepoIdConverter converter(subscriptionId);
1001  ACE_DEBUG((LM_DEBUG,
1002  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_subscription: ")
1003  ACE_TEXT("pushing deletion of subscription %C in domain %d.\n"),
1004  std::string(converter).c_str(),
1005  domainId));
1006  }
1007  }
1008 }
#define ACE_DEBUG(X)
Update::Manager * um_
Definition: DCPSInfo_i.h:421
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
void destroy(const IdPath &id, ItemType type, ActorType actor=DataWriter)
bool in_cleanup_all_built_in_topics_
Definition: DCPSInfo_i.h:450
int remove_subscription(OpenDDS::DCPS::GUID_t subId)
bool isOwner() const
Indication of whether the current repository is the owner of this participant.

◆ remove_topic()

OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::remove_topic ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t & participantId,
const OpenDDS::DCPS::GUID_t & topicId 
)
virtual

Definition at line 325 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, Update::Manager::destroy(), domains_, DCPS_IR_Participant::find_topic_reference(), OpenDDS::DCPS::INTERNAL_ERROR, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), LM_DEBUG, lock_, Update::Topic, and um_.

Referenced by OpenDDS::Federator::ManagerImpl::processDelete().

329 {
// Looks up the domain, the participant and the participant's reference to
// `topicId`, asks the domain to remove that topic, and returns the
// TopicStatus the domain reports. The deletion is pushed through um_ when
// this repository owns the participant and it is not the BIT publisher.
// NOTE(review): this listing is an extract; listing lines 330, 336, 344, 350
// and 361 are absent. Presumably 330 is the ACE_GUARD_RETURN on lock_,
// 336/344/350 return an error status (OpenDDS::DCPS::INTERNAL_ERROR is in the
// References list) and 361 is a DCPS_debug_level check -- confirm against
// DCPSInfo_i.cpp.
331 
332  // Grab the domain.
333  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
334 
335  if (where == this->domains_.end()) {
337  }
338 
339  // Grab the participant.
340  DCPS_IR_Participant* partPtr
341  = where->second->participant(participantId);
342 
343  if (0 == partPtr) {
345  }
346 
// `topic` has no initializer; find_topic_reference() takes it by reference.
347  DCPS_IR_Topic* topic;
348 
349  if (partPtr->find_topic_reference(topicId, topic) != 0) {
351  }
352 
353  OpenDDS::DCPS::TopicStatus removedStatus = where->second->remove_topic(partPtr, topic);
354 
// The um_->destroy() call below does not depend on removedStatus.
355  if (this->um_
356  && (partPtr->isOwner() == true)
357  && (partPtr->isBitPublisher() == false)) {
358  Update::IdPath path(domainId, participantId, topicId);
359  this->um_->destroy(path, Update::Topic);
360 
362  OpenDDS::DCPS::RepoIdConverter converter(topicId);
363  ACE_DEBUG((LM_DEBUG,
364  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_topic: ")
365  ACE_TEXT("pushing deletion of topic %C in domain %d.\n"),
366  std::string(converter).c_str(),
367  domainId));
368  }
369  }
370 
371  return removedStatus;
372 }
#define ACE_DEBUG(X)
Update::Manager * um_
Definition: DCPSInfo_i.h:421
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of a Topic.
Definition: DCPS_IR_Topic.h:43
int find_topic_reference(OpenDDS::DCPS::GUID_t topicId, DCPS_IR_Topic *&topic)
Representative of the Domain Participant.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
void destroy(const IdPath &id, ItemType type, ActorType actor=DataWriter)
bool isOwner() const
Indication of whether the current repository is the owner of this participant.

◆ shutdown()

void TAO_DDS_DCPSInfo_i::shutdown ( void  )
virtual

Cause the entire repository to exit.

Definition at line 113 of file DCPSInfo_i.cpp.

References ShutdownInterface::shutdown(), and shutdown_.

Referenced by OpenDDS::Federator::ManagerImpl::leave_and_shutdown(), and OpenDDS::Federator::ManagerImpl::shutdown().

114 {
// Forwards the request to the ShutdownInterface held in shutdown_.
// shutdown_ is dereferenced without a null check.
115  this->shutdown_->shutdown();
116 }
virtual void shutdown()=0
ShutdownInterface * shutdown_
Interface to effect shutdown of the process.
Definition: DCPSInfo_i.h:425

◆ update_domain_participant_qos()

CORBA::Boolean TAO_DDS_DCPSInfo_i::update_domain_participant_qos ( DDS::DomainId_t  domain,
const OpenDDS::DCPS::GUID_t participantId,
const DDS::DomainParticipantQos qos 
)
virtual

Definition at line 2084 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), LM_DEBUG, lock_, DCPS_IR_Participant::set_qos(), um_, and Update::Manager::update().

Referenced by OpenDDS::Federator::ManagerImpl::processUpdateQos1().

2088 {
// Applies `qos` to participant `participantId` in domain `domainId`.
// Returns 0 when set_qos() reports failure and 1 after a successful update.
// The new QoS is pushed through um_ only when this repository owns the
// participant and it is not the BIT publisher.
// NOTE(review): this listing is an extract; listing lines 2089, 2095, 2103
// and 2115 are absent. Presumably 2089 is the ACE_GUARD_RETURN on lock_,
// 2095/2103 are early returns for an unknown domain or participant, and 2115
// is a DCPS_debug_level check -- confirm against DCPSInfo_i.cpp.
// NOTE(review): the prototype shown for this member names the first parameter
// `domain`, while the body uses `domainId`.
2090 
2091  // Grab the domain.
2092  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
2093 
2094  if (where == this->domains_.end()) {
2096  }
2097 
2098  // Grab the participant.
2099  DCPS_IR_Participant* partPtr
2100  = where->second->participant(participantId);
2101 
2102  if (0 == partPtr) {
2104  }
2105 
2106  if (partPtr->set_qos(qos) == false)
2107  return 0;
2108 
2109  if (this->um_
2110  && (partPtr->isOwner() == true)
2111  && (partPtr->isBitPublisher() == false)) {
// The participant id is used for both the participant and the entity slot of
// the IdPath.
2112  Update::IdPath path(domainId, participantId, participantId);
2113  this->um_->update(path, qos);
2114 
2116  OpenDDS::DCPS::RepoIdConverter converter(participantId);
2117  ACE_DEBUG((LM_DEBUG,
2118  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_domain_participant_qos: ")
2119  ACE_TEXT("pushing update of participant %C in domain %d.\n"),
2120  std::string(converter).c_str(),
2121  domainId));
2122  }
2123  }
2124 
2125  return 1;
2126 }
#define ACE_DEBUG(X)
Update::Manager * um_
Definition: DCPSInfo_i.h:421
void update(const IdPath &id, const QosType &qos)
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
bool set_qos(const DDS::DomainParticipantQos &qos)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
bool isOwner() const
Indication of whether the current repository is the owner of this participant.

◆ update_publication_qos() [1/3]

CORBA::Boolean TAO_DDS_DCPSInfo_i::update_publication_qos ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t partId,
const OpenDDS::DCPS::GUID_t dwId,
const DDS::DataWriterQos qos,
const DDS::PublisherQos publisherQos 
)
virtual

Definition at line 1660 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), Update::DataWriterQos, OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_publication_reference(), DCPS_IR_Participant::isBitPublisher(), LM_DEBUG, LM_ERROR, LM_INFO, lock_, Update::NoQos, Update::PublisherQos, DCPS_IR_Publication::set_qos(), um_, and Update::Manager::update().

Referenced by OpenDDS::Federator::ManagerImpl::processUpdateQos1(), and OpenDDS::Federator::ManagerImpl::processUpdateQos2().

1666 {
// Overload taking both DataWriterQos and PublisherQos. Finds publication
// `dwId` under participant `partId` in domain `domainId` and calls
// set_qos(qos, publisherQos, qosType). Returns 0 when set_qos() reports
// failure and 1 after a successful update.
// NOTE(review): this listing is an extract; listing lines 1667, 1673, 1681,
// 1684, 1698 and 1723 are absent. Presumably 1667 is the ACE_GUARD_RETURN on
// lock_, 1673/1681/1698 are early returns, and 1684/1723 are DCPS_debug_level
// checks -- confirm against DCPSInfo_i.cpp.
1668 
1669  // Grab the domain.
1670  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1671 
1672  if (where == this->domains_.end()) {
1674  }
1675 
1676  // Grab the participant.
1677  DCPS_IR_Participant* partPtr
1678  = where->second->participant(partId);
1679 
1680  if (0 == partPtr) {
1682  }
1683 
1685  ACE_DEBUG((LM_INFO, "(%P|%t) updating publication qos 1\n"));
1686  }
1687 
1688  DCPS_IR_Publication* pub;
1689 
// Both a non-zero result and a null `pub` are treated as "not found".
1690  if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
1691  OpenDDS::DCPS::RepoIdConverter part_converter(partId);
1692  OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
1693  ACE_ERROR((LM_ERROR,
1694  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
1695  ACE_TEXT("participant %C could not find publication %C.\n"),
1696  std::string(part_converter).c_str(),
1697  std::string(pub_converter).c_str()));
1699  }
1700 
// set_qos() receives qosType by reference; the switch below uses it to choose
// which QoS value is forwarded to um_.
1701  Update::SpecificQos qosType;
1702 
1703  if (pub->set_qos(qos, publisherQos, qosType) == false) // failed
1704  return 0;
1705 
// The condition checks um_ and isBitPublisher() only; there is no isOwner()
// test here.
1706  if (this->um_ && (partPtr->isBitPublisher() == false)) {
1707  Update::IdPath path(domainId, partId, dwId);
1708 
1709  switch (qosType) {
1710  case Update::DataWriterQos:
1711  this->um_->update(path, qos);
1712  break;
1713 
1714  case Update::PublisherQos:
1715  this->um_->update(path, publisherQos);
1716  break;
1717 
1718  case Update::NoQos:
1719  default:
1720  break;
1721  }
1722 
1724  OpenDDS::DCPS::RepoIdConverter converter(dwId);
1725  ACE_DEBUG((LM_DEBUG,
1726  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_publication_qos: ")
1727  ACE_TEXT("pushing update of publication %C in domain %d.\n"),
1728  std::string(converter).c_str(),
1729  domainId));
1730  }
1731  }
1732 
1733  return 1;
1734 }
#define ACE_DEBUG(X)
Update::Manager * um_
Definition: DCPSInfo_i.h:421
#define ACE_ERROR(X)
void update(const IdPath &id, const QosType &qos)
Representative of a Publication.
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
int find_publication_reference(OpenDDS::DCPS::GUID_t pubId, DCPS_IR_Publication *&pub)
Return the publication object.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
bool set_qos(const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos, Update::SpecificQos &specificQos)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414

◆ update_publication_qos() [2/3]

void TAO_DDS_DCPSInfo_i::update_publication_qos ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t partId,
const OpenDDS::DCPS::GUID_t dwId,
const DDS::DataWriterQos qos 
)

Entry for federation updates of DataWriterQos values.

Definition at line 1737 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_publication_reference(), LM_ERROR, LM_INFO, lock_, and DCPS_IR_Publication::set_qos().

1742 {
// Overload taking only DataWriterQos (documented on this page as the entry
// for federation updates). Finds publication `dwId` under participant
// `partId` in domain `domainId` and calls pub->set_qos(qos). Nothing is
// forwarded to um_ from this overload.
// NOTE(review): this listing is an extract; listing lines 1743, 1749, 1757,
// 1760 and 1774 are absent. Presumably 1743 is the ACE_GUARD on lock_,
// 1749/1757/1774 are early `return;` statements, and 1760 is a
// DCPS_debug_level check -- confirm against DCPSInfo_i.cpp.
1744 
1745  // Grab the domain.
1746  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1747 
1748  if (where == this->domains_.end()) {
1750  }
1751 
1752  // Grab the participant.
1753  DCPS_IR_Participant* partPtr
1754  = where->second->participant(partId);
1755 
1756  if (0 == partPtr) {
1758  }
1759 
1761  ACE_DEBUG((LM_INFO, "(%P|%t) updating publication qos 2\n"));
1762  }
1763 
1764  DCPS_IR_Publication* pub;
1765 
// Both a non-zero result and a null `pub` are treated as "not found".
1766  if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
1767  OpenDDS::DCPS::RepoIdConverter part_converter(partId);
1768  OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
1769  ACE_ERROR((LM_ERROR,
1770  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
1771  ACE_TEXT("participant %C could not find publication %C.\n"),
1772  std::string(part_converter).c_str(),
1773  std::string(pub_converter).c_str()));
1775  }
1776 
// Any value returned by set_qos() is not inspected.
1777  pub->set_qos(qos);
1778 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
Representative of a Publication.
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
int find_publication_reference(OpenDDS::DCPS::GUID_t pubId, DCPS_IR_Publication *&pub)
Return the publication object.
bool set_qos(const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos, Update::SpecificQos &specificQos)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414

◆ update_publication_qos() [3/3]

void TAO_DDS_DCPSInfo_i::update_publication_qos ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t partId,
const OpenDDS::DCPS::GUID_t dwId,
const DDS::PublisherQos qos 
)

Entry for federation updates of PublisherQos values.

Definition at line 1781 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_publication_reference(), LM_ERROR, LM_INFO, lock_, and DCPS_IR_Publication::set_qos().

1786 {
// Overload taking only PublisherQos (documented on this page as the entry
// for federation updates). Finds publication `dwId` under participant
// `partId` in domain `domainId` and calls pub->set_qos(qos). Nothing is
// forwarded to um_ from this overload.
// NOTE(review): this listing is an extract; listing lines 1787, 1793, 1801,
// 1804 and 1818 are absent. Presumably 1787 is the ACE_GUARD on lock_,
// 1793/1801/1818 are early `return;` statements, and 1804 is a
// DCPS_debug_level check -- confirm against DCPSInfo_i.cpp.
1788 
1789  // Grab the domain.
1790  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1791 
1792  if (where == this->domains_.end()) {
1794  }
1795 
1796  // Grab the participant.
1797  DCPS_IR_Participant* partPtr
1798  = where->second->participant(partId);
1799 
1800  if (0 == partPtr) {
1802  }
1803 
1805  ACE_DEBUG((LM_INFO, "(%P|%t) updating publication qos 3\n"));
1806  }
1807 
1808  DCPS_IR_Publication* pub;
1809 
// Both a non-zero result and a null `pub` are treated as "not found".
1810  if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
1811  OpenDDS::DCPS::RepoIdConverter part_converter(partId);
1812  OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
1813  ACE_ERROR((LM_ERROR,
1814  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
1815  ACE_TEXT("participant %C could not find publication %C.\n"),
1816  std::string(part_converter).c_str(),
1817  std::string(pub_converter).c_str()));
1819  }
1820 
// Any value returned by set_qos() is not inspected.
1821  pub->set_qos(qos);
1822 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
Representative of a Publication.
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
int find_publication_reference(OpenDDS::DCPS::GUID_t pubId, DCPS_IR_Publication *&pub)
Return the publication object.
bool set_qos(const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos, Update::SpecificQos &specificQos)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414

◆ update_subscription_params()

CORBA::Boolean TAO_DDS_DCPSInfo_i::update_subscription_params ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::GUID_t participantId,
const OpenDDS::DCPS::GUID_t subscriptionId,
const DDS::StringSeq params 
)

Definition at line 1989 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domain(), domains_, DCPS_IR_Participant::find_subscription_reference(), DCPS_IR_Participant::isBitPublisher(), LM_ERROR, LM_INFO, lock_, um_, Update::Manager::update(), and DCPS_IR_Subscription::update_expr_params().

Referenced by OpenDDS::Federator::ManagerImpl::processUpdateFilterExpressionParams().

1994 {
// Finds subscription `subscriptionId` under participant `participantId` in
// domain `domainId`, hands `params` to its update_expr_params(), and forwards
// the same params to um_ unless the participant is the BIT publisher.
// Returns true once the update has been applied.
// NOTE(review): this listing is an extract; listing lines 1995, 1999, 2004,
// 2007 and 2020 are absent. Presumably 1995 is the ACE_GUARD_RETURN on lock_,
// 1999/2004/2020 are early returns, and 2007 is a DCPS_debug_level check --
// confirm against DCPSInfo_i.cpp.
1996 
// The local iterator is named `domain`, the same name as the domain() member
// listed in this member's References.
1997  DCPS_IR_Domain_Map::iterator domain = this->domains_.find(domainId);
1998  if (domain == this->domains_.end()) {
2000  }
2001 
2002  DCPS_IR_Participant* partPtr = domain->second->participant(participantId);
2003  if (0 == partPtr) {
2005  }
2006 
2008  ACE_DEBUG((LM_INFO, "(%P|%t) updating subscription params\n"));
2009  }
2010 
2011  DCPS_IR_Subscription* sub;
// NOTE(review): only the return code is tested here; the publication lookups
// on this page also test the returned pointer against 0. Confirm a zero
// return guarantees a non-null `sub`.
2012  if (partPtr->find_subscription_reference(subscriptionId, sub) != 0) {
2013  OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
2014  OpenDDS::DCPS::RepoIdConverter sub_converter(subscriptionId);
2015  ACE_ERROR((LM_ERROR,
2016  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_params: ")
2017  ACE_TEXT("participant %C could not find subscription %C.\n"),
2018  std::string(part_converter).c_str(),
2019  std::string(sub_converter).c_str()));
2021  }
2022 
2023  sub->update_expr_params(params); // calls writers via DataWriterRemote
2024 
// The condition checks um_ and isBitPublisher() only; there is no isOwner()
// test here.
2025  if (this->um_ && !partPtr->isBitPublisher()) {
2026  Update::IdPath path(domainId, participantId, subscriptionId);
2027  this->um_->update(path, params);
2028  }
2029 
2030  return true;
2031 }
DCPS_IR_Domain * domain(DDS::DomainId_t domain)
Convert a domain Id into a reference to a DCPS_IR_Domain object.
#define ACE_DEBUG(X)
Update::Manager * um_
Definition: DCPSInfo_i.h:421
#define ACE_ERROR(X)
void update(const IdPath &id, const QosType &qos)
int find_subscription_reference(OpenDDS::DCPS::GUID_t subId, DCPS_IR_Subscription *&sub)
Return the subscription object.
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
Representative of a Subscription.
void update_expr_params(const DDS::StringSeq &params)
Calls associated Publications.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414

◆ update_subscription_qos() [1/3]

CORBA::Boolean TAO_DDS_DCPSInfo_i::update_subscription_qos ( DDS::DomainId_t domainId,
const OpenDDS::DCPS::GUID_t &partId,
const OpenDDS::DCPS::GUID_t &drId,
const DDS::DataReaderQos &qos,
const DDS::SubscriberQos &subscriberQos
)
virtual

Definition at line 1824 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), Update::DataReaderQos, OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_subscription_reference(), DCPS_IR_Participant::isBitPublisher(), LM_DEBUG, LM_ERROR, LM_INFO, lock_, Update::NoQos, DCPS_IR_Subscription::set_qos(), Update::SubscriberQos, um_, and Update::Manager::update().

Referenced by OpenDDS::Federator::ManagerImpl::processUpdateQos1(), and OpenDDS::Federator::ManagerImpl::processUpdateQos2().

1830 {
1832 
1833  // Grab the domain.
1834  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1835 
1836  if (where == this->domains_.end()) {
1838  }
1839 
1840  // Grab the participant.
1841  DCPS_IR_Participant* partPtr
1842  = where->second->participant(partId);
1843 
1844  if (0 == partPtr) {
1846  }
1847 
1848  DCPS_IR_Subscription* sub;
1849 
1851  ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 1\n"));
1852  }
1853 
1854  if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
1855  OpenDDS::DCPS::RepoIdConverter part_converter(partId);
1856  OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
1857  ACE_ERROR((LM_ERROR,
1858  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
1859  ACE_TEXT("participant %C could not find subscription %C.\n"),
1860  std::string(part_converter).c_str(),
1861  std::string(sub_converter).c_str()));
1863  }
1864 
1865  Update::SpecificQos qosType;
1866 
1867  if (sub->set_qos(qos, subscriberQos, qosType) == false) // failed
1868  return 0;
1869 
1870  if (this->um_ && (partPtr->isBitPublisher() == false)) {
1871  Update::IdPath path(domainId, partId, drId);
1872 
1873  switch (qosType) {
1874  case Update::DataReaderQos:
1875  this->um_->update(path, qos);
1876  break;
1877 
1878  case Update::SubscriberQos:
1879  this->um_->update(path, subscriberQos);
1880  break;
1881 
1882  case Update::NoQos:
1883  default:
1884  break;
1885  }
1886 
1888  OpenDDS::DCPS::RepoIdConverter converter(drId);
1889  ACE_DEBUG((LM_DEBUG,
1890  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
1891  ACE_TEXT("pushing update of subscription %C in domain %d.\n"),
1892  std::string(converter).c_str(),
1893  domainId));
1894  }
1895  }
1896 
1897  return 1;
1898 }
#define ACE_DEBUG(X)
Update::Manager * um_
Definition: DCPSInfo_i.h:421
#define ACE_ERROR(X)
void update(const IdPath &id, const QosType &qos)
int find_subscription_reference(OpenDDS::DCPS::GUID_t subId, DCPS_IR_Subscription *&sub)
Return the subscription object.
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
bool set_qos(const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos, Update::SpecificQos &specificQos)
Representative of a Subscription.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414

◆ update_subscription_qos() [2/3]

void TAO_DDS_DCPSInfo_i::update_subscription_qos ( DDS::DomainId_t domainId,
const OpenDDS::DCPS::GUID_t &partId,
const OpenDDS::DCPS::GUID_t &drId,
const DDS::DataReaderQos &qos
)

Entry for federation updates of DataReaderQos values.

Definition at line 1901 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_subscription_reference(), LM_ERROR, LM_INFO, lock_, and DCPS_IR_Subscription::set_qos().

1906 {
1908 
1909  // Grab the domain.
1910  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1911 
1912  if (where == this->domains_.end()) {
1914  }
1915 
1916  // Grab the participant.
1917  DCPS_IR_Participant* partPtr
1918  = where->second->participant(partId);
1919 
1920  if (0 == partPtr) {
1922  }
1923 
1924  DCPS_IR_Subscription* sub;
1925 
1927  ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 2\n"));
1928  }
1929 
1930  if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
1931  OpenDDS::DCPS::RepoIdConverter part_converter(partId);
1932  OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
1933  ACE_ERROR((LM_ERROR,
1934  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
1935  ACE_TEXT("participant %C could not find subscription %C.\n"),
1936  std::string(part_converter).c_str(),
1937  std::string(sub_converter).c_str()));
1939  }
1940 
1941  sub->set_qos(qos);
1942 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
int find_subscription_reference(OpenDDS::DCPS::GUID_t subId, DCPS_IR_Subscription *&sub)
Return the subscription object.
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
bool set_qos(const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos, Update::SpecificQos &specificQos)
Representative of a Subscription.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414

◆ update_subscription_qos() [3/3]

void TAO_DDS_DCPSInfo_i::update_subscription_qos ( DDS::DomainId_t domainId,
const OpenDDS::DCPS::GUID_t &partId,
const OpenDDS::DCPS::GUID_t &drId,
const DDS::SubscriberQos &qos
)

Entry for federation updates of SubscriberQos values.

Definition at line 1945 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_subscription_reference(), LM_ERROR, LM_INFO, lock_, and DCPS_IR_Subscription::set_qos().

1950 {
1952 
1953  // Grab the domain.
1954  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1955 
1956  if (where == this->domains_.end()) {
1958  }
1959 
1960  // Grab the participant.
1961  DCPS_IR_Participant* partPtr
1962  = where->second->participant(partId);
1963 
1964  if (0 == partPtr) {
1966  }
1967 
1968  DCPS_IR_Subscription* sub;
1969 
1971  ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 3\n"));
1972  }
1973 
1974  if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
1975  OpenDDS::DCPS::RepoIdConverter part_converter(partId);
1976  OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
1977  ACE_ERROR((LM_ERROR,
1978  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
1979  ACE_TEXT("participant %C could not find subscription %C.\n"),
1980  std::string(part_converter).c_str(),
1981  std::string(sub_converter).c_str()));
1983  }
1984 
1985  sub->set_qos(qos);
1986 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
int find_subscription_reference(OpenDDS::DCPS::GUID_t subId, DCPS_IR_Subscription *&sub)
Return the subscription object.
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of the Domain Participant.
bool set_qos(const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos, Update::SpecificQos &specificQos)
Representative of a Subscription.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414

◆ update_topic_qos()

CORBA::Boolean TAO_DDS_DCPSInfo_i::update_topic_qos ( const OpenDDS::DCPS::GUID_t &topicId,
DDS::DomainId_t domainId,
const OpenDDS::DCPS::GUID_t &participantId,
const DDS::TopicQos &qos
)
virtual

Definition at line 2033 of file DCPSInfo_i.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_topic_reference(), DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), LM_DEBUG, lock_, DCPS_IR_Topic::set_topic_qos(), um_, and Update::Manager::update().

Referenced by OpenDDS::Federator::ManagerImpl::processUpdateQos1().

2038 {
2040 
2041  // Grab the domain.
2042  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
2043 
2044  if (where == this->domains_.end()) {
2046  }
2047 
2048  // Grab the participant.
2049  DCPS_IR_Participant* partPtr
2050  = where->second->participant(participantId);
2051 
2052  if (0 == partPtr) {
2054  }
2055 
2056  DCPS_IR_Topic* topic;
2057 
2058  if (partPtr->find_topic_reference(topicId, topic) != 0) {
2060  }
2061 
2062  if (topic->set_topic_qos(qos) == false)
2063  return 0;
2064 
2065  if (this->um_
2066  && (partPtr->isOwner() == true)
2067  && (partPtr->isBitPublisher() == false)) {
2068  Update::IdPath path(domainId, participantId, topicId);
2069  this->um_->update(path, qos);
2070 
2072  OpenDDS::DCPS::RepoIdConverter converter(topicId);
2073  ACE_DEBUG((LM_DEBUG,
2074  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_topic_qos: ")
2075  ACE_TEXT("pushing update of topic %C in domain %d.\n"),
2076  std::string(converter).c_str(),
2077  domainId));
2078  }
2079  }
2080 
2081  return 1;
2082 }
#define ACE_DEBUG(X)
Update::Manager * um_
Definition: DCPSInfo_i.h:421
void update(const IdPath &id, const QosType &qos)
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
Representative of a Topic.
Definition: DCPS_IR_Topic.h:43
int find_topic_reference(OpenDDS::DCPS::GUID_t topicId, DCPS_IR_Topic *&topic)
bool set_topic_qos(const DDS::TopicQos &qos)
Representative of the Domain Participant.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
bool isOwner() const
Indication of whether the current repository is the owner of this participant.

Member Data Documentation

◆ dispatch_check_timer_id_

long TAO_DDS_DCPSInfo_i::dispatch_check_timer_id_
private

Definition at line 430 of file DCPSInfo_i.h.

Referenced by finalize(), and init_dispatchChecking().

◆ dispatchingOrb_

CORBA::ORB_var TAO_DDS_DCPSInfo_i::dispatchingOrb_
private

◆ domains_

DCPS_IR_Domain_Map TAO_DDS_DCPSInfo_i::domains_
private

◆ federation_

const TAO_DDS_DCPSFederationId& TAO_DDS_DCPSInfo_i::federation_
private

◆ in_cleanup_all_built_in_topics_

bool TAO_DDS_DCPSInfo_i::in_cleanup_all_built_in_topics_
private

◆ lock_

ACE_Recursive_Thread_Mutex TAO_DDS_DCPSInfo_i::lock_
private

◆ orb_

CORBA::ORB_var TAO_DDS_DCPSInfo_i::orb_
private

◆ participantIdGenerator_

OpenDDS::DCPS::RepoIdGenerator TAO_DDS_DCPSInfo_i::participantIdGenerator_
private

Definition at line 419 of file DCPSInfo_i.h.

Referenced by domain(), and receive_image().

◆ reassociate_timer_id_

long TAO_DDS_DCPSInfo_i::reassociate_timer_id_
private

Definition at line 429 of file DCPSInfo_i.h.

Referenced by finalize(), and init_reassociation().

◆ reincarnate_

bool TAO_DDS_DCPSInfo_i::reincarnate_
private

Definition at line 422 of file DCPSInfo_i.h.

Referenced by domain(), and init_persistence().

◆ shutdown_

ShutdownInterface* TAO_DDS_DCPSInfo_i::shutdown_
private

Interface to effect shutdown of the process.

Definition at line 425 of file DCPSInfo_i.h.

Referenced by shutdown().

◆ um_

Update::Manager* TAO_DDS_DCPSInfo_i::um_
private

The documentation for this class was generated from the following files: