10 #include "DCPS_IR_Subscription.h"
12 #include "DCPS_IR_Publication.h"
13 #include "DCPS_IR_Participant.h"
14 #include "DCPS_IR_Topic_Description.h"
15 #include "DCPS_IR_Domain.h"
16 #include "dds/DCPS/DCPS_Utils.h"
17 #include "dds/DCPS/RepoIdConverter.h"
18 #include "dds/DCPS/Qos_Helper.h"
19 #include "tao/debug.h"
21 #include "ace/OS_NS_unistd.h"
28 OpenDDS::DCPS::DataReaderRemote_ptr reader,
33 const char* filterClassName,
34 const char* filterExpression,
38 participant_(participant),
44 transportContext_(transportContext),
45 subscriberQos_(subscriberQos),
46 filterClassName_(filterClassName),
47 filterExpression_(filterExpression),
48 exprParams_(exprParams),
49 serializedTypeInfo_(serializedTypeInfo)
51 reader_ = OpenDDS::DCPS::DataReaderRemote::_duplicate(reader);
83 ACE_TEXT(
"(%P|%t) DCPS_IR_Subscription::add_associated_publication:")
84 ACE_TEXT(
" subscription %C adding publication %C.\n"),
85 std::string(sub_converter).c_str(),
86 std::string(pub_converter).c_str()));
89 reader_->add_association(
id_, association, active);
93 ACE_TEXT(
"(%P|%t) DCPS_IR_Subscription::add_associated_publication: ")
94 ACE_TEXT(
"successfully added publication %x\n"),
99 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::add_associated_publication:");
112 ACE_TEXT(
"(%P|%t) ERROR: DCPS_IR_Subscription::add_associated_publication: ")
113 ACE_TEXT(
"subscription %C attempted to re-add publication %C\n"),
114 std::string(sub_converter).c_str(),
115 std::string(pub_converter).c_str()));
123 ACE_TEXT(
"(%P|%t) ERROR: DCPS_IR_Subscription::add_associated_publication: ")
124 ACE_TEXT(
"subscription %C failed to add publication %C\n"),
125 std::string(sub_converter).c_str(),
126 std::string(pub_converter).c_str()));
136 bool notify_both_side)
138 bool marked_dead =
false;
148 ACE_TEXT(
"(%P|%t) DCPS_IR_Subscription::remove_associated_publication:")
149 ACE_TEXT(
" calling sub %C with pub %C\n"),
150 std::string(sub_converter).c_str(),
151 std::string(pub_converter).c_str()
159 reader_->remove_associations(idSeq, notify_lost);
161 if (notify_both_side) {
168 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associated_publication:");
184 ACE_TEXT(
"(%P|%t) DCPS_IR_Subscription::remove_associated_publication: ")
185 ACE_TEXT(
"subscription %C removed publication %C at %x.\n"),
186 std::string(sub_converter).c_str(),
187 std::string(pub_converter).c_str(),
195 ACE_TEXT(
"(%P|%t) ERROR: DCPS_IR_Subscription::remove_associated_publication: ")
196 ACE_TEXT(
"subscription %C failed to remove publication %C at %x.\n"),
197 std::string(sub_converter).c_str(),
198 std::string(pub_converter).c_str(),
218 if (0 < numAssociations) {
222 while (iter != end) {
245 if (0 < numAssociations) {
247 idSeq.length(static_cast<CORBA::ULong>(numAssociations));
252 while (iter != end) {
262 ACE_TEXT(
"(%P|%t) DCPS_IR_Subscription::disassociate_participant: ")
263 ACE_TEXT(
"subscription %C testing if publication %C particpant %C == %C.\n"),
264 std::string(sub_converter).c_str(),
265 std::string(pub_converter).c_str(),
266 std::string(sub_part_converter).c_str(),
267 std::string(pub_part_converter).c_str()));
275 idSeq[count] = pub->
get_id();
282 ACE_TEXT(
"(%P|%t) ERROR: DCPS_IR_Subscription::disassociate_participant: ")
283 ACE_TEXT(
"subscription %C failed to reassociate publication %C at %x.\n"),
284 std::string(sub_converter).c_str(),
285 std::string(pub_converter).c_str(),
297 reader_->remove_associations(idSeq, dont_notify_lost);
302 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::disassociate_participant:");
320 if (0 < numAssociations) {
322 idSeq.length(static_cast<CORBA::ULong>(numAssociations));
327 while (iter != end) {
337 ACE_TEXT(
"(%P|%t) DCPS_IR_Subscription::disassociate_topic: ")
338 ACE_TEXT(
"subscription %C testing if publication %C topic %C == %C.\n"),
339 std::string(sub_converter).c_str(),
340 std::string(pub_converter).c_str(),
341 std::string(sub_topic_converter).c_str(),
342 std::string(pub_topic_converter).c_str()));
350 idSeq[count] = pub->
get_id();
361 reader_->remove_associations(idSeq, dont_notify_lost);
366 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associations:");
385 if (0 < numAssociations) {
387 idSeq.length(static_cast<CORBA::ULong>(numAssociations));
392 while (iter != end) {
401 ACE_TEXT(
"(%P|%t) DCPS_IR_Subscription::disassociate_publication: ")
402 ACE_TEXT(
"subscription %C testing if publication %C == %C.\n"),
403 std::string(sub_converter).c_str(),
404 std::string(pub_converter).c_str(),
405 std::string(sub_pub_converter).c_str()));
408 if (
id == pub->
get_id()) {
413 idSeq[count] = pub->
get_id();
420 ACE_TEXT(
"(%P|%t) ERROR: DCPS_IR_Subscription::disassociate_publication: ")
421 ACE_TEXT(
"subscription %C failed to reassociate publication %C at %x.\n"),
422 std::string(sub_converter).c_str(),
423 std::string(pub_converter).c_str(),
435 reader_->remove_associations(idSeq, dont_notify_lost);
440 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associations:");
459 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::update_incompatible_qos:");
498 using OpenDDS::DCPS::operator==;
503 if (
false == (qos == this->
qos_)) {
563 bool need_evaluate =
false;
564 bool u_dr_qos = !(
qos_ == qos);
568 need_evaluate =
true;
578 need_evaluate =
true;
615 ACE_TEXT(
"(%P|%t) ERROR: DCPS_IR_Subscription::reevaluate_defunct_associations: ")
616 ACE_TEXT(
"subscription %C failed to reassociate publication %C at %x.\n"),
617 std::string(sub_converter).c_str(),
618 std::string(pub_converter).c_str(),
630 while (iter != end) {
654 bool sendNotify =
true;
655 bool notify_lost =
true;
713 OpenDDS::DCPS::DataReaderRemote_ptr
716 return OpenDDS::DCPS::DataReaderRemote::_duplicate(this->
reader_.in());
743 (*i)->update_expr_params(
id_, params);
751 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT) 754 for (
int i=0; i < depth; i++)
756 std::string indent = str + prefix;
757 str +=
"DCPS_IR_Subscription[";
758 str += std::string(local_converter);
764 str += indent +
"Associations [ ";
770 str += std::string(assoc_converter);
775 str += indent +
"Defunct Associations [ ";
781 str += std::string(def_converter);
786 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT) DCPS_IR_Publication_Set associations_
DCPS_IR_Topic * get_topic()
const DDS::OctetSeq & get_serialized_type_info() const
int remove_associations(CORBA::Boolean notify_lost)
OpenDDS::DCPS::GUID_t get_participant_id()
std::string dump_to_string(const std::string &prefix, int depth) const
ACE_CDR::ULong get_transportContext() const
OpenDDS::DCPS::GUID_t get_id()
DDS::SubscriberQos subscriberQos_
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
OpenDDS::DCPS::GUID_t get_id() const
void reevaluate_existing_associations()
std::string get_filter_class_name() const
void set_bit_status(CORBA::Boolean isBIT)
DCPS_IR_Topic_Description * get_topic_description()
ssize_t send(ACE_HANDLE handle, const void *buf, size_t len, int flags, const ACE_Time_Value *timeout=0)
DDS::OctetSeq serializedTypeInfo_
sequence< TransportLocator > TransportLocatorSeq
bool compatibleQOS(OpenDDS::DCPS::IncompatibleQosStatus *writerStatus, OpenDDS::DCPS::IncompatibleQosStatus *readerStatus, const OpenDDS::DCPS::TransportLocatorSeq &pubTLS, const OpenDDS::DCPS::TransportLocatorSeq &subTLS, DDS::DataWriterQos const *const writerQos, DDS::DataReaderQos const *const readerQos, DDS::PublisherQos const *const pubQos, DDS::SubscriberQos const *const subQos)
OpenDDS::DCPS::GUID_t get_topic_id()
void publish_subscription_bit(DCPS_IR_Subscription *subscription)
Publish Subscription in the Subscription Built-In Topic.
DDS::InstanceHandle_t handle_
DCPS_IR_Subscription(const OpenDDS::DCPS::GUID_t &id, DCPS_IR_Participant *participant, DCPS_IR_Topic *topic, OpenDDS::DCPS::DataReaderRemote_ptr reader, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &info, ACE_CDR::ULong transportContext, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams, const DDS::OctetSeq &serializedTypeInfo)
void reevaluate_defunct_associations()
unsigned long transportContext
void disassociate_publication(OpenDDS::DCPS::GUID_t id, bool reassociate=false)
Remove any publications with id.
OpenDDS::DCPS::IncompatibleQosStatus * get_incompatibleQosStatus()
DDS::InstanceHandle_t get_handle()
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq() const
std::string get_filter_expression() const
DCPS_IR_Publication_Set defunct_
OpenDDS::DCPS::IncompatibleQosStatus incompatibleQosStatus_
void set_handle(DDS::InstanceHandle_t handle)
CORBA::Boolean is_participant_ignored(OpenDDS::DCPS::GUID_t id)
void disassociate_topic(OpenDDS::DCPS::GUID_t id)
Remove any publications whose topic has the id.
OpenDDS::DCPS::IncompatibleQosStatus * get_incompatibleQosStatus()
OpenDDS::DCPS::GUID_t id_
int insert(const T &new_item)
Representative of a Topic.
CORBA::Boolean is_publication_ignored(OpenDDS::DCPS::GUID_t id)
Representative of the Domain Participant.
bool set_qos(const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos, Update::SpecificQos &specificQos)
sequence< GUID_t > WriterIdSeq
std::string filterClassName_
sequence< octet > OctetSeq
DDS::PublisherQos * get_publisher_qos()
OpenDDS::DCPS::GUID_t get_id()
OpenDDS::DCPS::TransportLocatorSeq info_
HANDLE_TYPE_NATIVE InstanceHandle_t
OpenDDS::DCPS::DataReaderRemote_ptr reader()
long count_since_last_send
CORBA::Boolean is_topic_ignored(OpenDDS::DCPS::GUID_t id)
TransportLocatorSeq writerTransInfo
int remove_associated_publication(DCPS_IR_Publication *pub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
void reevaluate_associations(DCPS_IR_Publication *publication)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
CORBA::Boolean is_publication_ignored(OpenDDS::DCPS::GUID_t partId, OpenDDS::DCPS::GUID_t topicId, OpenDDS::DCPS::GUID_t pubId)
bool try_associate(DCPS_IR_Publication *publication, DCPS_IR_Subscription *subscription)
OpenDDS::DCPS::GUID_t get_id()
void update_expr_params(const DDS::StringSeq &params)
Calls associated Publications.
OpenDDS::DCPS::GUID_t get_participant_id()
int remove(const T &item)
const DDS::SubscriberQos * get_subscriber_qos()
void disassociate_participant(OpenDDS::DCPS::GUID_t id, bool reassociate=false)
Remove any publications whose participant has the id.
int find(const T &item) const
bool isOwner() const
Indication of whether the current repository is the owner of this participant.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
int add_associated_publication(DCPS_IR_Publication *pub, bool active)
DCPS_IR_Topic_Description * get_topic_description()
OpenDDS::DCPS::GUID_t get_topic_id()
DCPS_IR_Domain * get_domain_reference() const
int remove_associated_subscription(DCPS_IR_Subscription *sub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
bool should_check_association_upon_change(const DDS::DataReaderQos &qos1, const DDS::DataReaderQos &qos2)
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq() const
DDS::StringSeq exprParams_
::DDS::PublisherQos pubQos
::DDS::OctetSeq serializedTypeInfo
std::string filterExpression_
const DDS::OctetSeq & get_serialized_type_info() const
CORBA::Boolean is_alive()
::DDS::DataWriterQos writerQos
void update_incompatible_qos()
void _tao_print_exception(const char *info, FILE *f=stdout) const
Representative of a Topic Description.
OpenDDS::DCPS::DataReaderRemote_var reader_
the corresponding DataReaderRemote object
Representative of a Publication.
DDS::StringSeq get_expr_params() const
DDS::DataWriterQos * get_datawriter_qos()
sequence< string > StringSeq
bool reevaluate_association(DCPS_IR_Publication *publication)
DCPS_IR_Participant * participant_
const DDS::DataReaderQos * get_datareader_qos()