OpenDDS  Snapshot(2023/04/28-20:55)
PublisherImpl.cpp
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
7 
8 #include "PublisherImpl.h"
9 
11 #include "DataWriterImpl.h"
12 #include "DomainParticipantImpl.h"
13 #include "DataWriterImpl.h"
14 #include "Service_Participant.h"
15 #include "Qos_Helper.h"
16 #include "GuidConverter.h"
17 #include "Marked_Default_Qos.h"
18 #include "TopicImpl.h"
19 #include "MonitorFactory.h"
23 
25 
26 namespace OpenDDS {
27 namespace DCPS {
28 
30  GUID_t id,
31  const DDS::PublisherQos& qos,
32  DDS::PublisherListener_ptr a_listener,
33  const DDS::StatusMask& mask,
34  DomainParticipantImpl* participant)
35 : handle_(handle),
36  qos_(qos),
37  default_datawriter_qos_(TheServiceParticipant->initial_DataWriterQos()),
38  listener_mask_(mask),
39  listener_(DDS::PublisherListener::_duplicate(a_listener)),
40 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
41  change_depth_(0),
42 #endif
43  domain_id_(participant->get_domain_id()),
44  participant_(*participant),
45  suspend_depth_count_(0),
46  sequence_number_(),
47  reverse_pi_lock_(pi_lock_),
48  publisher_id_(id)
49 {
50  monitor_.reset(TheServiceParticipant->monitor_factory_->create_publisher_monitor(this));
51 }
52 
54 {
55  const RcHandle<DomainParticipantImpl> participant = participant_.lock();
56  if (participant) {
57  participant->return_handle(handle_);
58  }
59 
60  // The datawriters should be deleted already before calling delete
61  // publisher.
62  String leftover_entities;
63  if (!is_clean(&leftover_entities)) {
65  ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: PublisherImpl::~PublisherImpl: "
66  "%C still exist\n", leftover_entities.c_str()));
67  }
68  }
69 }
70 
73 {
74  return handle_;
75 }
76 
77 bool
79 {
81  guard,
82  this->pi_lock_,
84 
85  for (DataWriterMap::iterator it(datawriter_map_.begin());
86  it != datawriter_map_.end(); ++it) {
87  if (a_handle == it->second->get_instance_handle()) {
88  return true;
89  }
90  }
91 
92  return false;
93 }
94 
95 DDS::DataWriter_ptr
97  DDS::Topic_ptr a_topic,
98  const DDS::DataWriterQos & qos,
99  DDS::DataWriterListener_ptr a_listener,
100  DDS::StatusMask mask)
101 {
102  DDS::DataWriterQos dw_qos;
103 
104  if (!validate_datawriter_qos(qos, default_datawriter_qos_, a_topic, dw_qos)) {
105  return DDS::DataWriter::_nil();
106  }
107 
108  TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic);
109 
110  if (!topic_servant) {
111  if (DCPS_debug_level > 0) {
112  CORBA::String_var name = a_topic->get_name();
114  ACE_TEXT("(%P|%t) ERROR: ")
115  ACE_TEXT("PublisherImpl::create_datawriter, ")
116  ACE_TEXT("topic_servant(topic_name=%C) is nil.\n"),
117  name.in()));
118  }
119  return 0;
120  }
121 
122  OpenDDS::DCPS::TypeSupport_ptr typesupport =
123  topic_servant->get_type_support();
124 
125  if (typesupport == 0) {
126  if (DCPS_debug_level > 0) {
127  CORBA::String_var name = topic_servant->get_name();
129  ACE_TEXT("(%P|%t) ERROR: ")
130  ACE_TEXT("PublisherImpl::create_datawriter, ")
131  ACE_TEXT("typesupport(topic_name=%C) is nil.\n"),
132  name.in()));
133  }
134  return DDS::DataWriter::_nil();
135  }
136 
137  DDS::DataWriter_var dw_obj = typesupport->create_datawriter();
138 
139  DataWriterImpl* dw_servant =
140  dynamic_cast <DataWriterImpl*>(dw_obj.in());
141 
142  if (dw_servant == 0) {
143  if (DCPS_debug_level > 0) {
145  ACE_TEXT("(%P|%t) ERROR: ")
146  ACE_TEXT("PublisherImpl::create_datawriter, ")
147  ACE_TEXT("servant is nil.\n")));
148  }
149  return DDS::DataWriter::_nil();
150  }
151 
152  dw_servant->init(
153  topic_servant,
154  dw_qos,
155  a_listener,
156  mask,
157  participant_,
158  this);
159 
161  const DDS::ReturnCode_t ret = dw_servant->enable();
162 
163  if (ret != DDS::RETCODE_OK) {
164  if (DCPS_debug_level > 0) {
166  ACE_TEXT("(%P|%t) WARNING: ")
167  ACE_TEXT("PublisherImpl::create_datawriter, ")
168  ACE_TEXT("enable failed.\n")));
169  }
170  return DDS::DataWriter::_nil();
171  }
172  } else {
174  writers_not_enabled_.insert(rchandle_from(dw_servant));
175  }
176 
177  return DDS::DataWriter::_duplicate(dw_obj.in());
178 }
179 
181 PublisherImpl::delete_datawriter(DDS::DataWriter_ptr a_datawriter)
182 {
183  DataWriterImpl* dw_servant = dynamic_cast<DataWriterImpl*>(a_datawriter);
184  if (!dw_servant) {
185  if (DCPS_debug_level > 0) {
187  "(%P|%t) PublisherImpl::delete_datawriter - dynamic cast to DataWriterImpl failed\n"));
188  }
189  return DDS::RETCODE_ERROR;
190  }
191 
192  {
193  DDS::Publisher_var dw_publisher(dw_servant->get_publisher());
194 
195  if (dw_publisher.in() != this) {
196  if (DCPS_debug_level > 0) {
198  ACE_TEXT("(%P|%t) PublisherImpl::delete_datawriter: ")
199  ACE_TEXT("the data writer %C doesn't ")
200  ACE_TEXT("belong to this subscriber\n"),
201  LogGuid(dw_servant->get_guid()).c_str()));
202  }
204  }
205  }
206 
207  if (!dw_servant->get_deleted()) {
208  dw_servant->prepare_to_delete();
209  dw_servant->set_wait_pending_deadline(TheServiceParticipant->new_pending_timeout_deadline());
210  }
211 
212  // Wait for any data and control messages to be transported during
213  // unregistering of instances.
214  dw_servant->wait_pending();
215 
216  GUID_t publication_id = GUID_UNKNOWN;
217  {
219  guard,
220  this->pi_lock_,
222 
223  publication_id = dw_servant->get_guid();
224 
225  PublicationMap::iterator it = publication_map_.find(publication_id);
226 
227  if (it == publication_map_.end()) {
228  if (DCPS_debug_level > 0) {
230  ACE_TEXT("(%P|%t) ERROR: ")
231  ACE_TEXT("PublisherImpl::delete_datawriter, ")
232  ACE_TEXT("datawriter %C not found.\n"),
233  LogGuid(publication_id).c_str()));
234  }
235  return DDS::RETCODE_ERROR;
236  }
237 
238  // We can not erase the datawriter from datawriter map by the topic name
239  // because the map might have multiple datawriters with the same topic
240  // name.
241  // Find the iterator to the datawriter in the datawriter map and erase
242  // by the iterator.
243  DataWriterMap::iterator writ;
244  DataWriterMap::iterator the_writ = datawriter_map_.end();
245 
246  for (writ = datawriter_map_.begin();
247  writ != datawriter_map_.end();
248  ++writ) {
249  if (writ->second == it->second) {
250  the_writ = writ;
251  break;
252  }
253  }
254 
255  if (the_writ != datawriter_map_.end()) {
256  datawriter_map_.erase(the_writ);
257  }
258 
259  publication_map_.erase(it);
260 
261  // not just unregister but remove any pending writes/sends.
262  dw_servant->unregister_all();
263 
264  // Release pi_lock_ before making call to transport layer to avoid
265  // some deadlock situations that threads acquire locks(PublisherImpl
266  // pi_lock_, TransportClient reservation_lock and TransportImpl
267  // lock_) in reverse order.
268  ACE_GUARD_RETURN(reverse_lock_type, reverse_monitor, this->reverse_pi_lock_,
270  // Wait for pending samples to drain prior to removing associations
271  // and unregistering the publication.
272  dw_servant->wait_pending();
273 
274  // Call remove association before unregistering the datawriter
275  // with the transport, otherwise some callbacks resulted from
276  // remove_association may lost.
277  dw_servant->remove_all_associations();
278  dw_servant->cleanup();
279  }
280 
281  if (this->monitor_) {
282  this->monitor_->report();
283  }
284 
285  RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
286 
287  Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
288  if (!disco->remove_publication(
289  this->domain_id_,
290  participant->get_id(),
291  publication_id)) {
292  if (DCPS_debug_level > 0) {
294  ACE_TEXT("(%P|%t) ERROR: ")
295  ACE_TEXT("PublisherImpl::delete_datawriter, ")
296  ACE_TEXT("publication not removed from discovery.\n")));
297  }
298  return DDS::RETCODE_ERROR;
299  }
300 
301  participant->remove_adjust_liveliness_timers();
302 
303  return DDS::RETCODE_OK;
304 }
305 
306 DDS::DataWriter_ptr
307 PublisherImpl::lookup_datawriter(const char* topic_name)
308 {
310  guard,
311  this->pi_lock_,
312  DDS::DataWriter::_nil());
313 
314  // If multiple entries whose key is "topic_name" then which one is
315  // returned ? Spec does not limit which one should give.
316  DataWriterMap::iterator it = datawriter_map_.find(topic_name);
317 
318  if (it == datawriter_map_.end()) {
319  if (DCPS_debug_level >= 2) {
321  ACE_TEXT("(%P|%t) ")
322  ACE_TEXT("PublisherImpl::lookup_datawriter, ")
323  ACE_TEXT("The datawriter(topic_name=%C) is not found\n"),
324  topic_name));
325  }
326 
327  return DDS::DataWriter::_nil();
328 
329  } else {
330  return DDS::DataWriter::_duplicate(it->second.in());
331  }
332 }
333 
335 {
337  bool result = true;
338  const DataWriterMap::iterator end = datawriter_map_.end();
339  for (DataWriterMap::iterator i = datawriter_map_.begin(); i != end; ++i) {
340  DataWriterImpl* const writer = dynamic_cast<DataWriterImpl*>(i->second.in());
341  if (writer) {
342  if (!writer->get_deleted()) {
343  writer->prepare_to_delete();
344  }
345  } else {
346  result = false;
347  }
348  }
349 
350  return result;
351 }
352 
354 {
356  bool result = true;
357  const DataWriterMap::iterator end = datawriter_map_.end();
358  for (DataWriterMap::iterator i = datawriter_map_.begin(); i != end; ++i) {
359  DataWriterImpl* const writer = dynamic_cast<DataWriterImpl*>(i->second.in());
360  if (writer) {
361  writer->set_wait_pending_deadline(deadline);
362  } else {
363  result = false;
364  }
365  }
366  return result;
367 }
368 
370 {
371  // If the call isn't part of another delete, prepare the datawriters to be
372  // deleted and set the pending deadline on all the writers.
373  if (!get_deleted()) {
374  // mark that the entity is being deleted
375  set_deleted(true);
376 
378  return DDS::RETCODE_ERROR;
379  }
380  if (!set_wait_pending_deadline(TheServiceParticipant->new_pending_timeout_deadline())) {
381  return DDS::RETCODE_ERROR;
382  }
383  }
384 
385  while (true) {
386  GUID_t pub_id = GUID_UNKNOWN;
387  DataWriterImpl_rch a_datawriter;
388 
389  {
391  guard,
392  this->pi_lock_,
394 
395  if (datawriter_map_.empty()) {
396  break;
397  } else {
398  a_datawriter = datawriter_map_.begin()->second;
399  pub_id = a_datawriter->get_guid();
400  }
401  }
402 
403  const DDS::ReturnCode_t ret = delete_datawriter(a_datawriter.in());
404 
405  if (ret != DDS::RETCODE_OK) {
406  if (DCPS_debug_level > 0) {
408  ACE_TEXT("(%P|%t) ERROR: ")
409  ACE_TEXT("PublisherImpl::")
410  ACE_TEXT("delete_contained_entities: ")
411  ACE_TEXT("failed to delete ")
412  ACE_TEXT("datawriter %C.\n"),
413  LogGuid(pub_id).c_str()));
414  }
415  return ret;
416  }
417  }
418 
419  // the publisher can now start creating new publications
420  set_deleted(false);
421 
422  return DDS::RETCODE_OK;
423 }
424 
427 {
428 
430 
431  if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
432  if (qos_ == qos)
433  return DDS::RETCODE_OK;
434 
435  // for the not changeable qos, it can be changed before enable
436  if (!Qos_Helper::changeable(qos_, qos) && enabled_) {
438 
439  } else {
440  qos_ = qos;
441 
442  DwIdToQosMap idToQosMap;
443  {
445  guard,
446  this->pi_lock_,
448 
449  for (PublicationMap::iterator iter = publication_map_.begin();
450  iter != publication_map_.end();
451  ++iter) {
452  DDS::DataWriterQos qos = iter->second->qos_;
453  GUID_t id = iter->second->get_guid();
454  std::pair<DwIdToQosMap::iterator, bool> pair =
455  idToQosMap.insert(DwIdToQosMap::value_type(id, qos));
456 
457  if (!pair.second) {
458  if (DCPS_debug_level > 0) {
460  ACE_TEXT("(%P|%t) ")
461  ACE_TEXT("PublisherImpl::set_qos: ")
462  ACE_TEXT("insert id %C to DwIdToQosMap ")
463  ACE_TEXT("failed.\n"),
464  LogGuid(id).c_str()));
465  }
466  return DDS::RETCODE_ERROR;
467  }
468  }
469  }
470 
471  DwIdToQosMap::iterator iter = idToQosMap.begin();
472 
473  while (iter != idToQosMap.end()) {
474  Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
475  bool status = false;
476 
477  RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
478  if (participant)
479  status = disco->update_publication_qos(
480  participant->get_domain_id(),
481  participant->get_id(),
482  iter->first,
483  iter->second,
484  this->qos_);
485 
486  if (!status) {
487  if (DCPS_debug_level > 0) {
489  ACE_TEXT("(%P|%t) PublisherImpl::set_qos, ")
490  ACE_TEXT("failed.\n")));
491  }
492  return DDS::RETCODE_ERROR;
493  }
494 
495  ++iter;
496  }
497  }
498 
499  return DDS::RETCODE_OK;
500 
501  } else {
503  }
504 }
505 
508 {
509  qos = qos_;
510  return DDS::RETCODE_OK;
511 }
512 
514 PublisherImpl::set_listener(DDS::PublisherListener_ptr a_listener,
515  DDS::StatusMask mask)
516 {
518  listener_mask_ = mask;
519  //note: OK to duplicate a nil object ref
520  listener_ = DDS::PublisherListener::_duplicate(a_listener);
521  return DDS::RETCODE_OK;
522 }
523 
524 DDS::PublisherListener_ptr
526 {
528  return DDS::PublisherListener::_duplicate(listener_.in());
529 }
530 
533 {
534  if (!enabled_) {
535  if (DCPS_debug_level > 0) {
537  ACE_TEXT("(%P|%t) ERROR: ")
538  ACE_TEXT("PublisherImpl::suspend_publications, ")
539  ACE_TEXT(" Entity is not enabled.\n")));
540  }
542  }
543 
545  suspend_guard,
546  this->pi_suspended_lock_,
549  return DDS::RETCODE_OK;
550 }
551 
552 bool
554 {
556  suspend_guard,
557  this->pi_suspended_lock_,
558  false);
559  return suspend_depth_count_;
560 }
561 
564 {
565  if (!enabled_) {
566  if (DCPS_debug_level > 0) {
568  ACE_TEXT("(%P|%t) ERROR: ")
569  ACE_TEXT("PublisherImpl::resume_publications, ")
570  ACE_TEXT(" Entity is not enabled.\n")));
571  }
573  }
574 
575  PublicationMap publication_map_copy;
576  {
578  suspend_guard,
579  this->pi_suspended_lock_,
582 
583  if (suspend_depth_count_ < 0) {
586  }
587  if (suspend_depth_count_ == 0) {
588  suspend_guard.release();
590  guard,
591  this->pi_lock_,
593 
594  publication_map_copy = publication_map_;
595  }
596  }
597 
598  for (PublicationMap::const_iterator it = publication_map_copy.begin();
599  it != publication_map_copy.end(); ++it) {
600  it->second->send_suspended_data();
601  }
602 
603  return DDS::RETCODE_OK;
604 }
605 
606 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
607 
610 {
611  if (!enabled_) {
612  if (DCPS_debug_level > 0) {
614  ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:")
615  ACE_TEXT(" Publisher is not enabled!\n")));
616  }
618  }
619 
621  if (DCPS_debug_level > 0) {
623  ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:")
624  ACE_TEXT(" QoS policy does not support coherent access!\n")));
625  }
626  return DDS::RETCODE_ERROR;
627  }
628 
630  guard,
631  this->pi_lock_,
633 
634  ++this->change_depth_;
635 
637  // INSTANCE access scope essentially behaves
638  // as a no-op. (see: 7.1.3.6)
639  return DDS::RETCODE_OK;
640  }
641 
642  // We should only notify publications on the first
643  // and last change to the current change set:
644  if (this->change_depth_ == 1) {
645  for (PublicationMap::iterator it = this->publication_map_.begin();
646  it != this->publication_map_.end(); ++it) {
647  it->second->begin_coherent_changes();
648  }
649  }
650 
651  return DDS::RETCODE_OK;
652 }
653 
656 {
657  if (!enabled_) {
658  if (DCPS_debug_level > 0) {
660  ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
661  ACE_TEXT(" Publisher is not enabled!\n")));
662  }
664  }
665 
667  if (DCPS_debug_level > 0) {
669  ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
670  ACE_TEXT(" QoS policy does not support coherent access!\n")));
671  }
672  return DDS::RETCODE_ERROR;
673  }
674 
676  guard,
677  this->pi_lock_,
679 
680  if (this->change_depth_ == 0) {
681  if (DCPS_debug_level > 0) {
683  ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
684  ACE_TEXT(" No matching call to begin_coherent_changes!\n")));
685  }
687  }
688 
689  --this->change_depth_;
690 
692  // INSTANCE access scope essentially behaves
693  // as a no-op. (see: 7.1.3.6)
694  return DDS::RETCODE_OK;
695  }
696 
697  // We should only notify publications on the first
698  // and last change to the current change set:
699  if (this->change_depth_ == 0) {
700  GroupCoherentSamples group_samples;
701  for (PublicationMap::iterator it = this->publication_map_.begin();
702  it != this->publication_map_.end(); ++it) {
703 
704  if (it->second->coherent_samples_ == 0) {
705  continue;
706  }
707 
708  std::pair<GroupCoherentSamples::iterator, bool> pair =
709  group_samples.insert(GroupCoherentSamples::value_type(
710  it->second->get_guid(),
711  WriterCoherentSample(it->second->coherent_samples_,
712  it->second->sequence_number_)));
713 
714  if (!pair.second) {
715  if (DCPS_debug_level > 0) {
717  ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes: ")
718  ACE_TEXT("failed to insert to GroupCoherentSamples.\n")));
719  }
720  return DDS::RETCODE_ERROR;
721  }
722  }
723 
724  for (PublicationMap::iterator it = this->publication_map_.begin();
725  it != this->publication_map_.end(); ++it) {
726  if (it->second->coherent_samples_ == 0) {
727  continue;
728  }
729 
730  it->second->end_coherent_changes(group_samples);
731  }
732  }
733 
734  return DDS::RETCODE_OK;
735 }
736 
737 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
738 
741  const DDS::Duration_t& max_wait)
742 {
743  if (!enabled_) {
744  if (DCPS_debug_level > 0) {
746  ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ")
747  ACE_TEXT("Entity is not enabled.\n")));
748  }
750  }
751 
752  typedef OPENDDS_MAP(DataWriterImpl*, DataWriterImpl::AckToken) DataWriterAckMap;
753  DataWriterAckMap ack_writers;
754  {
756  guard,
757  this->pi_lock_,
759 
760  // Collect writers to request acks
761  for (DataWriterMap::iterator it(this->datawriter_map_.begin());
762  it != this->datawriter_map_.end(); ++it) {
763  DataWriterImpl_rch writer = it->second;
764  if (writer->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS)
765  continue;
766  if (writer->should_ack()) {
767  DataWriterImpl::AckToken token = writer->create_ack_token(max_wait);
768 
769  std::pair<DataWriterAckMap::iterator, bool> pair =
770  ack_writers.insert(DataWriterAckMap::value_type(writer.in(), token));
771 
772  if (!pair.second) {
773  if (DCPS_debug_level > 0) {
775  ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ")
776  ACE_TEXT("Unable to insert AckToken into DataWriterAckMap!\n")));
777  }
778  return DDS::RETCODE_ERROR;
779  }
780  }
781  }
782  }
783 
784  if (ack_writers.empty()) {
785  if (DCPS_debug_level > 0) {
787  ACE_TEXT("(%P|%t) PublisherImpl::wait_for_acknowledgments() - ")
788  ACE_TEXT("not blocking due to no writers requiring acks.\n")));
789  }
790 
791  return DDS::RETCODE_OK;
792  }
793 
794  // Wait for ack responses from all associated readers
795  for (DataWriterAckMap::iterator it(ack_writers.begin());
796  it != ack_writers.end(); ++it) {
797  DataWriterImpl::AckToken token = it->second;
798 
799  it->first->wait_for_specific_ack(token);
800  }
801 
802  return DDS::RETCODE_OK;
803 }
804 
805 DDS::DomainParticipant_ptr
807 {
808  return participant_.lock()._retn();
809 }
810 
813 {
814  if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
816  return DDS::RETCODE_OK;
817 
818  } else {
820  }
821 }
822 
825 {
827  return DDS::RETCODE_OK;
828 }
829 
832  const DDS::TopicQos & a_topic_qos)
833 {
834  if (Qos_Helper::copy_from_topic_qos(a_datawriter_qos, a_topic_qos)) {
835  return DDS::RETCODE_OK;
836  } else {
838  }
839 }
840 
843 {
844  //According spec:
845  // - Calling enable on an already enabled Entity returns OK and has no
846  // effect.
847  // - Calling enable on an Entity whose factory is not enabled will fail
848  // and return PRECONDITION_NOT_MET.
849 
850  if (this->is_enabled()) {
851  return DDS::RETCODE_OK;
852  }
853 
854  RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
855  if (!participant || !participant->is_enabled()) {
857  }
858 
859  if (this->monitor_) {
860  this->monitor_->report();
861  }
862 
863  this->set_enabled();
864 
867  DataWriterSet writers;
868  writers_not_enabled_.swap(writers);
869  for (DataWriterSet::iterator it = writers.begin(); it != writers.end(); ++it) {
870  (*it)->enable();
871  }
872  }
873 
874  return DDS::RETCODE_OK;
875 }
876 
877 bool PublisherImpl::is_clean(String* leftover_entities) const
878 {
879  if (leftover_entities) {
880  leftover_entities->clear();
881  }
882 
884 
885  const size_t writer_count = datawriter_map_.size();
886  if (leftover_entities && writer_count) {
887  *leftover_entities += to_dds_string(writer_count) + " writer(s)";
888  }
889 
890  const size_t publication_count = publication_map_.size();
891  if (leftover_entities && publication_count) {
892  if (leftover_entities->size()) {
893  *leftover_entities += ", ";
894  }
895  *leftover_entities += to_dds_string(publication_count) + " publication(s)";
896  }
897 
898  return writer_count == 0 && publication_count == 0;
899 }
900 
902 PublisherImpl::writer_enabled(const char* topic_name,
903  DataWriterImpl* writer_ptr)
904 {
906  guard,
907  this->pi_lock_,
909  DataWriterImpl_rch writer = rchandle_from(writer_ptr);
910  writers_not_enabled_.erase(writer);
911 
912  datawriter_map_.insert(DataWriterMap::value_type(topic_name, writer));
913 
914  const GUID_t publication_id = writer->get_guid();
915 
916  std::pair<PublicationMap::iterator, bool> pair =
917  publication_map_.insert(PublicationMap::value_type(publication_id, writer));
918 
919  if (!pair.second) {
920  if (DCPS_debug_level > 0) {
922  ACE_TEXT("(%P|%t) ERROR: ")
923  ACE_TEXT("PublisherImpl::writer_enabled: ")
924  ACE_TEXT("insert publication %C failed.\n"),
925  LogGuid(publication_id).c_str()));
926  }
927  return DDS::RETCODE_ERROR;
928  }
929 
930  if (this->monitor_) {
931  this->monitor_->report();
932  }
933 
934  return DDS::RETCODE_OK;
935 }
936 
937 
938 DDS::PublisherListener_ptr
940 {
941  // per 2.1.4.3.1 Listener Access to Plain Communication Status
942  // use this entities factory if listener is mask not enabled
943  // for this kind.
944  RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
945 
946  if (!participant)
947  return 0;
948 
950  if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
951  g.release();
952  return participant->listener_for(kind);
953 
954  } else {
955  return DDS::PublisherListener::_duplicate(listener_.in());
956  }
957 }
958 
961 {
963 
964  for (DataWriterMap::iterator it(datawriter_map_.begin());
965  it != datawriter_map_.end(); ++it) {
966  const DDS::ReturnCode_t dw_ret = it->second->assert_liveliness_by_participant();
967 
968  if (dw_ret != DDS::RETCODE_OK) {
969  ret = dw_ret;
970  }
971  }
972 
973  return ret;
974 }
975 
978 {
980  for (DataWriterMap::iterator it(datawriter_map_.begin());
981  it != datawriter_map_.end(); ++it) {
982  tv = std::min(tv, it->second->liveliness_check_interval(kind));
983  }
984  return tv;
985 }
986 
987 bool
989 {
990  for (DataWriterMap::iterator it(datawriter_map_.begin());
991  it != datawriter_map_.end(); ++it) {
992  if (it->second->participant_liveliness_activity_after(tv)) {
993  return true;
994  }
995  }
996  return false;
997 }
998 
999 void
1000 PublisherImpl::get_publication_ids(PublicationIdVec& pubs)
1001 {
1003  guard,
1004  this->pi_lock_,
1005  );
1006 
1007  pubs.reserve(publication_map_.size());
1008  for (PublicationMap::iterator iter = publication_map_.begin();
1009  iter != publication_map_.end();
1010  ++iter) {
1011  pubs.push_back(iter->first);
1012  }
1013 }
1014 
1017 {
1018  return this->participant_.lock();
1019 }
1020 
1021 bool
1023  const DDS::DataWriterQos& default_qos,
1024  DDS::Topic_ptr a_topic,
1025  DDS::DataWriterQos& dw_qos)
1026 {
1027  if (CORBA::is_nil(a_topic)) {
1028  if (DCPS_debug_level > 0) {
1030  ACE_TEXT("(%P|%t) ERROR: ")
1031  ACE_TEXT("PublisherImpl::create_datawriter, ")
1032  ACE_TEXT("topic is nil.\n")));
1033  }
1034  return DDS::DataWriter::_nil();
1035  }
1036 
1037  if (qos == DATAWRITER_QOS_DEFAULT) {
1038  dw_qos = default_qos;
1039 
1040  } else if (qos == DATAWRITER_QOS_USE_TOPIC_QOS) {
1041  DDS::TopicQos topic_qos;
1042  a_topic->get_qos(topic_qos);
1043  dw_qos = default_qos;
1044 
1045  Qos_Helper::copy_from_topic_qos(dw_qos, topic_qos);
1046 
1047  } else {
1048  dw_qos = qos;
1049  }
1050 
1051  OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
1052  OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
1053  OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
1054  OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
1056 
1057  if (!Qos_Helper::valid(dw_qos)) {
1058  if (DCPS_debug_level > 0) {
1060  ACE_TEXT("(%P|%t) ERROR: ")
1061  ACE_TEXT("PublisherImpl::create_datawriter, ")
1062  ACE_TEXT("invalid qos.\n")));
1063  }
1064  return DDS::DataWriter::_nil();
1065  }
1066 
1067  if (!Qos_Helper::consistent(dw_qos)) {
1068  if (DCPS_debug_level > 0) {
1070  ACE_TEXT("(%P|%t) ERROR: ")
1071  ACE_TEXT("PublisherImpl::create_datawriter, ")
1072  ACE_TEXT("inconsistent qos.\n")));
1073  }
1074  return DDS::DataWriter::_nil();
1075  }
1076  return true;
1077 }
1078 
1079 } // namespace DCPS
1080 } // namespace OpenDDS
virtual DDS::ReturnCode_t set_qos(const DDS::PublisherQos &qos)
virtual DDS::ReturnCode_t end_coherent_changes()
DDS::PublisherQos qos_
Publisher QoS policy list.
DataWriterMap datawriter_map_
This map is used to support datawriter lookup by topic name.
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
#define ACE_DEBUG(X)
virtual DDS::ReturnCode_t copy_from_topic_qos(DDS::DataWriterQos &a_datawriter_qos, const DDS::TopicQos &a_topic_qos)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
DDS::ReturnCode_t assert_liveliness_by_participant()
DDS::PublisherListener_var listener_
Used to notify the entity for relevant events.
#define ACE_ERROR(X)
virtual DDS::ReturnCode_t get_default_datawriter_qos(DDS::DataWriterQos &qos)
DDS::ReturnCode_t set_enabled()
Definition: EntityImpl.cpp:40
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
std::string String
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
bool participant_liveliness_activity_after(const MonotonicTimePoint &tv)
virtual DDS::ReturnCode_t delete_contained_entities()
#define OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, error_rtn_value)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
virtual DDS::ReturnCode_t get_qos(DDS::PublisherQos &qos)
String to_dds_string(unsigned short to_convert)
virtual DDS::ReturnCode_t set_listener(DDS::PublisherListener_ptr a_listener, DDS::StatusMask mask)
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
Definition: Qos_Helper.inl:948
PresentationQosPolicyAccessScopeKind access_scope
void set_deleted(bool state)
Definition: EntityImpl.cpp:83
DDS::InstanceHandle_t handle_
virtual DDS::ReturnCode_t delete_datawriter(DDS::DataWriter_ptr a_datawriter)
virtual RcHandle< EntityImpl > parent() const
void wait_pending()
Wait for pending data and control messages to drain.
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
int release(void)
OpenDDS::DCPS::TypeSupport_ptr get_type_support()
T * _duplicate(T *st)
DDS::StatusMask listener_mask_
EntityFactoryQosPolicy entity_factory
DDS::DomainId_t domain_id_
Domain in which we are contained.
virtual DDS::DataWriter_ptr create_datawriter(DDS::Topic_ptr a_topic, const DDS::DataWriterQos &qos, DDS::DataWriterListener_ptr a_listener, DDS::StatusMask mask)
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
unique_ptr< Monitor > monitor_
Monitor object for this entity.
DDS::ReturnCode_t writer_enabled(const char *topic_name, DataWriterImpl *impl)
Implements the DDS::Topic interface.
Definition: TopicImpl.h:37
PresentationQosPolicy presentation
LM_DEBUG
Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.
static bool validate_datawriter_qos(const DDS::DataWriterQos &qos, const DDS::DataWriterQos &default_qos, DDS::Topic_ptr a_topic, DDS::DataWriterQos &dw_qos)
virtual DDS::ReturnCode_t resume_publications()
virtual DDS::ReturnCode_t set_default_datawriter_qos(const DDS::DataWriterQos &qos)
#define OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, error_rtn_value)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
virtual DDS::PublisherListener_ptr get_listener()
virtual DDS::ReturnCode_t wait_for_acknowledgments(const DDS::Duration_t &max_wait)
LM_WARNING
bool is_clean(String *leftover_entities=0) const
DDS::PublisherListener_ptr listener_for(::DDS::StatusKind kind)
The End User API.
std::size_t change_depth_
The number of times begin_coherent_changes as been called.
TimeDuration liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
const char *const name
Definition: debug.cpp:60
static bool copy_from_topic_qos(DDS::DataReaderQos &a_datareader_qos, const DDS::TopicQos &a_topic_qos)
virtual DDS::ReturnCode_t suspend_publications()
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
ACE_TEXT("TCP_Factory")
unsigned long StatusMask
const ReturnCode_t RETCODE_NOT_ENABLED
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
OpenDDS_Dcps_Export LogLevel log_level
void get_publication_ids(PublicationIdVec &pubs)
bool set_wait_pending_deadline(const MonotonicTimePoint &deadline)
virtual DDS::InstanceHandle_t get_instance_handle()
#define OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
virtual DDS::Publisher_ptr get_publisher()
virtual DDS::ReturnCode_t enable()
void init(TopicImpl *topic_servant, const DDS::DataWriterQos &qos, DDS::DataWriterListener_ptr a_listener, const DDS::StatusMask &mask, WeakRcHandle< DomainParticipantImpl > participant_servant, PublisherImpl *publisher_servant)
#define DATAWRITER_QOS_DEFAULT
const ReturnCode_t RETCODE_ERROR
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
const ReturnCode_t RETCODE_OK
CORBA::Short suspend_depth_count_
The suspend depth count.
PublisherImpl(DDS::InstanceHandle_t handle, GUID_t id, const DDS::PublisherQos &qos, DDS::PublisherListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant)
const ReturnCode_t RETCODE_UNSUPPORTED
DataWriterSet writers_not_enabled_
ACE_Thread_Mutex listener_mutex_
Mutex to protect listener info.
#define DATAWRITER_QOS_USE_TOPIC_QOS
WeakRcHandle< DomainParticipantImpl > participant_
The DomainParticipant servant that owns this Publisher.
const character_type * in(void) const
unsigned long StatusKind
static bool valid(const DDS::UserDataQosPolicy &qos)
Definition: Qos_Helper.inl:723
reverse_lock_type reverse_pi_lock_
#define TheServiceParticipant
virtual DDS::ReturnCode_t enable()
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
Definition: Qos_Helper.inl:596
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
DDS::DataWriterQos default_datawriter_qos_
Default datawriter Qos policy list.
void set_wait_pending_deadline(const MonotonicTimePoint &deadline)
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.
bool contains_writer(DDS::InstanceHandle_t a_handle)
LivelinessQosPolicyKind
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)
Boolean is_nil(T x)
virtual DDS::DataWriter_ptr lookup_datawriter(const char *topic_name)
virtual DDS::DomainParticipant_ptr get_participant()
virtual DDS::ReturnCode_t begin_coherent_changes()
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.
static const TimeDuration max_value
Definition: TimeDuration.h:32