32 DDS::PublisherListener_ptr a_listener,
40 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
43 domain_id_(participant->get_domain_id()),
44 participant_(*participant),
45 suspend_depth_count_(0),
47 reverse_pi_lock_(pi_lock_),
57 participant->return_handle(
handle_);
66 "%C still exist\n", leftover_entities.c_str()));
87 if (a_handle == it->second->get_instance_handle()) {
97 DDS::Topic_ptr a_topic,
99 DDS::DataWriterListener_ptr a_listener,
105 return DDS::DataWriter::_nil();
110 if (!topic_servant) {
115 ACE_TEXT(
"PublisherImpl::create_datawriter, ")
116 ACE_TEXT(
"topic_servant(topic_name=%C) is nil.\n"),
122 OpenDDS::DCPS::TypeSupport_ptr typesupport =
125 if (typesupport == 0) {
130 ACE_TEXT(
"PublisherImpl::create_datawriter, ")
131 ACE_TEXT(
"typesupport(topic_name=%C) is nil.\n"),
134 return DDS::DataWriter::_nil();
137 DDS::DataWriter_var dw_obj = typesupport->create_datawriter();
142 if (dw_servant == 0) {
146 ACE_TEXT(
"PublisherImpl::create_datawriter, ")
149 return DDS::DataWriter::_nil();
167 ACE_TEXT(
"PublisherImpl::create_datawriter, ")
170 return DDS::DataWriter::_nil();
177 return DDS::DataWriter::_duplicate(dw_obj.in());
187 "(%P|%t) PublisherImpl::delete_datawriter - dynamic cast to DataWriterImpl failed\n"));
193 DDS::Publisher_var dw_publisher(dw_servant->
get_publisher());
195 if (dw_publisher.in() !=
this) {
198 ACE_TEXT(
"(%P|%t) PublisherImpl::delete_datawriter: ")
199 ACE_TEXT(
"the data writer %C doesn't ")
200 ACE_TEXT(
"belong to this subscriber\n"),
223 publication_id = dw_servant->
get_guid();
231 ACE_TEXT(
"PublisherImpl::delete_datawriter, ")
232 ACE_TEXT(
"datawriter %C not found.\n"),
233 LogGuid(publication_id).c_str()));
243 DataWriterMap::iterator writ;
249 if (writ->second == it->second) {
288 if (!disco->remove_publication(
290 participant->get_id(),
295 ACE_TEXT(
"PublisherImpl::delete_datawriter, ")
296 ACE_TEXT(
"publication not removed from discovery.\n")));
301 participant->remove_adjust_liveliness_timers();
312 DDS::DataWriter::_nil());
322 ACE_TEXT(
"PublisherImpl::lookup_datawriter, ")
323 ACE_TEXT(
"The datawriter(topic_name=%C) is not found\n"),
327 return DDS::DataWriter::_nil();
330 return DDS::DataWriter::_duplicate(it->second.in());
339 for (DataWriterMap::iterator i =
datawriter_map_.begin(); i != end; ++i) {
358 for (DataWriterMap::iterator i =
datawriter_map_.begin(); i != end; ++i) {
399 pub_id = a_datawriter->get_guid();
410 ACE_TEXT(
"delete_contained_entities: ")
442 DwIdToQosMap idToQosMap;
453 GUID_t id = iter->second->get_guid();
454 std::pair<DwIdToQosMap::iterator, bool> pair =
455 idToQosMap.insert(DwIdToQosMap::value_type(
id, qos));
461 ACE_TEXT(
"PublisherImpl::set_qos: ")
462 ACE_TEXT(
"insert id %C to DwIdToQosMap ")
471 DwIdToQosMap::iterator iter = idToQosMap.begin();
473 while (iter != idToQosMap.end()) {
479 status = disco->update_publication_qos(
480 participant->get_domain_id(),
481 participant->get_id(),
489 ACE_TEXT(
"(%P|%t) PublisherImpl::set_qos, ")
520 listener_ = DDS::PublisherListener::_duplicate(a_listener);
524 DDS::PublisherListener_ptr
528 return DDS::PublisherListener::_duplicate(
listener_.in());
538 ACE_TEXT(
"PublisherImpl::suspend_publications, ")
539 ACE_TEXT(
" Entity is not enabled.\n")));
569 ACE_TEXT(
"PublisherImpl::resume_publications, ")
570 ACE_TEXT(
" Entity is not enabled.\n")));
575 PublicationMap publication_map_copy;
588 suspend_guard.release();
598 for (PublicationMap::const_iterator it = publication_map_copy.begin();
599 it != publication_map_copy.end(); ++it) {
600 it->second->send_suspended_data();
606 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 614 ACE_TEXT(
"(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:")
615 ACE_TEXT(
" Publisher is not enabled!\n")));
623 ACE_TEXT(
"(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:")
624 ACE_TEXT(
" QoS policy does not support coherent access!\n")));
647 it->second->begin_coherent_changes();
660 ACE_TEXT(
"(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
661 ACE_TEXT(
" Publisher is not enabled!\n")));
669 ACE_TEXT(
"(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
670 ACE_TEXT(
" QoS policy does not support coherent access!\n")));
683 ACE_TEXT(
"(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
684 ACE_TEXT(
" No matching call to begin_coherent_changes!\n")));
700 GroupCoherentSamples group_samples;
704 if (it->second->coherent_samples_ == 0) {
708 std::pair<GroupCoherentSamples::iterator, bool> pair =
709 group_samples.insert(GroupCoherentSamples::value_type(
710 it->second->get_guid(),
712 it->second->sequence_number_)));
717 ACE_TEXT(
"(%P|%t) ERROR: PublisherImpl::end_coherent_changes: ")
718 ACE_TEXT(
"failed to insert to GroupCoherentSamples.\n")));
726 if (it->second->coherent_samples_ == 0) {
730 it->second->end_coherent_changes(group_samples);
737 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE 746 ACE_TEXT(
"(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ")
747 ACE_TEXT(
"Entity is not enabled.\n")));
753 DataWriterAckMap ack_writers;
766 if (writer->should_ack()) {
769 std::pair<DataWriterAckMap::iterator, bool> pair =
770 ack_writers.insert(DataWriterAckMap::value_type(writer.
in(), token));
775 ACE_TEXT(
"(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ")
776 ACE_TEXT(
"Unable to insert AckToken into DataWriterAckMap!\n")));
784 if (ack_writers.empty()) {
787 ACE_TEXT(
"(%P|%t) PublisherImpl::wait_for_acknowledgments() - ")
788 ACE_TEXT(
"not blocking due to no writers requiring acks.\n")));
795 for (DataWriterAckMap::iterator it(ack_writers.begin());
796 it != ack_writers.end(); ++it) {
799 it->first->wait_for_specific_ack(token);
805 DDS::DomainParticipant_ptr
855 if (!participant || !participant->is_enabled()) {
867 DataWriterSet writers;
869 for (DataWriterSet::iterator it = writers.begin(); it != writers.end(); ++it) {
879 if (leftover_entities) {
880 leftover_entities->clear();
886 if (leftover_entities && writer_count) {
887 *leftover_entities +=
to_dds_string(writer_count) +
" writer(s)";
891 if (leftover_entities && publication_count) {
892 if (leftover_entities->size()) {
893 *leftover_entities +=
", ";
895 *leftover_entities +=
to_dds_string(publication_count) +
" publication(s)";
898 return writer_count == 0 && publication_count == 0;
914 const GUID_t publication_id = writer->get_guid();
916 std::pair<PublicationMap::iterator, bool> pair =
917 publication_map_.insert(PublicationMap::value_type(publication_id, writer));
923 ACE_TEXT(
"PublisherImpl::writer_enabled: ")
924 ACE_TEXT(
"insert publication %C failed.\n"),
925 LogGuid(publication_id).c_str()));
938 DDS::PublisherListener_ptr
952 return participant->listener_for(kind);
955 return DDS::PublisherListener::_duplicate(
listener_.in());
982 tv = std::min(tv, it->second->liveliness_check_interval(kind));
992 if (it->second->participant_liveliness_activity_after(tv)) {
1011 pubs.push_back(iter->first);
1024 DDS::Topic_ptr a_topic,
1031 ACE_TEXT(
"PublisherImpl::create_datawriter, ")
1034 return DDS::DataWriter::_nil();
1038 dw_qos = default_qos;
1042 a_topic->get_qos(topic_qos);
1043 dw_qos = default_qos;
1061 ACE_TEXT(
"PublisherImpl::create_datawriter, ")
1064 return DDS::DataWriter::_nil();
1071 ACE_TEXT(
"PublisherImpl::create_datawriter, ")
1074 return DDS::DataWriter::_nil();
virtual DDS::ReturnCode_t set_qos(const DDS::PublisherQos &qos)
virtual DDS::ReturnCode_t end_coherent_changes()
DDS::PublisherQos qos_
Publisher QoS policy list.
DataWriterMap datawriter_map_
This map is used to support datawriter lookup by topic name.
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
virtual DDS::ReturnCode_t copy_from_topic_qos(DDS::DataWriterQos &a_datawriter_qos, const DDS::TopicQos &a_topic_qos)
RcHandle< T > rchandle_from(T *pointer)
DDS::ReturnCode_t assert_liveliness_by_participant()
DDS::PublisherListener_var listener_
Used to notify the entity for relevant events.
virtual DDS::ReturnCode_t get_default_datawriter_qos(DDS::DataWriterQos &qos)
DDS::ReturnCode_t set_enabled()
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
bool participant_liveliness_activity_after(const MonotonicTimePoint &tv)
virtual DDS::ReturnCode_t delete_contained_entities()
#define OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, error_rtn_value)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
virtual DDS::ReturnCode_t get_qos(DDS::PublisherQos &qos)
String to_dds_string(unsigned short to_convert)
virtual DDS::ReturnCode_t set_listener(DDS::PublisherListener_ptr a_listener, DDS::StatusMask mask)
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
PresentationQosPolicyAccessScopeKind access_scope
void set_deleted(bool state)
DDS::InstanceHandle_t handle_
virtual DDS::ReturnCode_t delete_datawriter(DDS::DataWriter_ptr a_datawriter)
virtual RcHandle< EntityImpl > parent() const
void wait_pending()
Wait for pending data and control messages to drain.
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
OpenDDS::DCPS::TypeSupport_ptr get_type_support()
DDS::StatusMask listener_mask_
EntityFactoryQosPolicy entity_factory
DDS::DomainId_t domain_id_
Domain in which we are contained.
virtual DDS::DataWriter_ptr create_datawriter(DDS::Topic_ptr a_topic, const DDS::DataWriterQos &qos, DDS::DataWriterListener_ptr a_listener, DDS::StatusMask mask)
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
unique_ptr< Monitor > monitor_
Monitor object for this entity.
DDS::ReturnCode_t writer_enabled(const char *topic_name, DataWriterImpl *impl)
Implements the DDS::Topic interface.
PresentationQosPolicy presentation
Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.
static bool validate_datawriter_qos(const DDS::DataWriterQos &qos, const DDS::DataWriterQos &default_qos, DDS::Topic_ptr a_topic, DDS::DataWriterQos &dw_qos)
virtual DDS::ReturnCode_t resume_publications()
virtual DDS::ReturnCode_t set_default_datawriter_qos(const DDS::DataWriterQos &qos)
PublicationMap publication_map_
#define OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, error_rtn_value)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
bool is_suspended() const
virtual DDS::PublisherListener_ptr get_listener()
virtual DDS::ReturnCode_t wait_for_acknowledgments(const DDS::Duration_t &max_wait)
bool is_clean(String *leftover_entities=0) const
DDS::PublisherListener_ptr listener_for(::DDS::StatusKind kind)
std::size_t change_depth_
The number of times begin_coherent_changes as been called.
TimeDuration liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
static bool copy_from_topic_qos(DDS::DataReaderQos &a_datareader_qos, const DDS::TopicQos &a_topic_qos)
void remove_all_associations()
virtual DDS::ReturnCode_t suspend_publications()
HANDLE_TYPE_NATIVE InstanceHandle_t
const ReturnCode_t RETCODE_NOT_ENABLED
AtomicBool enabled_
The flag indicates the entity is enabled.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
OpenDDS_Dcps_Export LogLevel log_level
void get_publication_ids(PublicationIdVec &pubs)
bool set_wait_pending_deadline(const MonotonicTimePoint &deadline)
virtual DDS::InstanceHandle_t get_instance_handle()
#define OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
virtual DDS::Publisher_ptr get_publisher()
boolean autoenable_created_entities
virtual DDS::ReturnCode_t enable()
void init(TopicImpl *topic_servant, const DDS::DataWriterQos &qos, DDS::DataWriterListener_ptr a_listener, const DDS::StatusMask &mask, WeakRcHandle< DomainParticipantImpl > participant_servant, PublisherImpl *publisher_servant)
#define DATAWRITER_QOS_DEFAULT
const ReturnCode_t RETCODE_ERROR
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
const ReturnCode_t RETCODE_OK
CORBA::Short suspend_depth_count_
The suspend depth count.
PublisherImpl(DDS::InstanceHandle_t handle, GUID_t id, const DDS::PublisherQos &qos, DDS::PublisherListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant)
const ReturnCode_t RETCODE_UNSUPPORTED
DataWriterSet writers_not_enabled_
ACE_Thread_Mutex listener_mutex_
Mutex to protect listener info.
#define DATAWRITER_QOS_USE_TOPIC_QOS
WeakRcHandle< DomainParticipantImpl > participant_
The DomainParticipant servant that owns this Publisher.
const character_type * in(void) const
static bool valid(const DDS::UserDataQosPolicy &qos)
reverse_lock_type reverse_pi_lock_
#define TheServiceParticipant
virtual DDS::ReturnCode_t enable()
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
The Internal API and Implementation of OpenDDS.
DDS::DataWriterQos default_datawriter_qos_
Default datawriter Qos policy list.
virtual char * get_name()
void set_wait_pending_deadline(const MonotonicTimePoint &deadline)
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.
bool contains_writer(DDS::InstanceHandle_t a_handle)
bool prepare_to_delete_datawriters()
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)
virtual DDS::DataWriter_ptr lookup_datawriter(const char *topic_name)
lock_type pi_suspended_lock_
virtual DDS::DomainParticipant_ptr get_participant()
virtual DDS::ReturnCode_t begin_coherent_changes()
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.
static const TimeDuration max_value