OpenDDS  Snapshot(2023/04/28-20:55)
ReplayerImpl.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 #include "ReplayerImpl.h"
11 #include "DomainParticipantImpl.h"
12 #include "PublisherImpl.h"
13 #include "Service_Participant.h"
14 #include "GuidConverter.h"
15 #include "TopicImpl.h"
16 #include "PublicationInstance.h"
18 #include "DataSampleElement.h"
19 #include "Serializer.h"
20 #include "Transient_Kludge.h"
21 #include "DataDurabilityCache.h"
22 #include "MonitorFactory.h"
23 #include "TypeSupportImpl.h"
24 #include "DCPS_Utils.h"
25 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
26 #include "CoherentChangeControl.h"
27 #endif
28 #include "AssociationData.h"
29 
30 #if !defined (DDS_HAS_MINIMUM_BIT)
31 #include "BuiltInTopicUtils.h"
32 #endif // !defined (DDS_HAS_MINIMUM_BIT)
33 
34 #include "Util.h"
35 
40 
41 #include "ace/Reactor.h"
42 #include "ace/Auto_Ptr.h"
43 
44 #include <stdexcept>
45 
47 
48 namespace OpenDDS {
49 namespace DCPS {
50 
51 
53  : data_dropped_count_(0),
54  data_delivered_count_(0),
55  n_chunks_(TheServiceParticipant->n_chunks()),
56  association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()),
57  qos_(TheServiceParticipant->initial_DataWriterQos()),
58  participant_servant_(0),
59  topic_id_(GUID_UNKNOWN),
60  topic_servant_(0),
61  listener_mask_(DEFAULT_STATUS_MASK),
62  domain_id_(0),
63  publisher_servant_(0),
64  publication_id_(GUID_UNKNOWN),
65  sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
66  // data_container_(0),
67  // liveliness_lost_(false),
68  // last_deadline_missed_total_count_(0),
69  is_bit_(false),
70  empty_condition_(lock_),
71  pending_write_count_(0)
72 {
73  // liveliness_lost_status_.total_count = 0;
74  // liveliness_lost_status_.total_count_change = 0;
75  //
76  // offered_deadline_missed_status_.total_count = 0;
77  // offered_deadline_missed_status_.total_count_change = 0;
78  // offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL;
79 
84 
90 
91 }
92 
93 // This method is called when there are no longer any reference to the
94 // the servant.
96 {
97  DBG_ENTRY_LVL("ReplayerImpl","~ReplayerImpl",6);
98 }
99 
100 // this method is called when delete_datawriter is called.
103 {
104 
105  // // Unregister all registered instances prior to deletion.
106  // // this->unregister_instances(SystemTimePoint::now().to_dds_time());
107  //
108  // // CORBA::String_var topic_name = this->get_Atopic_name();
109  {
111 
112  // Wait for pending samples to drain prior to removing associations
113  // and unregistering the publication.
114  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
115  while (this->pending_write_count_) {
116  this->empty_condition_.wait(thread_status_manager);
117  }
118 
119  // Call remove association before unregistering the datawriter
120  // with the transport, otherwise some callbacks resulted from
121  // remove_association may lost.
122  this->remove_all_associations();
123 
124  // release our Topic_var
125  topic_objref_ = DDS::Topic::_nil();
126  topic_servant_ = 0;
127 
128  }
129 
130  // not just unregister but remove any pending writes/sends.
131  // this->unregister_all();
132 
133  Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
134  if (!disco->remove_publication(
135  this->domain_id_,
136  this->participant_servant_->get_id(),
137  this->publication_id_)) {
139  ACE_TEXT("(%P|%t) ERROR: ")
140  ACE_TEXT("PublisherImpl::delete_datawriter, ")
141  ACE_TEXT("publication not removed from discovery.\n")),
143  }
144  return DDS::RETCODE_OK;
145 }
146 
147 void
149  DDS::Topic_ptr topic,
150  TopicImpl * topic_servant,
151  const DDS::DataWriterQos & qos,
152  ReplayerListener_rch a_listener,
153  const DDS::StatusMask & mask,
154  OpenDDS::DCPS::DomainParticipantImpl * participant_servant,
155  const DDS::PublisherQos& publisher_qos)
156 {
157  DBG_ENTRY_LVL("ReplayerImpl","init",6);
158  topic_objref_ = DDS::Topic::_duplicate(topic);
159  topic_servant_ = topic_servant;
160  topic_name_ = topic_servant_->get_name();
161  topic_id_ = topic_servant_->get_id();
162  type_name_ = topic_servant_->get_type_name();
163 
164 #if !defined (DDS_HAS_MINIMUM_BIT)
166 #endif // !defined (DDS_HAS_MINIMUM_BIT)
167 
168  qos_ = qos;
169  passed_qos_ = qos;
170 
171  //Note: OK to _duplicate(nil).
172  listener_ = a_listener;
173  listener_mask_ = mask;
174 
175  // Only store the participant pointer, since it is our "grand"
176  // parent, we will exist as long as it does.
177  participant_servant_ = participant_servant;
179 
180  publisher_qos_ = publisher_qos;
181 }
182 
183 
185  const DDS::DataWriterQos & qos)
186 {
187 
189 
190  if (Qos_Helper::valid(publisher_qos) && Qos_Helper::consistent(publisher_qos)) {
191  if (publisher_qos_ == publisher_qos)
192  return DDS::RETCODE_OK;
193 
194  // for the not changeable qos, it can be changed before enable
195  if (!Qos_Helper::changeable(publisher_qos_, publisher_qos) && enabled_) {
197 
198  } else {
199  publisher_qos_ = publisher_qos;
200  }
201  } else {
203  }
204 
210 
211  if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
212  if (qos_ == qos)
213  return DDS::RETCODE_OK;
214 
215  if (!Qos_Helper::changeable(qos_, qos) && enabled_) {
217 
218  } else {
219  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
220  // DDS::PublisherQos publisherQos;
221  // this->publisher_servant_->get_qos(publisherQos);
222  DDS::PublisherQos publisherQos = this->publisher_qos_;
223  const bool status
224  = disco->update_publication_qos(this->participant_servant_->get_domain_id(),
225  this->participant_servant_->get_id(),
226  this->publication_id_,
227  qos,
228  publisherQos);
229 
230  if (!status) {
232  ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
233  ACE_TEXT("qos not updated.\n")),
235  }
236  }
237 
238  if (!(qos_ == qos)) {
239  // Reset the deadline timer if the period has changed.
240  // if (qos_.deadline.period.sec != qos.deadline.period.sec
241  // || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
242  // if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC
243  // && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
244  // ACE_auto_ptr_reset(this->watchdog_,
245  // new OfferedDeadlineWatchdog(
246  // this->reactor_,
247  // this->lock_,
248  // qos.deadline,
249  // this,
250  // this,
251  // this->offered_deadline_missed_status_,
252  // this->last_deadline_missed_total_count_));
253  //
254  // } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC
255  // && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
256  // this->watchdog_->cancel_all();
257  // this->watchdog_.reset();
258  //
259  // } else {
260  // this->watchdog_->reset_interval(
261  // duration_to_time_value(qos.deadline.period));
262  // }
263  // }
264 
265  qos_ = qos;
266  }
267 
268  return DDS::RETCODE_OK;
269 
270  } else {
272  }
273 }
274 
276  DDS::DataWriterQos & qos)
277 {
278  qos = passed_qos_;
279  publisher_qos = publisher_qos_;
280  return DDS::RETCODE_OK;
281 }
282 
283 
285  DDS::StatusMask mask)
286 {
287  listener_ = a_listener;
288  listener_mask_ = mask;
289  return DDS::RETCODE_OK;
290 }
291 
293 {
294  return listener_;
295 }
296 
299 {
300  //According spec:
301  // - Calling enable on an already enabled Entity returns OK and has no
302  // effect.
303  // - Calling enable on an Entity whose factory is not enabled will fail
304  // and return PRECONDITION_NOT_MET.
305 
306  if (this->is_enabled()) {
307  return DDS::RETCODE_OK;
308  }
309 
310  // if (!this->publisher_servant_->is_enabled()) {
311  // return DDS::RETCODE_PRECONDITION_NOT_MET;
312  // }
313  //
314  const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
315 
318  }
319  // +1 because we might allocate one before releasing another
320  // TBD - see if this +1 can be removed.
324 
326 
327 
328  if (DCPS_debug_level >= 2) {
330  "(%P|%t) ReplayerImpl::enable-mb"
331  " Cached_Allocator_With_Overflow %x with %d chunks\n",
332  mb_allocator_.get(),
333  n_chunks_));
334 
336  "(%P|%t) ReplayerImpl::enable-db"
337  " Cached_Allocator_With_Overflow %x with %d chunks\n",
338  db_allocator_.get(),
339  n_chunks_));
340 
342  "(%P|%t) ReplayerImpl::enable-header"
343  " Cached_Allocator_With_Overflow %x with %d chunks\n",
344  header_allocator_.get(),
345  n_chunks_));
346  }
347 
348  this->set_enabled();
349 
350  try {
351  this->enable_transport(reliable,
353 
354  } catch (const Transport::Exception&) {
356  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ")
357  ACE_TEXT("Transport Exception.\n")));
358  return DDS::RETCODE_ERROR;
359 
360  }
361 
362  const TransportLocatorSeq& trans_conf_info = connection_info();
363 
364 
365  Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
366 
368  if (!topic_servant_->check_data_representation(qos_.representation.value, true)) {
369  return DDS::RETCODE_ERROR;
370  }
371 
372  XTypes::TypeInformation type_info;
374  type_info.minimal.dependent_typeid_count = 0;
376  type_info.complete.dependent_typeid_count = 0;
377 
378  this->publication_id_ =
379  disco->add_publication(this->domain_id_,
380  this->participant_servant_->get_id(),
381  this->topic_servant_->get_id(),
382  rchandle_from(this),
383  this->qos_,
384  trans_conf_info,
385  this->publisher_qos_,
386  type_info);
387 
388  if (this->publication_id_ == GUID_UNKNOWN) {
390  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ")
391  ACE_TEXT("add_publication returned invalid id.\n")));
392  return DDS::RETCODE_ERROR;
393  }
394 
395  return DDS::RETCODE_OK;
396 }
397 
398 
399 
400 void
402  const ReaderAssociation& reader,
403  bool active)
404 {
405  DBG_ENTRY_LVL("ReplayerImpl", "add_association", 6);
406 
407  if (DCPS_debug_level >= 1) {
409  ACE_TEXT("(%P|%t) ReplayerImpl::add_association - ")
410  ACE_TEXT("bit %d local %C remote %C\n"),
411  is_bit_,
412  LogGuid(yourId).c_str(),
413  LogGuid(reader.readerId).c_str()));
414  }
415 
416  // if (entity_deleted_) {
417  // if (DCPS_debug_level >= 1)
418  // ACE_DEBUG((LM_DEBUG,
419  // ACE_TEXT("(%P|%t) ReplayerImpl::add_association")
420  // ACE_TEXT(" This is a deleted datawriter, ignoring add.\n")));
421  //
422  // return;
423  // }
424 
425  if (GUID_UNKNOWN == publication_id_) {
426  publication_id_ = yourId;
427  }
428 
429  {
431  reader_info_.insert(std::make_pair(reader.readerId,
432  ReaderInfo(TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
435  }
436 
437  if (DCPS_debug_level > 4) {
439  ACE_TEXT("(%P|%t) ReplayerImpl::add_association(): ")
440  ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
441  LogGuid(publication_id_).c_str(),
443  }
444 
445  AssociationData data;
446  data.remote_id_ = reader.readerId;
447  data.remote_data_ = reader.readerTransInfo;
448  data.discovery_locator_ = reader.readerDiscInfo;
450  data.remote_reliable_ =
452  data.remote_durable_ =
454 
455  if (!this->associate(data, active)) {
456  //FUTURE: inform inforepo and try again as passive peer
457  if (DCPS_debug_level) {
459  ACE_TEXT("(%P|%t) ReplayerImpl::add_association: ")
460  ACE_TEXT("ERROR: transport layer failed to associate.\n")));
461  }
462  return;
463  }
464 
465  if (active) {
467 
469  }
470 }
471 
472 
474  const DDS::StringSeq& params,
476  bool durable)
477  : expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
478  , durable_(durable)
479 {
480  ACE_UNUSED_ARG(filter);
481  ACE_UNUSED_ARG(params);
482  ACE_UNUSED_ARG(participant);
483 }
484 
485 
487 {
488 }
489 
490 void
492 {
493  DBG_ENTRY_LVL("ReplayerImpl", "association_complete_i", 6);
494  // bool reader_durable = false;
495  {
497  if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) {
499  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
500  ACE_TEXT("insert %C from pending failed.\n"),
501  LogGuid(remote_id).c_str()));
502  }
503  // RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
504  // if (it != reader_info_.end()) {
505  // reader_durable = it->second.durable_;
506  // }
507  }
508 
509  if (!is_bit_) {
510 
511  const DDS::InstanceHandle_t handle = participant_servant_->assign_handle(remote_id);
512 
513  {
514  // protect publication_match_status_ and status changed flags.
516 
517  // update the publication_match_status_
522 
523  if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
525  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
526  ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
527  LogGuid(remote_id).c_str(),
528  handle));
529  return;
530 
531  } else if (DCPS_debug_level > 4) {
533  ACE_TEXT("(%P|%t) ReplayerImpl::association_complete_i: ")
534  ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
535  LogGuid(remote_id).c_str(),
536  handle));
537  }
538 
540 
541  }
542 
543 
544  if (listener_.in()) {
547 
548  // TBD - why does the spec say to change this but not
549  // change the ChangeFlagStatus after a listener call?
552  }
553 
554  }
555 
556 }
557 
558 void
560  CORBA::Boolean notify_lost)
561 {
562  if (DCPS_debug_level >= 1) {
564  ACE_TEXT("(%P|%t) ReplayerImpl::remove_associations: ")
565  ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
566  is_bit_,
567  LogGuid(publication_id_).c_str(),
568  LogGuid(readers[0]).c_str(),
569  readers.length()));
570  }
571 
572  this->stop_associating(readers.get_buffer(), readers.length());
573 
574  ReaderIdSeq fully_associated_readers;
575  CORBA::ULong fully_associated_len = 0;
576  ReaderIdSeq rds;
577  CORBA::ULong rds_len = 0;
578  DDS::InstanceHandleSeq handles;
579 
580  {
581  // Ensure the same acquisition order as in wait_for_acknowledgments().
582  // ACE_GUARD(ACE_SYNCH_MUTEX, wfaGuard, this->wfaLock_);
584 
585  //Remove the readers from fully associated reader list.
586  //If the supplied reader is not in the cached reader list then it is
587  //already removed. We just need remove the readers in the list that have
588  //not been removed.
589 
590  CORBA::ULong len = readers.length();
591 
592  for (CORBA::ULong i = 0; i < len; ++i) {
593  //Remove the readers from fully associated reader list. If it's not
594  //in there, the association_complete() is not called yet and remove it
595  //from pending list.
596 
597  if (OpenDDS::DCPS::remove(readers_, readers[i]) == 0) {
598  ++fully_associated_len;
599  fully_associated_readers.length(fully_associated_len);
600  fully_associated_readers [fully_associated_len - 1] = readers[i];
601 
602  // Remove this reader from the ACK sequence map if its there.
603  // This is where we need to be holding the wfaLock_ obtained
604  // above.
605  RepoIdToSequenceMap::iterator where
606  = this->idToSequence_.find(readers[i]);
607 
608  if (where != this->idToSequence_.end()) {
609  this->idToSequence_.erase(where);
610 
611  // It is possible that this subscription was causing the wait
612  // to continue, so give the opportunity to find out.
613  // this->wfaCondition_.broadcast();
614  }
615 
616  ++rds_len;
617  rds.length(rds_len);
618  rds [rds_len - 1] = readers[i];
619  }
620  reader_info_.erase(readers[i]);
621  //else reader is already removed which indicates remove_association()
622  //is called multiple times.
623  }
624 
625  if (fully_associated_len > 0 && !is_bit_) {
626  // The reader should be in the id_to_handle map at this time
627  this->lookup_instance_handles(fully_associated_readers, handles);
628 
629  for (CORBA::ULong i = 0; i < fully_associated_len; ++i) {
630  id_to_handle_map_.erase(fully_associated_readers[i]);
631  }
632  }
633 
634  // wfaGuard.release();
635 
636  // Mirror the PUBLICATION_MATCHED_STATUS processing from
637  // association_complete() here.
638  if (!this->is_bit_) {
639 
640  // Derive the change in the number of subscriptions reading this writer.
641  int matchedSubscriptions =
642  static_cast<int>(this->id_to_handle_map_.size());
644  matchedSubscriptions - this->publication_match_status_.current_count;
645 
646  // Only process status if the number of subscriptions has changed.
648  this->publication_match_status_.current_count = matchedSubscriptions;
649 
650  /// Section 7.1.4.1: total_count will not decrement.
651 
652  /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
653  /// TODO: Should rds_len really be fully_associated_len here??
655  handles[rds_len - 1];
656 
657 
658  if (listener_.in()) {
660  this,
662 
663  // Listener consumes the change.
666  }
667 
668  }
669  }
670  }
671 
672  for (CORBA::ULong i = 0; i < rds.length(); ++i) {
673  this->disassociate(rds[i]);
674  }
675 
676  // If this remove_association is invoked when the InfoRepo
677  // detects a lost reader then make a callback to notify
678  // subscription lost.
679  if (notify_lost && handles.length() > 0) {
680  this->notify_publication_lost(handles);
681  }
682 
683  for (unsigned int i = 0; i < handles.length(); ++i) {
684  participant_servant_->return_handle(handles[i]);
685  }
686 }
687 
689 {
690  this->stop_associating();
691 
693  CORBA::ULong size;
694  {
696 
697  size = static_cast<CORBA::ULong>(readers_.size());
698  readers.length(size);
699 
700  RepoIdSet::iterator itEnd = readers_.end();
701  int i = 0;
702 
703  for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
704  readers[i++] = *it;
705  }
706  }
707 
708  try {
709  if (0 < size) {
710  CORBA::Boolean dont_notify_lost = false;
711  this->remove_associations(readers, dont_notify_lost);
712  }
713 
714  } catch (const CORBA::Exception&) {
715  }
716 
717  transport_stop();
718 }
719 
720 void
722  const GUID_t& writerid,
723  const GUID_t& readerid,
724  const TransportLocatorSeq& locators,
725  DiscoveryListener* listener)
726 {
727  TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
728 }
729 
730 void
732  const GUID_t& writerid,
733  const GUID_t& readerid)
734 {
735  TransportClient::unregister_for_reader(participant, writerid, readerid);
736 }
737 
738 void
740 {
741 
742 
744 
745  // copy status and increment change
748  status.count_since_last_send;
751 
752 }
753 
754 void
756  const DDS::StringSeq& params)
757 {
758  ACE_UNUSED_ARG(readerId);
759  ACE_UNUSED_ARG(params);
760 }
761 
762 bool
764 {
765  // DataWriter does not impose any constraints on which transports
766  // may be used based on QoS.
767  return true;
768 }
769 
771 {
772  return this->publication_id_;
773 }
774 
777 {
778  return this->qos_.transport_priority.value;
779 }
780 
781 void
783 {
784  DBG_ENTRY_LVL("ReplayerImpl","data_delivered",6);
785  if (!(sample->get_pub_id() == this->publication_id_)) {
787  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::data_delivered: ")
788  ACE_TEXT(" The publication id %C from delivered element ")
789  ACE_TEXT("does not match the datawriter's id %C\n"),
790  LogGuid(sample->get_pub_id()).c_str(),
792  return;
793  }
794  DataSampleElement* elem = const_cast<DataSampleElement*>(sample);
795  // this->data_container_->data_delivered(sample);
798 
799  {
801  if (--pending_write_count_ == 0) {
803  }
804  }
805 }
806 
807 void
809 {
810  ACE_UNUSED_ARG(sample);
811 }
812 
813 void
815  bool dropped_by_transport)
816 {
817  DBG_ENTRY_LVL("ReplayerImpl","data_dropped",6);
818  // this->data_container_->data_dropped(element, dropped_by_transport);
819  ACE_UNUSED_ARG(dropped_by_transport);
820  DataSampleElement* elem = const_cast<DataSampleElement*>(sample);
823  {
825  if ((--pending_write_count_) == 0) {
827  }
828  }
829 }
830 
831 void
833  bool /* dropped_by_transport */)
834 {
835  ACE_UNUSED_ARG(sample);
836 }
837 
838 void
840 {
841  ACE_UNUSED_ARG(subids);
842 }
843 
844 void
846 {
847  ACE_UNUSED_ARG(subids);
848 }
849 
850 void
852 {
853  ACE_UNUSED_ARG(subids);
854 }
855 
856 void
858 {
859  ACE_UNUSED_ARG(handles);
860 }
861 
862 
863 void
865 {
866  qos_data.pub_qos = this->publisher_qos_;
867  qos_data.dw_qos = this->qos_;
868  qos_data.topic_name = this->topic_name_.in();
869 }
870 
873  int num_samples,
874  DDS::InstanceHandle_t* reader_ih_ptr)
875 {
876  DBG_ENTRY_LVL("ReplayerImpl","write",6);
877 
879  if (reader_ih_ptr) {
880  repo_id = this->participant_servant_->get_repoid(*reader_ih_ptr);
881  if (repo_id == GUID_UNKNOWN) {
883  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::write: ")
884  ACE_TEXT("Invalid reader instance handle (%d)\n"), *reader_ih_ptr),
886  }
887  }
888 
890 
891  for (int i = 0; i < num_samples; ++i) {
892  DataSampleElement* element = 0;
893 
895  element,
896  static_cast<DataSampleElement*>(
898  sizeof(DataSampleElement))),
900  this,
903 
904  element->get_header().byte_order_ = samples[i].sample_byte_order_;
905  element->get_header().publication_id_ = this->publication_id_;
906  list.enqueue_tail(element);
907  Message_Block_Ptr temp;
908  Message_Block_Ptr sample(samples[i].sample_->duplicate());
910  element->get_header(),
911  temp,
912  samples[i].source_timestamp_,
913  false);
914  element->set_sample(move(temp));
915  if (reader_ih_ptr) {
916  element->set_num_subs(1);
917  element->set_sub_id(0, repo_id);
918  }
919 
920  if (ret != DDS::RETCODE_OK) {
921  // we need to free the list
922  while (list.dequeue(element)) {
924  }
925 
926  return ret;
927  }
928  }
929 
930  {
933  }
934 
935  this->send(list);
936 
937  for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
938  end = reader_info_.end(); iter != end; ++iter) {
939  iter->second.expected_sequence_ = sequence_number_;
940  }
941 
942  return DDS::RETCODE_OK;
943 }
944 
947 {
948  return this->write(&sample, 1, 0);
949 }
950 
953  DataSampleHeader& header_data,
954  Message_Block_Ptr& message,
955  const DDS::Time_t& source_timestamp,
956  bool content_filter)
957 {
958  header_data.message_id_ = SAMPLE_DATA;
959  header_data.coherent_change_ = content_filter;
960 
961  header_data.content_filter_ = false;
962  header_data.cdr_encapsulation_ = this->cdr_encapsulation();
963  header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
964  header_data.sequence_repair_ = need_sequence_repair();
967  } else {
968  ++this->sequence_number_;
969  }
970  header_data.sequence_ = this->sequence_number_;
971  header_data.source_timestamp_sec_ = source_timestamp.sec;
972  header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
973 
976  header_data.lifespan_duration_ = true;
979  }
980 
981  // header_data.publication_id_ = publication_id_;
982  // header_data.publisher_id_ = this->publisher_servant_->publisher_id_;
983  ACE_Message_Block* tmp;
985  static_cast<ACE_Message_Block*>(
986  mb_allocator_->malloc(sizeof(ACE_Message_Block))),
989  data.release(), //cont
990  0, //data
991  header_allocator_.get(), //alloc_strategy
992  0, //locking_strategy
996  db_allocator_.get(),
997  mb_allocator_.get()),
999  message.reset(tmp);
1000  *message << header_data;
1001  return DDS::RETCODE_OK;
1002 }
1003 
1004 void
1006  DDS::InstanceHandleSeq & hdls)
1007 {
1008  CORBA::ULong const num_rds = ids.length();
1009 
1010  if (DCPS_debug_level > 9) {
1011  OPENDDS_STRING separator;
1012  OPENDDS_STRING buffer;
1013 
1014  for (CORBA::ULong i = 0; i < num_rds; ++i) {
1015  buffer += separator + LogGuid(ids[i]).conv_;
1016  separator = ", ";
1017  }
1018 
1020  ACE_TEXT("(%P|%t) ReplayerImpl::lookup_instance_handles: ")
1021  ACE_TEXT("searching for handles for reader Ids: %C.\n"),
1022  buffer.c_str()));
1023  }
1024 
1025  hdls.length(num_rds);
1026 
1027  for (CORBA::ULong i = 0; i < num_rds; ++i) {
1028  hdls[i] = participant_servant_->lookup_handle(ids[i]);
1029  }
1030 }
1031 
1032 bool
1034 {
1035  for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
1036  end = reader_info_.end(); it != end; ++it) {
1037  if (it->second.expected_sequence_ != sequence_number_) {
1038  return true;
1039  }
1040  }
1041  return false;
1042 }
1043 
1046 {
1048 }
1049 
1052  const RawDataSample& sample )
1053 {
1054  return write(&sample, 1, &subscription);
1055 }
1056 
1059  const RawDataSampleList& samples )
1060 {
1061  if (!samples.empty())
1062  return write(&samples[0], static_cast<int>(samples.size()), &subscription);
1063  return DDS::RETCODE_ERROR;
1064 }
1065 
1066 } // namespace DCPS
1067 } // namespace
1068 
void set_sample(Message_Block_Ptr sample)
void association_complete_i(const GUID_t &remote_id)
void send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
RcHandle< PublicationInstance > PublicationInstance_rch
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
#define ACE_DEBUG(X)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
sequence< InstanceHandle_t > InstanceHandleSeq
Definition: DdsDcpsCore.idl:53
ACE_CDR::Long Long
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual void add_association(const GUID_t &yourId, const ReaderAssociation &reader, bool active)
unique_ptr< DataBlockAllocator > db_allocator_
Definition: ReplayerImpl.h:279
DDS::ReturnCode_t set_enabled()
Definition: EntityImpl.cpp:40
const DataSampleHeader & get_header() const
static const ACE_Time_Value max_time
const InstanceHandle_t HANDLE_NIL
char message_id_
The enum MessageId.
Base class to hold configuration settings for TransportImpls.
Definition: TransportInst.h:64
void enable_transport(bool reliable, bool durable)
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
Definition: ReplayerImpl.h:193
virtual void init(DDS::Topic_ptr topic, TopicImpl *topic_servant, const DDS::DataWriterQos &qos, ReplayerListener_rch a_listener, const DDS::StatusMask &mask, OpenDDS::DCPS::DomainParticipantImpl *participant_servant, const DDS::PublisherQos &publisher_qos)
ReliabilityQosPolicy reliability
void set_sub_id(CORBA::ULong index, OpenDDS::DCPS::GUID_t id)
const TransportLocatorSeq & connection_info() const
const long DURATION_INFINITE_SEC
Definition: DdsDcpsCore.idl:72
DurabilityQosPolicy durability
#define OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, error_rtn_value)
TransportLocatorSeq remote_data_
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
int bind(Container &c, const FirstType &first, const SecondType &second)
Definition: Util.h:20
virtual void notify_publication_disconnected(const ReaderIdSeq &subids)
RepoIdToHandleMap id_to_handle_map_
Definition: ReplayerImpl.h:250
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
Definition: Qos_Helper.inl:948
const char * c_str() const
unique_ptr< DataSampleElementAllocator > sample_list_element_allocator_
Definition: ReplayerImpl.h:285
TransportLocator discovery_locator_
ACE_CDR::ULong remote_transport_context_
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
virtual DDS::ReturnCode_t write_to_reader(DDS::InstanceHandle_t subscription, const RawDataSample &sample)
virtual void notify_publication_lost(const ReaderIdSeq &subids)
sequence< TransportLocator > TransportLocatorSeq
ACE_Recursive_Thread_Mutex lock_
The sample data container.
Definition: ReplayerImpl.h:246
virtual bool check_transport_qos(const TransportInst &inst)
unique_ptr< DataSampleHeaderAllocator > header_allocator_
Definition: ReplayerImpl.h:281
bool topicIsBIT(const char *name, const char *type)
void return_handle(DDS::InstanceHandle_t handle)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
DataRepresentationQosPolicy representation
Cached_Allocator_With_Overflow< DataSampleHeader, ACE_Null_Mutex > DataSampleHeaderAllocator
RepoIdToSequenceMap idToSequence_
Definition: ReplayerImpl.h:311
void disassociate(const GUID_t &peerId)
ACE_Guard< ACE_Thread_Mutex > lock_
ReaderInfo(const char *filter, const DDS::StringSeq &params, DomainParticipantImpl *participant, bool durable)
virtual void unregister_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
GUID_t get_repoid(DDS::InstanceHandle_t id) const
size_t association_chunk_multiplier_
The multiplier for allocators affected by associations.
Definition: ReplayerImpl.h:187
TopicDescriptionPtr< TopicImpl > topic_servant_
The topic servant.
Definition: ReplayerImpl.h:224
const DDS::StatusMask DEFAULT_STATUS_MASK
void set_writer_effective_data_rep_qos(DDS::DataRepresentationIdSeq &qos, bool cdr_encapsulated)
Definition: DCPS_Utils.cpp:508
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
DomainParticipantImpl * participant()
Definition: ReplayerImpl.h:161
void lookup_instance_handles(const ReaderIdSeq &ids, DDS::InstanceHandleSeq &hdls)
Lookup the instance handles by the subscription repo ids.
Implements the DDS::Topic interface.
Definition: TopicImpl.h:37
#define OPENDDS_STRING
sequence< GUID_t > ReaderIdSeq
RepoIdToReaderInfoMap reader_info_
Definition: ReplayerImpl.h:211
virtual void retrieve_inline_qos_data(InlineQosData &qos_data) const
unique_ptr< MessageBlockAllocator > mb_allocator_
Definition: ReplayerImpl.h:277
GUID_t publication_id_
The repository id of this datawriter/publication.
Definition: ReplayerImpl.h:238
DDS::PublicationMatchedStatus publication_match_status_
Definition: ReplayerImpl.h:258
void enqueue_tail(const DataSampleElement *element)
LM_DEBUG
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
virtual void control_delivered(const Message_Block_Ptr &sample)
ReliabilityQosPolicyKind kind
ACE_CDR::ULong ULong
DurabilityQosPolicyKind kind
DurabilityQosPolicy durability
virtual void data_delivered(const DataSampleElement *sample)
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
virtual DDS::InstanceHandle_t get_instance_handle()
void unregister_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
unsigned long nanosec
Definition: DdsDcpsCore.idl:69
TypeIdentifierWithDependencies complete
Definition: TypeObject.h:3374
#define OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, error_rtn_value)
virtual DDS::ReturnCode_t set_qos(const DDS::PublisherQos &publisher_qos, const DDS::DataWriterQos &datawriter_qos)
ACE_CDR::Boolean Boolean
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
DomainParticipantImpl * participant_servant_
Definition: ReplayerImpl.h:200
DDS::ReturnCode_t create_sample_data_message(Message_Block_Ptr data, DataSampleHeader &header_data, Message_Block_Ptr &message, const DDS::Time_t &source_timestamp, bool content_filter)
virtual DDS::ReturnCode_t set_listener(const ReplayerListener_rch &a_listener, DDS::StatusMask mask)
CORBA::String_var type_name_
The type name of associated topic.
Definition: ReplayerImpl.h:190
ResourceLimitsQosPolicy resource_limits
DDS::DataWriterQos passed_qos_
Definition: ReplayerImpl.h:196
virtual void data_dropped(const DataSampleElement *sample, bool dropped_by_transport)
DDS::ReturnCode_t enable()
int data_dropped_count_
Statistics counter.
Definition: ReplayerImpl.h:124
size_t total_length(void) const
virtual void remove_associations(const ReaderIdSeq &readers, CORBA::Boolean callback)
LM_WARNING
void set_num_subs(CORBA::ULong num_subs)
virtual DDS::ReturnCode_t get_qos(DDS::PublisherQos &publisher_qos, DDS::DataWriterQos &datawriter_qos)
size_t n_chunks_
The number of chunks for the cached allocator.
Definition: ReplayerImpl.h:184
bool dequeue(const DataSampleElement *stale)
OPENDDS_STRING conv_
bool notify_all()
Unblock all of the threads waiting on this condition.
GUID_t topic_id_
The associated topic repository id.
Definition: ReplayerImpl.h:220
virtual DDS::ReturnCode_t write(const RawDataSample &sample)
DDS::StatusMask listener_mask_
Definition: ReplayerImpl.h:228
DDS::InstanceHandle_t get_entity_instance_handle(const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
Definition: EntityImpl.cpp:142
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
void register_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener)
ACE_TEXT("TCP_Factory")
virtual void on_replayer_matched(Replayer *replayer, const DDS::PublicationMatchedStatus &status)
Definition: Replayer.cpp:18
const unsigned long DURATION_INFINITE_NSEC
Definition: DdsDcpsCore.idl:73
unsigned long StatusMask
bool associate(const AssociationData &peer, bool active)
TransportPriorityQosPolicy transport_priority
unsigned long nanosec
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
Cached_Allocator_With_Overflow< ACE_Message_Block, ACE_Thread_Mutex > MessageBlockAllocator
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ReliabilityQosPolicy reliability
DDS::Time_t source_timestamp_
The timestamp the sender put on the sample.
Definition: RawDataSample.h:46
DDS::ReturnCode_t cleanup()
virtual void control_dropped(const Message_Block_Ptr &sample, bool dropped_by_transport)
#define OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
SequenceNumber sequence_number_
The sequence number unique in DataWriter scope.
Definition: ReplayerImpl.h:240
DDS::PublisherQos publisher_qos_
Definition: ReplayerImpl.h:235
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
virtual void update_subscription_params(const GUID_t &readerId, const DDS::StringSeq &exprParams)
virtual void register_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
const SequenceNumber_t SEQUENCENUMBER_UNKNOWN
Definition: MessageTypes.h:50
virtual void notify_publication_reconnected(const ReaderIdSeq &subids)
Sequence number abstraction. Only allows positive 64 bit values.
static const ACE_Time_Value zero
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
const ReturnCode_t RETCODE_ERROR
DDS::Topic_var topic_objref_
The object reference of the associated topic.
Definition: ReplayerImpl.h:222
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
LifespanQosPolicy lifespan
virtual GUID_t get_guid() const
int remove(Container &c, const ValueType &v)
Definition: Util.h:121
Cached_Allocator_With_Overflow< DataSampleElement, ACE_Null_Mutex > DataSampleElementAllocator
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_UNSUPPORTED
ReplayerListener_rch listener_
Used to notify the entity for relevant events.
Definition: ReplayerImpl.h:230
virtual CORBA::Long get_priority_value(const AssociationData &data) const
#define ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)
virtual ReplayerListener_rch get_listener()
DDS::DomainId_t domain_id_
The domain id.
Definition: ReplayerImpl.h:232
virtual void update_incompatible_qos(const IncompatibleQosStatus &status)
DDS::InstanceHandle_t assign_handle(const GUID_t &id=GUID_UNKNOWN)
const long LENGTH_UNLIMITED
#define ACE_ERROR_RETURN(X, Y)
int insert(Container &c, const ValueType &v)
Definition: Util.h:105
const character_type * in(void) const
DataRepresentationIdSeq value
ConditionVariable< ACE_Recursive_Thread_Mutex > empty_condition_
Definition: ReplayerImpl.h:313
static bool valid(const DDS::UserDataQosPolicy &qos)
Definition: Qos_Helper.inl:723
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
DDS::InstanceHandle_t lookup_handle(const GUID_t &id) const
#define TheServiceParticipant
bool is_bit_
The time interval for sending liveliness message.
Definition: ReplayerImpl.h:306
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
Definition: Qos_Helper.inl:596
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
CORBA::String_var topic_name_
The name of associated topic.
Definition: ReplayerImpl.h:218
DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_
Status conditions.
Definition: ReplayerImpl.h:257
TypeIdentifierWithDependencies minimal
Definition: TypeObject.h:3373
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)
Cached_Allocator_With_Overflow< ACE_Data_Block, ACE_Thread_Mutex > DataBlockAllocator
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50