OpenDDS  Snapshot(2023/04/28-20:55)
RecorderImpl.cpp
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
7 
8 #include "RecorderImpl.h"
9 
10 #include "SubscriptionInstance.h"
12 #include "DomainParticipantImpl.h"
13 #include "Service_Participant.h"
14 #include "Qos_Helper.h"
16 #include "GuidConverter.h"
17 #include "Serializer.h"
18 #include "SubscriberImpl.h"
19 #include "Transient_Kludge.h"
20 #include "Util.h"
21 #include "QueryConditionImpl.h"
22 #include "ReadConditionImpl.h"
23 #include "MonitorFactory.h"
24 #include "SafetyProfileStreams.h"
25 #include "TypeSupportImpl.h"
26 #include "PoolAllocator.h"
27 #include "DCPS_Utils.h"
28 #ifndef DDS_HAS_MINIMUM_BIT
29 # include "BuiltInTopicUtils.h"
30 #endif
34 
35 #include <dds/DdsDcpsCoreC.h>
36 #include <dds/DdsDcpsGuidTypeSupportImpl.h>
37 #ifndef DDS_HAS_MINIMUM_BIT
38 # include <dds/DdsDcpsCoreTypeSupportC.h>
39 #endif
40 
41 #include <tao/ORB_Core.h>
42 
43 #include <ace/Reactor.h>
44 
45 #include <stdexcept>
46 
48 
49 namespace OpenDDS {
50 namespace DCPS {
51 
53  : qos_(TheServiceParticipant->initial_DataReaderQos())
54  , participant_servant_(0)
55  , topic_servant_(0)
56 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
57  , is_exclusive_ownership_(false)
58 #endif
59 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
60  , owner_manager_(0)
61 #endif
62  , subqos_(TheServiceParticipant->initial_SubscriberQos())
63  , topic_desc_(0)
64  , listener_mask_(DEFAULT_STATUS_MASK)
65  , domain_id_(0)
66  , is_bit_(false)
67  , check_encap_(true)
69 {
74 
80 }
81 
82 // This method is called when there are no longer any reference to the
83 // the servant.
85 {
86  DBG_ENTRY_LVL("RecorderImpl","~RecorderImpl",6);
87 }
88 
89 
92 {
93 
94  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
95  if (!disco || !disco->remove_subscription(domain_id_,
98  if (log_level >= LogLevel::Notice) {
99  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: RecorderImpl::cleanup: "
100  "could not remove subscription from discovery\n"));
101  }
102  return DDS::RETCODE_ERROR;
103  }
104 
105  // Call remove association before unregistering the datareader from the transport,
106  // otherwise some callbacks resulted from remove_association may lost.
107 
109 
110  return DDS::RETCODE_OK;
111 }
112 
114  TopicDescriptionImpl* a_topic_desc,
115  const DDS::DataReaderQos & qos,
116  RecorderListener_rch a_listener,
117  const DDS::StatusMask & mask,
119  DDS::SubscriberQos subqos)
120 {
121  if (DCPS_debug_level >= 2) {
122  ACE_DEBUG((LM_DEBUG, "(%P|%t) RecorderImpl::init\n"));
123  }
124 
125 
126  topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc);
127  if (TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) {
128  topic_servant_ = a_topic;
129  }
130 
131  CORBA::String_var topic_name = a_topic_desc->get_name();
132  qos_ = qos;
133  passed_qos_ = qos;
134 
135 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
137 #endif
138 
139  listener_ = a_listener;
140  listener_mask_ = mask;
141 
142  // Only store the participant pointer, since it is our "grand"
143  // parent, we will exist as long as it does
145 
146 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
149  }
150 #endif
151 
153  subqos_ = subqos;
154 }
155 
157 {
159  return ti.is_reliable();
160  }
161  return true;
162 }
163 
165 {
166  return subscription_id_;
167 }
168 
170 {
172 }
173 
174 
176 {
177  DBG_ENTRY_LVL("RecorderImpl","data_received",6);
178 
179  // Ensure some other thread is not changing the sample container
180  // or statuses related to samples.
182 
183  if (DCPS_debug_level >= 8) {
185  "(%P|%t) RecorderImpl::data_received: "
186  "%C received sample: %C\n",
187  LogGuid(subscription_id_).c_str(),
188  to_string(sample.header_).c_str()));
189  }
190 
191  // we only support SAMPLE_DATA messages
192  if (sample.header_.message_id_ == SAMPLE_DATA && listener_.in()) {
193  Message_Block_Ptr payload(sample.data(&mb_alloc_));
195  if (sample.header_.cdr_encapsulation_ && check_encap_) {
196  Encoding enc;
197  Serializer ser(payload.get(), enc);
198  EncapsulationHeader encap;
199  if (ser >> encap && encap.to_any_encoding(enc)) {
200  kind = enc.kind();
201  }
202  }
203  RawDataSample rawSample(sample.header_,
204  static_cast<MessageId> (sample.header_.message_id_),
207  sample.header_.publication_id_,
208  sample.header_.byte_order_,
209  payload.get(),
210  kind);
211  listener_->on_sample_data_received(this, rawSample);
212  }
213 }
214 
216 {
217 }
218 
220 {
221 }
222 
223 void
225 {
226 }
227 
229 {
230 }
231 
232 #ifndef OPENDDS_SAFETY_PROFILE
233 void
235 {
237  DDS::DynamicType_var dt = tls->type_identifier_to_dynamic(ti, pub_id);
238  if (DCPS_debug_level >= 4) {
240  "(%P|%t) RecorderImpl::add_association: "
241  "DynamicType added to map with guid: %C\n", LogGuid(pub_id).c_str()));
242  }
243  dt_map_.insert(std::make_pair(pub_id, dt));
244 }
245 #endif
246 
247 void
249  const WriterAssociation& writer,
250  bool active)
251 {
252  if (DCPS_debug_level >= 4) {
253  ACE_DEBUG((LM_DEBUG, "(%P|%t) RecorderImpl::add_association: "
254  "bit %d local %C remote %C\n",
255  is_bit_,
256  LogGuid(yourId).c_str(),
257  LogGuid(writer.writerId).c_str()));
258  }
259 
260  //
261  // This block prevents adding associations to deleted readers.
262  // Presumably this is a "good thing(tm)".
263  //
264  // if (entity_deleted_) {
265  // if (DCPS_debug_level >= 1)
266  // ACE_DEBUG((LM_DEBUG,
267  // ACE_TEXT("(%P|%t) RecorderImpl::add_association")
268  // ACE_TEXT(" This is a deleted datareader, ignoring add.\n")));
269  //
270  // return;
271  // }
272 
273  //
274  // We are being called back from the repository before we are done
275  // processing after our call to the repository that caused this call
276  // (from the repository) to be made.
277  //
279  // add_associations was invoked before DCSPInfoRepo::add_subscription() returned.
280  subscription_id_ = yourId;
281  }
282 
283  //
284  // We do the following while holding the publication_handle_lock_.
285  //
286  {
288 
289  //
290  // For each writer in the list of writers to associate with, we
291  // create a WriterInfo and a WriterStats object and store them in
292  // our internal maps.
293  //
294  {
296 
297  const GUID_t& writer_id = writer.writerId;
298  RcHandle<WriterInfo> info ( make_rch<WriterInfo>(rchandle_from<WriterInfoListener>(this), writer_id, writer.writerQos));
299  /*std::pair<WriterMapType::iterator, bool> bpair =*/
300  writers_.insert(
301  // This insertion is idempotent.
302  WriterMapType::value_type(
303  writer_id,
304  info));
305  // statistics_.insert(
306  // StatsMapType::value_type(
307  // writer_id,
308  // WriterStats(
309  // raw_latency_buffer_size_,
310  // raw_latency_buffer_type_)));
311 
312  // if (DCPS_debug_level > 4) {
313  // GuidConverter converter(writer_id);
314  // ACE_DEBUG((LM_DEBUG,
315  // "(%P|%t) RecorderImpl::add_association: "
316  // "inserted writer %C.return %d\n",
317  // OPENDDS_STRING(converter).c_str(), bpair.second));
318  //
319  // WriterMapType::iterator iter = writers_.find(writer_id);
320  // if (iter != writers_.end()) {
321  // // This may not be an error since it could happen that the sample
322  // // is delivered to the datareader after the write is dis-associated
323  // // with this datareader.
324  // GuidConverter reader_converter(subscription_id_);
325  // GuidConverter writer_converter(writer_id);
326  // ACE_DEBUG((LM_DEBUG,
327  // ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
328  // ACE_TEXT("reader %C is associated with writer %C.\n"),
329  // OPENDDS_STRING(reader_converter).c_str(),
330  // OPENDDS_STRING(writer_converter).c_str()));
331  // }
332  // }
333  }
334 
335  //
336  // Propagate the add_associations processing down into the Transport
337  // layer here. This will establish the transport support and reserve
338  // usage of an existing connection or initiate creation of a new
339  // connection if no suitable connection is available.
340  //
341  AssociationData data;
342  data.remote_id_ = writer.writerId;
343  data.remote_data_ = writer.writerTransInfo;
344  data.discovery_locator_ = writer.writerDiscInfo;
348  data.remote_reliable_ =
350  data.remote_durable_ =
352 
353  if (!associate(data, active)) {
354  if (log_level >= LogLevel::Warning) {
355  ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: RecorderImpl::add_association: "
356  "transport layer failed to associate\n"));
357  }
358  return;
359  }
360 
361  // Check if any publications have already sent a REQUEST_ACK message.
362  // {
363  // ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, writers_lock_);
364  //
365  // WriterMapType::iterator where = writers_.find(writer.writerId);
366  //
367  // if (where != writers_.end()) {
368  // const MonotonicTimePoint now = MonotonicTimePoint::now();
369  //
370  // ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
371  //
372  // if (where->second->should_ack(now)) {
373  // const SequenceNumber sequence = where->second->ack_sequence();
374  // if (send_sample_ack(writer.writerId, sequence, now.to_dds_time())) {
375  // where->second->clear_acks(sequence);
376  // }
377  // }
378  // }
379  // }
380 
381  //
382  // LIVELINESS policy timers are managed here.
383  //
384  // if (liveliness_lease_duration_ != TimeDuration::zero) {
385  // // this call will start the timer if it is not already set
386  // const MonotonicTimePoint now = MonotonicTimePoint::now();
387  //
388  // if (DCPS_debug_level >= 5) {
389  // GuidConverter converter(subscription_id_);
390  // ACE_DEBUG((LM_DEBUG,
391  // ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
392  // ACE_TEXT("starting/resetting liveliness timer for reader %C\n"),
393  // OPENDDS_STRING(converter).c_str()));
394  // }
395  //
396  // handle_timeout(now, this);
397  // }
398 
399  // else - no timer needed when LIVELINESS.lease_duration is INFINITE
400 
401  }
402  //
403  // We no longer hold the publication_handle_lock_.
404  //
405 
406  //
407  // We only do the following processing for readers that are *not*
408  // readers of Builtin Topics.
409  //
410  if (!is_bit_) {
411 
413 
414  //
415  // We acquire the publication_handle_lock_ for the remainder of our
416  // processing.
417  //
418  {
420 
421  // This insertion is idempotent.
422  id_to_handle_map_.insert(
423  RepoIdToHandleMap::value_type(writer.writerId, handle));
424 
425  if (DCPS_debug_level > 4) {
427  ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
428  ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"),
429  LogGuid(writer.writerId).c_str(),
430  handle));
431  }
432 
433  // We need to adjust these after the insertions have all completed
434  // since insertions are not guaranteed to increase the number of
435  // currently matched publications.
436  int matchedPublications = static_cast<int>(id_to_handle_map_.size());
438  = matchedPublications - subscription_match_status_.current_count;
439  subscription_match_status_.current_count = matchedPublications;
440 
443 
445 
446  // set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
447 
448 
449  if (listener_.in()) {
451  this,
453 
454  // TBD - why does the spec say to change this but not change
455  // the ChangeFlagStatus after a listener call?
456 
457  // Client will look at it so next time it looks the change should be 0
460  }
461 
462  // notify_status_condition();
463  }
464 
465  {
468 
469  if (!writers_.count(writer.writerId)) {
470  return;
471  }
472 
473  writers_[writer.writerId]->handle(handle);
474  }
475  }
476 
477  // if (monitor_) {
478  // monitor_->report();
479  // }
480 }
481 
482 void
484  bool notify_lost)
485 {
486  DBG_ENTRY_LVL("RecorderImpl", "remove_associations", 6);
487  if (writers.length() == 0) {
488  return;
489  }
490 
491  if (DCPS_debug_level >= 4) {
493  ACE_TEXT("(%P|%t) RecorderImpl::remove_associations: ")
494  ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
495  is_bit_,
496  LogGuid(subscription_id_).c_str(),
497  LogGuid(writers[0]).c_str(),
498  writers.length()));
499  }
500  if (!get_deleted()) {
501  // stop pending associations for these writer ids
502  stop_associating(writers.get_buffer(), writers.length());
503  }
504 
505  remove_associations_i(writers, notify_lost);
506 }
507 
508 void
510  bool notify_lost)
511 {
512  DBG_ENTRY_LVL("RecorderImpl", "remove_associations_i", 6);
513 
514  if (writers.length() == 0) {
515  return;
516  }
517 
518  if (DCPS_debug_level >= 4) {
520  ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
521  ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
522  is_bit_,
523  LogGuid(subscription_id_).c_str(),
524  LogGuid(writers[0]).c_str(),
525  writers.length()));
526  }
527  DDS::InstanceHandleSeq handles;
528 
530 
531  // This is used to hold the list of writers which were actually
532  // removed, which is a proper subset of the writers which were
533  // requested to be removed.
534  WriterIdSeq updated_writers;
535 
536  CORBA::ULong wr_len;
537 
538  //Remove the writers from writer list. If the supplied writer
539  //is not in the cached writers list then it is already removed.
540  //We just need remove the writers in the list that have not been
541  //removed.
542  {
544 
545  wr_len = writers.length();
546 
547  for (CORBA::ULong i = 0; i < wr_len; i++) {
548  GUID_t writer_id = writers[i];
549 
550 #ifndef OPENDDS_SAFETY_PROFILE
551  if (dt_map_.erase(writer_id) == 0) {
552  if (DCPS_debug_level >= 4) {
553  ACE_DEBUG((LM_DEBUG, "(%P|%t) RecorderImpl::remove_associations_i: -"
554  "failed to find writer_id in the DynamicTypeByPubId map.\n"));
555  }
556  }
557 #endif
558 
559  WriterMapType::iterator it = writers_.find(writer_id);
560  if (it != writers_.end()) {
561  it->second->removed();
562  }
563 
564  if (writers_.erase(writer_id) == 0) {
565  if (DCPS_debug_level >= 4) {
567  ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
568  ACE_TEXT("the writer local %C was already removed.\n"),
569  LogGuid(writer_id).c_str()));
570  }
571 
572  } else {
573  push_back(updated_writers, writer_id);
574  }
575  }
576  }
577 
578  wr_len = updated_writers.length();
579 
580  // Return now if the supplied writers have been removed already.
581  if (wr_len == 0) {
582  return;
583  }
584 
585  if (!is_bit_) {
586  // The writer should be in the id_to_handle map at this time. Note
587  // it if it not there.
588  lookup_instance_handles(updated_writers, handles);
589 
590  for (CORBA::ULong i = 0; i < wr_len; ++i) {
591  id_to_handle_map_.erase(updated_writers[i]);
592  }
593  }
594  for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
595  disassociate(updated_writers[i]);
596  }
597 
598  // Mirror the add_associations SUBSCRIPTION_MATCHED_STATUS processing.
599  if (!is_bit_) {
600  // Derive the change in the number of publications writing to this reader.
601  int matchedPublications = static_cast<int>(id_to_handle_map_.size());
603  = matchedPublications - subscription_match_status_.current_count;
604 
605  // Only process status if the number of publications has changed.
607  subscription_match_status_.current_count = matchedPublications;
608  /// Section 7.1.4.1: total_count will not decrement.
609 
610  /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
612  = handles[ wr_len - 1];
613 
614  // set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
615 
616  // DDS::DataReaderListener_var listener
617  // = listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
618 
619  if (listener_.in()) {
621  this,
623 
624  // Client will look at it so next time it looks the change should be 0
627  }
628 
629  // notify_status_condition();
630  }
631  }
632 
633  // If this remove_association is invoked when the InfoRepo
634  // detects a lost writer then make a callback to notify
635  // subscription lost.
636  if (notify_lost) {
637  notify_subscription_lost(handles);
638  }
639 
640  // if (monitor_) {
641  // monitor_->report();
642  // }
643 
644  for (unsigned int i = 0; i < handles.length(); ++i) {
645  participant_servant_->return_handle(handles[i]);
646  }
647 }
648 
649 void
651 {
652  DBG_ENTRY_LVL("RecorderImpl","remove_all_associations",6);
653 
655  int size;
656 
658 
659  {
661 
662  size = static_cast<int>(writers_.size());
663  writers.length(size);
664 
665  WriterMapType::iterator curr_writer = writers_.begin();
666  WriterMapType::iterator end_writer = writers_.end();
667 
668  int i = 0;
669 
670  while (curr_writer != end_writer) {
671  writers[i++] = curr_writer->first;
672  ++curr_writer;
673  }
674  }
675 
676  try {
677  CORBA::Boolean dont_notify_lost = false;
678 
679  if (0 < size) {
680  remove_associations(writers, dont_notify_lost);
681  }
682 
683  } catch (const CORBA::Exception&) {
684  }
685 
686  transport_stop();
687 }
688 
689 void
691 {
693  guard,
695 
697  // This test should make the method idempotent.
698  return;
699  }
700 
701  // set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
702  // true);
703 
704  // copy status and increment change
707  status.count_since_last_send;
709  status.last_policy_id;
711 
712  // if (!CORBA::is_nil(listener.in())) {
713  // listener->on_requested_incompatible_qos(this,
714  // requested_incompatible_qos_status_);
715  //
716  // // TBD - why does the spec say to change total_count_change but not
717  // // change the ChangeFlagStatus after a listener call?
718  //
719  // // client just looked at it so next time it looks the
720  // // change should be 0
721  // requested_incompatible_qos_status_.total_count_change = 0;
722  // }
723  //
724  // notify_status_condition();
725 }
726 
727 void
728 RecorderImpl::signal_liveliness(const GUID_t& remote_participant)
729 {
730  GUID_t prefix = remote_participant;
731  prefix.entityId = EntityId_t();
732 
734 
735  typedef std::pair<GUID_t, RcHandle<WriterInfo> > WriterSetElement;
736  typedef OPENDDS_VECTOR(WriterSetElement) WriterSet;
737  WriterSet writers;
738 
739  {
741  for (WriterMapType::iterator pos = writers_.lower_bound(prefix),
742  limit = writers_.end();
743  pos != limit && equal_guid_prefixes(pos->first, prefix);
744  ++pos) {
745  writers.push_back(std::make_pair(pos->first, pos->second));
746  }
747  }
748 
750  for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
751  pos != limit;
752  ++pos) {
753  pos->second->received_activity(now);
754  }
755 }
756 
758  const DDS::SubscriberQos & subscriber_qos,
759  const DDS::DataReaderQos & qos)
760 {
762 
763  if (Qos_Helper::valid(subscriber_qos) && Qos_Helper::consistent(subscriber_qos)) {
764  if (subqos_ != subscriber_qos) {
765  // for the not changeable qos, it can be changed before enable
766  if (!Qos_Helper::changeable(subqos_, subscriber_qos) && enabled_) {
768 
769  } else {
770  subqos_ = subscriber_qos;
771  }
772  }
773  } else {
775  }
776 
780 
781  if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
782  if (qos_ == qos)
783  return DDS::RETCODE_OK;
784 
785  if (!Qos_Helper::changeable(qos_, qos) && is_enabled()) {
787 
788  } else {
789  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
790  const bool status =
791  disco->update_subscription_qos(
795  qos,
796  subscriber_qos);
797  if (!status) {
798  if (log_level >= LogLevel::Notice) {
799  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: RecorderImpl::set_qos: qos not updated\n"));
800  }
801  return DDS::RETCODE_ERROR;
802  }
803  }
804 
805  qos_ = qos;
806  subqos_ = subscriber_qos;
807 
808  return DDS::RETCODE_OK;
809 
810  } else {
812  }
813 }
814 
817  DDS::SubscriberQos & subscriber_qos,
818  DDS::DataReaderQos & qos)
819 {
820  qos = passed_qos_;
821  subscriber_qos = subqos_;
822  return DDS::RETCODE_OK;
823 }
824 
827  DDS::StatusMask mask)
828 {
829  listener_mask_ = mask;
830  //note: OK to duplicate a nil object ref
831  listener_ = a_listener;
832  return DDS::RETCODE_OK;
833 }
834 
837 {
838  return listener_;
839 }
840 
841 void
843  DDS::InstanceHandleSeq & hdls)
844 {
845  CORBA::ULong const num_wrts = ids.length();
846 
847  if (DCPS_debug_level > 9) {
848  OPENDDS_STRING separator = "";
849  OPENDDS_STRING buffer;
850 
851  for (CORBA::ULong i = 0; i < num_wrts; ++i) {
852  buffer += separator + LogGuid(ids[i]).conv_;
853  separator = ", ";
854  }
855 
857  ACE_TEXT("(%P|%t) RecorderImpl::lookup_instance_handles: ")
858  ACE_TEXT("searching for handles for writer Ids: %C.\n"),
859  buffer.c_str()));
860  }
861 
862  hdls.length(num_wrts);
863 
864  for (CORBA::ULong i = 0; i < num_wrts; ++i) {
865  hdls[i] = participant_servant_->lookup_handle(ids[i]);
866  }
867 }
868 
871 {
872  if (DCPS_debug_level >= 2) {
874  ACE_TEXT("(%P|%t) RecorderImpl::enable\n")));
875  }
876  //According spec:
877  // - Calling enable on an already enabled Entity returns OK and has no
878  // effect.
879  // - Calling enable on an Entity whose factory is not enabled will fail
880  // and return PRECONDITION_NOT_MET.
881 
882  if (is_enabled()) {
883  return DDS::RETCODE_OK;
884  }
885 
886  set_enabled();
887 
888  // if (topic_servant_ && !transport_disabled_) {
889  if (topic_servant_) {
890  if (DCPS_debug_level >= 2) {
891  ACE_DEBUG((LM_DEBUG, "(%P|%t) RecorderImpl::enable: enable_transport\n"));
892  }
893 
894  try {
897  } catch (const Transport::Exception&) {
898  if (log_level >= LogLevel::Warning) {
899  ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: RecorderImpl::enable: Transport Exception\n"));
900  }
901  return DDS::RETCODE_ERROR;
902  }
903 
904  const TransportLocatorSeq& trans_conf_info = connection_info();
905 
906  CORBA::String_var filterClassName = "";
907  CORBA::String_var filterExpression = "";
908  DDS::StringSeq exprParams;
909 
910  Discovery_rch disco =
911  TheServiceParticipant->get_discovery(domain_id_);
912 
914  if (!topic_servant_->check_data_representation(qos_.representation.value, false)) {
915  return DDS::RETCODE_ERROR;
916  }
917 
918  if (DCPS_debug_level >= 2) {
919  ACE_DEBUG((LM_DEBUG, "(%P|%t) RecorderImpl::enable: add_subscription\n"));
920  }
921 
922  XTypes::TypeInformation type_info;
923 
925  disco->add_subscription(domain_id_,
927  topic_servant_->get_id(),
928  rchandle_from(this),
929  qos_,
930  trans_conf_info,
931  subqos_,
932  filterClassName,
933  filterExpression,
934  exprParams,
935  type_info);
936 
938  if (log_level >= LogLevel::Warning) {
939  ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: RecorderImpl::enable: "
940  "add_subscription returned invalid id\n"));
941  }
942  return DDS::RETCODE_ERROR;
943  }
944  }
945 
946  return DDS::RETCODE_OK;
947 }
948 
951 {
953 }
954 
955 void
957  const GUID_t& readerid,
958  const GUID_t& writerid,
959  const TransportLocatorSeq& locators,
960  DiscoveryListener* listener)
961 {
962  TransportClient::register_for_writer(participant, readerid, writerid, locators, listener);
963 }
964 
965 void
967  const GUID_t& readerid,
968  const GUID_t& writerid)
969 {
970  TransportClient::unregister_for_writer(participant, readerid, writerid);
971 }
972 
973 #if !defined (DDS_HAS_MINIMUM_BIT)
977 {
978  const DDS::InstanceHandle_t publication_handle = participant_servant_->lookup_handle(id);
979 
981  guard,
984 
985  DDS::PublicationBuiltinTopicDataSeq data;
986 
987  DDS::ReturnCode_t const ret = instance_handle_to_bit_data<DDS::PublicationBuiltinTopicDataDataReader_var>(
990  publication_handle,
991  data);
992 
993  if (ret == DDS::RETCODE_OK) {
994  key = data[0].key;
995  }
996 
997  return ret;
998 }
999 #endif // !defined (DDS_HAS_MINIMUM_BIT)
1000 
1001 #ifndef OPENDDS_SAFETY_PROFILE
1002 DDS::DynamicData_ptr RecorderImpl::get_dynamic_data(const RawDataSample& sample)
1003 {
1004  const Encoding enc(sample.encoding_kind_, sample.header_.byte_order_ ? ENDIAN_LITTLE : ENDIAN_BIG);
1005  const DynamicTypeByPubId::const_iterator dt_found = dt_map_.find(sample.publication_id_);
1006  if (dt_found == dt_map_.end()) {
1007  if (log_level >= LogLevel::Error) {
1008  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RecorderImpl::get_dynamic_data: "
1009  "failed to find GUID: %C in DynamicTypeByPubId.\n", LogGuid(sample.publication_id_).c_str()));
1010  }
1011  return 0;
1012  }
1013 
1014  DDS::DynamicType_var dt = dt_found->second;
1016  DDS::DynamicData_var dd_var = dd;
1017  if (!dd->check_xcdr1_mutable(dt)) {
1018  if (log_level >= LogLevel::Notice) {
1019  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: RecorderImpl::get_dynamic_data: "
1020  "Encountered unsupported combination of XCDR1 encoding and mutable extensibility.\n"));
1021  }
1022  return 0;
1023  }
1024  return dd_var._retn();
1025 }
1026 #endif
1027 
1028 } // namespace DCPS
1029 } // namespace
1030 
DataSampleHeader header_
The demarshalled sample header.
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
#define ACE_DEBUG(X)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
sequence< InstanceHandle_t > InstanceHandleSeq
Definition: DdsDcpsCore.idl:53
ACE_CDR::Long Long
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
typedef OPENDDS_VECTOR(WeakRcHandle< TransportImpl >) ImplsType
DDS::ReturnCode_t set_enabled()
Definition: EntityImpl.cpp:40
DDS::StatusMask listener_mask_
Definition: RecorderImpl.h:184
const InstanceHandle_t HANDLE_NIL
RecorderListener_rch listener_
Definition: RecorderImpl.h:185
char message_id_
The enum MessageId.
Base class to hold configuration settings for TransportImpls.
Definition: TransportInst.h:64
void enable_transport(bool reliable, bool durable)
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
ReliabilityQosPolicy reliability
DDS::DataReaderQos qos_
Definition: RecorderImpl.h:164
RecorderListener_rch get_listener()
const TransportLocatorSeq & connection_info() const
DurabilityQosPolicy durability
TransportLocatorSeq remote_data_
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
virtual bool check_transport_qos(const TransportInst &inst)
Message_Block_Ptr sample_
The data in unspecified format.
Definition: RawDataSample.h:53
sequence< octet > key
GUID_t publication_id_
Id of the datawriter that sent the sample.
Definition: RawDataSample.h:48
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
Definition: Qos_Helper.inl:948
OwnershipQosPolicy ownership
const char * c_str() const
DDS::TopicDescription_var topic_desc_
Definition: RecorderImpl.h:183
#define ACE_READ_GUARD(MUTEX, OBJ, LOCK)
TransportLocator discovery_locator_
ACE_CDR::ULong remote_transport_context_
DDS::SubscriberQos subqos_
Definition: RecorderImpl.h:179
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
OwnershipQosPolicyKind kind
sequence< TransportLocator > TransportLocatorSeq
DomainParticipantImpl * participant()
Definition: RecorderImpl.h:129
void return_handle(DDS::InstanceHandle_t handle)
void disassociate(const GUID_t &peerId)
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
Definition: RecorderImpl.h:206
void set_reader_effective_data_rep_qos(DDS::DataRepresentationIdSeq &qos)
Definition: DCPS_Utils.cpp:517
virtual void data_received(const ReceivedDataSample &sample)
DDS::DynamicData_ptr get_dynamic_data(const RawDataSample &sample)
const DDS::StatusMask DEFAULT_STATUS_MASK
void unregister_for_writer(const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid)
DDS::ReturnCode_t set_qos(const DDS::SubscriberQos &subscriber_qos, const DDS::DataReaderQos &datareader_qos)
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
const char *const BUILT_IN_PUBLICATION_TOPIC
Implements the DDS::Topic interface.
Definition: TopicImpl.h:37
#define OPENDDS_STRING
DDS::ReturnCode_t get_qos(DDS::SubscriberQos &subscriber_qos, DDS::DataReaderQos &datareader_qos)
LM_DEBUG
DomainParticipantImpl * participant_servant_
Definition: RecorderImpl.h:170
ReliabilityQosPolicyKind kind
ACE_CDR::ULong ULong
TransportMessageBlockAllocator mb_alloc_
Definition: RecorderImpl.h:214
DDS::DomainId_t domain_id_
Definition: RecorderImpl.h:186
DurabilityQosPolicyKind kind
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
DurabilityQosPolicy durability
DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_
Definition: RecorderImpl.h:193
virtual void remove_associations(const WriterIdSeq &writers, CORBA::Boolean callback)
ACE_CDR::Boolean Boolean
Holds a data sample received by the transport.
DDS::ReturnCode_t cleanup()
DataRepresentationQosPolicy representation
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
DDS::ReturnCode_t enable()
void lookup_instance_handles(const WriterIdSeq &ids, DDS::InstanceHandleSeq &hdls)
Lookup the instance handles by the publication repo ids.
LM_NOTICE
virtual void remove_associations_i(const WriterIdSeq &writers, bool callback)
RepoIdToHandleMap id_to_handle_map_
Definition: RecorderImpl.h:191
DDS::DataReaderQos passed_qos_
Definition: RecorderImpl.h:165
sequence< GUID_t > WriterIdSeq
OwnershipManager * owner_manager_
Definition: RecorderImpl.h:176
DataSampleHeader header_
The sample data header.
Definition: RawDataSample.h:42
virtual void register_for_writer(const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, DiscoveryListener *)
LM_WARNING
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
Definition: RecorderImpl.h:168
virtual void unregister_for_writer(const GUID_t &, const GUID_t &, const GUID_t &)
OPENDDS_STRING conv_
virtual void on_recorder_matched(Recorder *recorder, const DDS::SubscriptionMatchedStatus &status)=0
ACE_Recursive_Thread_Mutex publication_handle_lock_
Definition: RecorderImpl.h:188
DDS::InstanceHandle_t get_entity_instance_handle(const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
Definition: EntityImpl.cpp:142
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
void init(TopicDescriptionImpl *a_topic_desc, const DDS::DataReaderQos &qos, RecorderListener_rch a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant, DDS::SubscriberQos subqos)
ACE_TEXT("TCP_Factory")
unsigned long StatusMask
bool associate(const AssociationData &peer, bool active)
TransportPriorityQosPolicy transport_priority
virtual GUID_t get_guid() const
virtual void update_incompatible_qos(const IncompatibleQosStatus &status)
OpenDDS_Dcps_Export bool equal_guid_prefixes(const GuidPrefix_t &lhs, const GuidPrefix_t &rhs)
Definition: GuidUtils.h:132
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
virtual void add_association(const GUID_t &yourId, const WriterAssociation &writer, bool active)
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
XTypes::TypeLookupService_rch get_type_lookup_service()
ReliabilityQosPolicy reliability
OpenDDS_Dcps_Export LogLevel log_level
DDS::SubscriptionMatchedStatus subscription_match_status_
Definition: RecorderImpl.h:194
#define OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
Implements the DDS::TopicDescription interface.
virtual void signal_liveliness(const GUID_t &remote_participant)
const ReturnCode_t RETCODE_ERROR
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const
DynamicTypeByPubId dt_map_
Definition: RecorderImpl.h:210
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_UNSUPPORTED
virtual void on_sample_data_received(Recorder *recorder, const RawDataSample &sample)=0
void add_to_dynamic_type_map(const GUID_t &pub_id, const XTypes::TypeIdentifier &ti)
void register_for_writer(const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
const char * to_string(MessageId value)
DDS::InstanceHandle_t assign_handle(const GUID_t &id=GUID_UNKNOWN)
DataRepresentationIdSeq value
virtual void notify_subscription_disconnected(const WriterIdSeq &pubids)
virtual void notify_subscription_lost(const WriterIdSeq &pubids)
#define ACE_WRITE_GUARD(MUTEX, OBJ, LOCK)
static bool valid(const DDS::UserDataQosPolicy &qos)
Definition: Qos_Helper.inl:723
DDS::InstanceHandle_t lookup_handle(const GUID_t &id) const
#define TheServiceParticipant
DDS::ReturnCode_t set_listener(const RecorderListener_rch &a_listener, DDS::StatusMask mask)
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
Definition: Qos_Helper.inl:596
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
TopicDescriptionPtr< TopicImpl > topic_servant_
Definition: RecorderImpl.h:171
virtual bool is_reliable() const =0
Does the transport as configured support RELIABLE_RELIABILITY_QOS?
virtual CORBA::Long get_priority_value(const AssociationData &data) const
virtual DDS::ReturnCode_t repoid_to_bit_key(const DCPS::GUID_t &id, DDS::BuiltinTopicKey_t &key)
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)
Encoding::Kind encoding_kind_
Holds information on which type of encoding was read from the encapsulation header.
Definition: RawDataSample.h:55
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
virtual void notify_subscription_reconnected(const WriterIdSeq &pubids)
virtual DDS::InstanceHandle_t get_instance_handle()