9 #include "DCPS_IR_Publication.h"
11 #include "DCPS_IR_Participant.h"
12 #include "DCPS_IR_Topic.h"
13 #include "DCPS_IR_Subscription.h"
14 #include "DCPS_IR_Domain.h"
15 #include "DCPS_IR_Topic_Description.h"
16 #include "dds/DCPS/DCPS_Utils.h"
17 #include "dds/DdsDcpsInfoUtilsC.h"
18 #include "dds/DCPS/RepoIdConverter.h"
19 #include "dds/DCPS/Qos_Helper.h"
20 #include "tao/debug.h"
22 #include "ace/OS_NS_unistd.h"
29 OpenDDS::DCPS::DataWriterRemote_ptr writer,
36 participant_(participant),
42 transportContext_(transportContext),
43 publisherQos_(publisherQos),
44 serializedTypeInfo_(serializedTypeInfo)
46 writer_ = OpenDDS::DCPS::DataWriterRemote::_duplicate(writer);
82 ACE_TEXT(
"(%P|%t) DCPS_IR_Publication::add_associated_subscription:")
83 ACE_TEXT(
" publication %C adding subscription %C.\n"),
84 std::string(pub_converter).c_str(),
85 std::string(sub_converter).c_str()));
88 writer_->add_association(
id_, association, active);
92 ACE_TEXT(
"(%P|%t) DCPS_IR_Publication::add_associated_subscription: ")
93 ACE_TEXT(
"successfully added subscription %x.\n"),
98 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::add_associated_subscription:");
110 ACE_TEXT(
"(%P|%t) ERROR: DCPS_IR_Publication::add_associated_subscription: ")
111 ACE_TEXT(
"publication %C attempted to re-add subscription %C.\n"),
112 std::string(pub_converter).c_str(),
113 std::string(sub_converter).c_str()));
121 ACE_TEXT(
"(%P|%t) ERROR: DCPS_IR_Publication::add_associated_subscription: ")
122 ACE_TEXT(
"publication %C failed to add subscription %C.\n"),
123 std::string(pub_converter).c_str(),
124 std::string(sub_converter).c_str()));
134 bool notify_both_side)
136 bool marked_dead =
false;
145 ACE_TEXT(
"(%P|%t) DCPS_IR_Publication::remove_associated_subscription:")
146 ACE_TEXT(
" calling pub %C with sub %C\n"),
155 writer_->remove_associations(idSeq, notify_lost);
157 if (notify_both_side) {
164 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associated_subscription:");
180 ACE_TEXT(
"(%P|%t) DCPS_IR_Publication::remove_associated_subscription: ")
181 ACE_TEXT(
"publication %C removed subscription %C at %x.\n"),
182 std::string(pub_converter).c_str(),
183 std::string(sub_converter).c_str(),
191 ACE_TEXT(
"(%P|%t) ERROR: DCPS_IR_Publication::remove_associated_subscription: ")
192 ACE_TEXT(
"publication %C failed to remove subscription %C at %x.\n"),
193 std::string(pub_converter).c_str(),
194 std::string(sub_converter).c_str(),
214 if (0 < numAssociations) {
218 while (iter != end) {
239 if (0 < numAssociations) {
241 idSeq.length(static_cast<CORBA::ULong>(numAssociations));
246 while (iter != end) {
256 ACE_TEXT(
"(%P|%t) DCPS_IR_Publication::disassociate_participant: ")
257 ACE_TEXT(
"publication %C testing if subscription %C particpant %C == %C.\n"),
258 std::string(pub_converter).c_str(),
259 std::string(sub_converter).c_str(),
260 std::string(sub_part_converter).c_str(),
261 std::string(pub_part_converter).c_str()));
269 idSeq[count] = sub->
get_id();
276 ACE_TEXT(
"(%P|%t) ERROR: DCPS_IR_Publication::disassociate_participant: ")
277 ACE_TEXT(
"publication %C failed to reassociate subscription %C at %x.\n"),
278 std::string(pub_converter).c_str(),
279 std::string(sub_converter).c_str(),
291 writer_->remove_associations(idSeq, dont_notify_lost);
296 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::disassociate_participant:");
314 if (0 < numAssociations) {
316 idSeq.length(static_cast<CORBA::ULong>(numAssociations));
321 while (iter != end) {
331 ACE_TEXT(
"(%P|%t) DCPS_IR_Publication::disassociate_topic: ")
332 ACE_TEXT(
"publication %C testing if subscription %C topic %C == %C.\n"),
333 std::string(pub_converter).c_str(),
334 std::string(sub_converter).c_str(),
335 std::string(sub_topic_converter).c_str(),
336 std::string(pub_topic_converter).c_str()));
344 idSeq[count] = sub->
get_id();
355 writer_->remove_associations(idSeq, dont_notify_lost);
360 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associations:");
379 if (0 < numAssociations) {
381 idSeq.length(static_cast<CORBA::ULong>(numAssociations));
386 while (iter != end) {
395 ACE_TEXT(
"(%P|%t) DCPS_IR_Publication::disassociate_subscription: ")
396 ACE_TEXT(
"publication %C testing if subscription %C == %C.\n"),
397 std::string(pub_converter).c_str(),
398 std::string(sub_converter).c_str(),
399 std::string(pub_sub_converter).c_str()));
402 if (
id == sub->
get_id()) {
407 idSeq[count] = sub->
get_id();
414 ACE_TEXT(
"(%P|%t) ERROR: DCPS_IR_Publication::disassociate_subscription: ")
415 ACE_TEXT(
"publication %C failed to reassociate subscription %C at %x.\n"),
416 std::string(pub_converter).c_str(),
417 std::string(sub_converter).c_str(),
429 writer_->remove_associations(idSeq, dont_notify_lost);
434 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associations:");
453 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::update_incompatible_qos:");
478 using OpenDDS::DCPS::operator==;
483 if (
false == (qos == this->
qos_)) {
542 bool need_evaluate =
false;
543 bool u_dw_qos = !(
qos_ == qos);
547 need_evaluate =
true;
557 need_evaluate =
true;
639 OpenDDS::DCPS::DataWriterRemote_ptr
642 return OpenDDS::DCPS::DataWriterRemote::_duplicate(this->
writer_.in());
660 ACE_TEXT(
"(%P|%t) ERROR: DCPS_IR_Publication::reevaluate_defunct_associations: ")
661 ACE_TEXT(
"publication %C failed to reassociate subscription %C at %x.\n"),
662 std::string(pub_converter).c_str(),
663 std::string(sub_converter).c_str(),
675 while (iter != end) {
700 bool sendNotify =
true;
701 bool notify_lost =
true;
719 writer_->update_subscription_params(readerId, params);
723 "DCPS_IR_Publication::update_expr_params:");
733 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT) 736 for (
int i=0; i < depth; i++)
738 std::string indent = str + prefix;
739 str +=
"DCPS_IR_Publication[";
740 str += std::string(local_converter);
746 str += indent +
"Associations [ ";
752 str += std::string(assoc_converter);
757 str += indent +
"Defunct Associations [ ";
763 str += std::string(def_converter);
767 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT) void update_incompatible_qos()
int remove_associations(CORBA::Boolean notify_lost)
const DDS::OctetSeq & get_serialized_type_info() const
std::string dump_to_string(const std::string &prefix, int depth) const
void update_expr_params(OpenDDS::DCPS::GUID_t readerId, const DDS::StringSeq ¶ms)
DDS::OctetSeq serializedTypeInfo_
OpenDDS::DCPS::GUID_t get_participant_id()
bool set_qos(const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos, Update::SpecificQos &specificQos)
void set_bit_status(CORBA::Boolean isBIT)
void reevaluate_existing_associations()
DDS::InstanceHandle_t get_handle()
CORBA::Boolean is_subscription_ignored(OpenDDS::DCPS::GUID_t id)
::DDS::DataReaderQos readerQos
OpenDDS::DCPS::GUID_t get_id()
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
OpenDDS::DCPS::GUID_t get_id() const
::DDS::OctetSeq serializedTypeInfo
DCPS_IR_Publication(const OpenDDS::DCPS::GUID_t &id, DCPS_IR_Participant *participant, DCPS_IR_Topic *topic, OpenDDS::DCPS::DataWriterRemote_ptr writer, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &info, ACE_CDR::ULong transportContext, const DDS::PublisherQos &publisherQos, const DDS::OctetSeq &serializedTypeInfo)
std::string get_filter_class_name() const
::DDS::SubscriberQos subQos
DCPS_IR_Topic_Description * get_topic_description()
ssize_t send(ACE_HANDLE handle, const void *buf, size_t len, int flags, const ACE_Time_Value *timeout=0)
sequence< TransportLocator > TransportLocatorSeq
bool compatibleQOS(OpenDDS::DCPS::IncompatibleQosStatus *writerStatus, OpenDDS::DCPS::IncompatibleQosStatus *readerStatus, const OpenDDS::DCPS::TransportLocatorSeq &pubTLS, const OpenDDS::DCPS::TransportLocatorSeq &subTLS, DDS::DataWriterQos const *const writerQos, DDS::DataReaderQos const *const readerQos, DDS::PublisherQos const *const pubQos, DDS::SubscriberQos const *const subQos)
OpenDDS::DCPS::GUID_t get_topic_id()
OpenDDS::DCPS::GUID_t id_
void disassociate_topic(OpenDDS::DCPS::GUID_t id)
Remove any subscriptions whose topic has the id.
OpenDDS::DCPS::IncompatibleQosStatus * get_incompatibleQosStatus()
sequence< GUID_t > ReaderIdSeq
::DDS::StringSeq exprParams
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq() const
std::string get_filter_expression() const
int add_associated_subscription(DCPS_IR_Subscription *sub, bool active)
CORBA::Boolean is_participant_ignored(OpenDDS::DCPS::GUID_t id)
DCPS_IR_Topic_Description * get_topic_description()
OpenDDS::DCPS::IncompatibleQosStatus * get_incompatibleQosStatus()
OpenDDS::DCPS::TransportLocatorSeq info_
int insert(const T &new_item)
Representative of a Topic.
CORBA::Boolean is_subscription_ignored(OpenDDS::DCPS::GUID_t partId, OpenDDS::DCPS::GUID_t topicId, OpenDDS::DCPS::GUID_t subId)
DCPS_IR_Participant * participant_
Representative of the Domain Participant.
DCPS_IR_Subscription_Set defunct_
bool reevaluate_association(DCPS_IR_Subscription *subscription)
OpenDDS::DCPS::IncompatibleQosStatus incompatibleQosStatus_
sequence< octet > OctetSeq
DDS::PublisherQos * get_publisher_qos()
OpenDDS::DCPS::GUID_t get_id()
void set_handle(DDS::InstanceHandle_t handle)
HANDLE_TYPE_NATIVE InstanceHandle_t
OpenDDS::DCPS::DataWriterRemote_var writer_
the corresponding DataWriterRemote object
long count_since_last_send
CORBA::Boolean is_topic_ignored(OpenDDS::DCPS::GUID_t id)
int remove_associated_publication(DCPS_IR_Publication *pub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
DDS::PublisherQos publisherQos_
void reevaluate_associations(DCPS_IR_Publication *publication)
OpenDDS::DCPS::DataWriterRemote_ptr writer()
void publish_publication_bit(DCPS_IR_Publication *publication)
Publish Publication in the Publication Built-In Topic.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
bool try_associate(DCPS_IR_Publication *publication, DCPS_IR_Subscription *subscription)
OpenDDS::DCPS::GUID_t get_id()
OpenDDS::DCPS::GUID_t get_participant_id()
void disassociate_subscription(OpenDDS::DCPS::GUID_t id, bool reassociate=false)
Remove any subscriptions with the id.
int remove(const T &item)
TransportLocatorSeq readerTransInfo
const DDS::SubscriberQos * get_subscriber_qos()
int find(const T &item) const
bool isOwner() const
Indication of whether the current repository is the owner of this participant.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
Representative of a Subscription.
void reevaluate_defunct_associations()
OpenDDS::DCPS::GUID_t get_topic_id()
DCPS_IR_Domain * get_domain_reference() const
DCPS_IR_Topic * get_topic()
int remove_associated_subscription(DCPS_IR_Subscription *sub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
DDS::InstanceHandle_t handle_
bool should_check_association_upon_change(const DDS::DataReaderQos &qos1, const DDS::DataReaderQos &qos2)
ACE_CDR::ULong get_transportContext() const
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq() const
const DDS::OctetSeq & get_serialized_type_info() const
CORBA::Boolean is_alive()
DCPS_IR_Subscription_Set associations_
void _tao_print_exception(const char *info, FILE *f=stdout) const
Representative of a Topic Description.
void disassociate_participant(OpenDDS::DCPS::GUID_t id, bool reassociate=false)
Remove any subscriptions whose participant has the id.
DDS::StringSeq get_expr_params() const
unsigned long transportContext
DDS::DataWriterQos * get_datawriter_qos()
sequence< string > StringSeq
const DDS::DataReaderQos * get_datareader_qos()