Line data Source code
1 : /*
2 : * Distributed under the OpenDDS License.
3 : * See: http://www.opendds.org/license.html
4 : */
5 :
6 : #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
7 :
8 : #include "RecorderImpl.h"
9 :
10 : #include "SubscriptionInstance.h"
11 : #include "ReceivedDataElementList.h"
12 : #include "DomainParticipantImpl.h"
13 : #include "Service_Participant.h"
14 : #include "Qos_Helper.h"
15 : #include "FeatureDisabledQosCheck.h"
16 : #include "GuidConverter.h"
17 : #include "Serializer.h"
18 : #include "SubscriberImpl.h"
19 : #include "Transient_Kludge.h"
20 : #include "Util.h"
21 : #include "QueryConditionImpl.h"
22 : #include "ReadConditionImpl.h"
23 : #include "MonitorFactory.h"
24 : #include "SafetyProfileStreams.h"
25 : #include "TypeSupportImpl.h"
26 : #include "PoolAllocator.h"
27 : #include "DCPS_Utils.h"
28 : #ifndef DDS_HAS_MINIMUM_BIT
29 : # include "BuiltInTopicUtils.h"
30 : #endif
31 : #include "XTypes/DynamicDataXcdrReadImpl.h"
32 : #include "transport/framework/EntryExit.h"
33 : #include "transport/framework/TransportExceptions.h"
34 :
35 : #include <dds/DdsDcpsCoreC.h>
36 : #include <dds/DdsDcpsGuidTypeSupportImpl.h>
37 : #ifndef DDS_HAS_MINIMUM_BIT
38 : # include <dds/DdsDcpsCoreTypeSupportC.h>
39 : #endif
40 :
41 : #include <tao/ORB_Core.h>
42 :
43 : #include <ace/Reactor.h>
44 :
45 : #include <stdexcept>
46 :
47 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
48 :
49 : namespace OpenDDS {
50 : namespace DCPS {
51 :
52 0 : RecorderImpl::RecorderImpl()
53 0 : : qos_(TheServiceParticipant->initial_DataReaderQos())
54 0 : , participant_servant_(0)
55 0 : , topic_servant_(0)
56 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
57 0 : , is_exclusive_ownership_(false)
58 : #endif
59 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
60 0 : , owner_manager_(0)
61 : #endif
62 0 : , subqos_(TheServiceParticipant->initial_SubscriberQos())
63 0 : , topic_desc_(0)
64 0 : , listener_mask_(DEFAULT_STATUS_MASK)
65 0 : , domain_id_(0)
66 0 : , is_bit_(false)
67 0 : , check_encap_(true)
68 0 : , mb_alloc_(DEFAULT_TRANSPORT_RECEIVE_BUFFERS)
69 : {
70 0 : requested_incompatible_qos_status_.total_count = 0;
71 0 : requested_incompatible_qos_status_.total_count_change = 0;
72 0 : requested_incompatible_qos_status_.last_policy_id = 0;
73 0 : requested_incompatible_qos_status_.policies.length(0);
74 :
75 0 : subscription_match_status_.total_count = 0;
76 0 : subscription_match_status_.total_count_change = 0;
77 0 : subscription_match_status_.current_count = 0;
78 0 : subscription_match_status_.current_count_change = 0;
79 0 : subscription_match_status_.last_publication_handle = DDS::HANDLE_NIL;
80 0 : }
81 :
82 : // This method is called when there are no longer any reference to the
83 : // the servant.
84 0 : RecorderImpl::~RecorderImpl()
85 : {
86 : DBG_ENTRY_LVL("RecorderImpl","~RecorderImpl",6);
87 0 : }
88 :
89 :
90 : DDS::ReturnCode_t
91 0 : RecorderImpl::cleanup()
92 : {
93 :
94 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
95 0 : if (!disco || !disco->remove_subscription(domain_id_,
96 0 : participant_servant_->get_id(),
97 0 : subscription_id_)) {
98 0 : if (log_level >= LogLevel::Notice) {
99 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: RecorderImpl::cleanup: "
100 : "could not remove subscription from discovery\n"));
101 : }
102 0 : return DDS::RETCODE_ERROR;
103 : }
104 :
105 : // Call remove association before unregistering the datareader from the transport,
106 : // otherwise some callbacks resulted from remove_association may lost.
107 :
108 0 : remove_all_associations();
109 :
110 0 : return DDS::RETCODE_OK;
111 0 : }
112 :
113 0 : void RecorderImpl::init(
114 : TopicDescriptionImpl* a_topic_desc,
115 : const DDS::DataReaderQos & qos,
116 : RecorderListener_rch a_listener,
117 : const DDS::StatusMask & mask,
118 : DomainParticipantImpl* participant,
119 : DDS::SubscriberQos subqos)
120 : {
121 0 : if (DCPS_debug_level >= 2) {
122 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) RecorderImpl::init\n"));
123 : }
124 :
125 :
126 0 : topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc);
127 0 : if (TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) {
128 0 : topic_servant_ = a_topic;
129 : }
130 :
131 0 : CORBA::String_var topic_name = a_topic_desc->get_name();
132 0 : qos_ = qos;
133 0 : passed_qos_ = qos;
134 :
135 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
136 0 : is_exclusive_ownership_ = qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS;
137 : #endif
138 :
139 0 : listener_ = a_listener;
140 0 : listener_mask_ = mask;
141 :
142 : // Only store the participant pointer, since it is our "grand"
143 : // parent, we will exist as long as it does
144 0 : participant_servant_ = participant;
145 :
146 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
147 0 : if (is_exclusive_ownership_) {
148 0 : owner_manager_ = participant_servant_->ownership_manager ();
149 : }
150 : #endif
151 :
152 0 : domain_id_ = participant_servant_->get_domain_id();
153 0 : subqos_ = subqos;
154 0 : }
155 :
156 0 : bool RecorderImpl::check_transport_qos(const TransportInst& ti)
157 : {
158 0 : if (qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
159 0 : return ti.is_reliable();
160 : }
161 0 : return true;
162 : }
163 :
164 0 : GUID_t RecorderImpl::get_guid() const
165 : {
166 0 : return subscription_id_;
167 : }
168 :
169 0 : CORBA::Long RecorderImpl::get_priority_value(const AssociationData& data) const
170 : {
171 0 : return data.publication_transport_priority_;
172 : }
173 :
174 :
175 0 : void RecorderImpl::data_received(const ReceivedDataSample& sample)
176 : {
177 : DBG_ENTRY_LVL("RecorderImpl","data_received",6);
178 :
179 : // Ensure some other thread is not changing the sample container
180 : // or statuses related to samples.
181 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
182 :
183 0 : if (DCPS_debug_level >= 8) {
184 0 : ACE_DEBUG((LM_DEBUG,
185 : "(%P|%t) RecorderImpl::data_received: "
186 : "%C received sample: %C\n",
187 : LogGuid(subscription_id_).c_str(),
188 : to_string(sample.header_).c_str()));
189 : }
190 :
191 : // we only support SAMPLE_DATA messages
192 0 : if (sample.header_.message_id_ == SAMPLE_DATA && listener_.in()) {
193 0 : Message_Block_Ptr payload(sample.data(&mb_alloc_));
194 0 : Encoding::Kind kind = Encoding::KIND_UNALIGNED_CDR;
195 0 : if (sample.header_.cdr_encapsulation_ && check_encap_) {
196 0 : Encoding enc;
197 0 : Serializer ser(payload.get(), enc);
198 0 : EncapsulationHeader encap;
199 0 : if (ser >> encap && encap.to_any_encoding(enc)) {
200 0 : kind = enc.kind();
201 : }
202 0 : }
203 0 : RawDataSample rawSample(sample.header_,
204 0 : static_cast<MessageId> (sample.header_.message_id_),
205 0 : sample.header_.source_timestamp_sec_,
206 0 : sample.header_.source_timestamp_nanosec_,
207 : sample.header_.publication_id_,
208 0 : sample.header_.byte_order_,
209 : payload.get(),
210 0 : kind);
211 0 : listener_->on_sample_data_received(this, rawSample);
212 0 : }
213 0 : }
214 :
215 0 : void RecorderImpl::notify_subscription_disconnected(const WriterIdSeq&)
216 : {
217 0 : }
218 :
219 0 : void RecorderImpl::notify_subscription_reconnected(const WriterIdSeq&)
220 : {
221 0 : }
222 :
223 : void
224 0 : RecorderImpl::notify_subscription_lost(const DDS::InstanceHandleSeq&)
225 : {
226 0 : }
227 :
228 0 : void RecorderImpl::notify_subscription_lost(const WriterIdSeq&)
229 : {
230 0 : }
231 :
232 : #ifndef OPENDDS_SAFETY_PROFILE
233 : void
234 0 : RecorderImpl::add_to_dynamic_type_map(const GUID_t& pub_id, const XTypes::TypeIdentifier& ti)
235 : {
236 0 : XTypes::TypeLookupService_rch tls = participant_servant_->get_type_lookup_service();
237 0 : DDS::DynamicType_var dt = tls->type_identifier_to_dynamic(ti, pub_id);
238 0 : if (DCPS_debug_level >= 4) {
239 0 : ACE_DEBUG((LM_DEBUG,
240 : "(%P|%t) RecorderImpl::add_association: "
241 : "DynamicType added to map with guid: %C\n", LogGuid(pub_id).c_str()));
242 : }
243 0 : dt_map_.insert(std::make_pair(pub_id, dt));
244 0 : }
245 : #endif
246 :
247 : void
248 0 : RecorderImpl::add_association(const GUID_t& yourId,
249 : const WriterAssociation& writer,
250 : bool active)
251 : {
252 0 : if (DCPS_debug_level >= 4) {
253 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) RecorderImpl::add_association: "
254 : "bit %d local %C remote %C\n",
255 : is_bit_,
256 : LogGuid(yourId).c_str(),
257 : LogGuid(writer.writerId).c_str()));
258 : }
259 :
260 : //
261 : // This block prevents adding associations to deleted readers.
262 : // Presumably this is a "good thing(tm)".
263 : //
264 : // if (entity_deleted_) {
265 : // if (DCPS_debug_level >= 1)
266 : // ACE_DEBUG((LM_DEBUG,
267 : // ACE_TEXT("(%P|%t) RecorderImpl::add_association")
268 : // ACE_TEXT(" This is a deleted datareader, ignoring add.\n")));
269 : //
270 : // return;
271 : // }
272 :
273 : //
274 : // We are being called back from the repository before we are done
275 : // processing after our call to the repository that caused this call
276 : // (from the repository) to be made.
277 : //
278 0 : if (GUID_UNKNOWN == subscription_id_) {
279 : // add_associations was invoked before DCSPInfoRepo::add_subscription() returned.
280 0 : subscription_id_ = yourId;
281 : }
282 :
283 : //
284 : // We do the following while holding the publication_handle_lock_.
285 : //
286 : {
287 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
288 :
289 : //
290 : // For each writer in the list of writers to associate with, we
291 : // create a WriterInfo and a WriterStats object and store them in
292 : // our internal maps.
293 : //
294 : {
295 0 : ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
296 :
297 0 : const GUID_t& writer_id = writer.writerId;
298 0 : RcHandle<WriterInfo> info ( make_rch<WriterInfo>(rchandle_from<WriterInfoListener>(this), writer_id, writer.writerQos));
299 : /*std::pair<WriterMapType::iterator, bool> bpair =*/
300 0 : writers_.insert(
301 : // This insertion is idempotent.
302 0 : WriterMapType::value_type(
303 : writer_id,
304 : info));
305 : // statistics_.insert(
306 : // StatsMapType::value_type(
307 : // writer_id,
308 : // WriterStats(
309 : // raw_latency_buffer_size_,
310 : // raw_latency_buffer_type_)));
311 :
312 : // if (DCPS_debug_level > 4) {
313 : // GuidConverter converter(writer_id);
314 : // ACE_DEBUG((LM_DEBUG,
315 : // "(%P|%t) RecorderImpl::add_association: "
316 : // "inserted writer %C.return %d\n",
317 : // OPENDDS_STRING(converter).c_str(), bpair.second));
318 : //
319 : // WriterMapType::iterator iter = writers_.find(writer_id);
320 : // if (iter != writers_.end()) {
321 : // // This may not be an error since it could happen that the sample
322 : // // is delivered to the datareader after the write is dis-associated
323 : // // with this datareader.
324 : // GuidConverter reader_converter(subscription_id_);
325 : // GuidConverter writer_converter(writer_id);
326 : // ACE_DEBUG((LM_DEBUG,
327 : // ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
328 : // ACE_TEXT("reader %C is associated with writer %C.\n"),
329 : // OPENDDS_STRING(reader_converter).c_str(),
330 : // OPENDDS_STRING(writer_converter).c_str()));
331 : // }
332 : // }
333 0 : }
334 :
335 : //
336 : // Propagate the add_associations processing down into the Transport
337 : // layer here. This will establish the transport support and reserve
338 : // usage of an existing connection or initiate creation of a new
339 : // connection if no suitable connection is available.
340 : //
341 0 : AssociationData data;
342 0 : data.remote_id_ = writer.writerId;
343 0 : data.remote_data_ = writer.writerTransInfo;
344 0 : data.discovery_locator_ = writer.writerDiscInfo;
345 0 : data.remote_transport_context_ = writer.transportContext;
346 0 : data.publication_transport_priority_ =
347 0 : writer.writerQos.transport_priority.value;
348 0 : data.remote_reliable_ =
349 0 : (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
350 0 : data.remote_durable_ =
351 0 : (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
352 :
353 0 : if (!associate(data, active)) {
354 0 : if (log_level >= LogLevel::Warning) {
355 0 : ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: RecorderImpl::add_association: "
356 : "transport layer failed to associate\n"));
357 : }
358 0 : return;
359 : }
360 :
361 : // Check if any publications have already sent a REQUEST_ACK message.
362 : // {
363 : // ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, writers_lock_);
364 : //
365 : // WriterMapType::iterator where = writers_.find(writer.writerId);
366 : //
367 : // if (where != writers_.end()) {
368 : // const MonotonicTimePoint now = MonotonicTimePoint::now();
369 : //
370 : // ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
371 : //
372 : // if (where->second->should_ack(now)) {
373 : // const SequenceNumber sequence = where->second->ack_sequence();
374 : // if (send_sample_ack(writer.writerId, sequence, now.to_dds_time())) {
375 : // where->second->clear_acks(sequence);
376 : // }
377 : // }
378 : // }
379 : // }
380 :
381 : //
382 : // LIVELINESS policy timers are managed here.
383 : //
384 : // if (liveliness_lease_duration_ != TimeDuration::zero) {
385 : // // this call will start the timer if it is not already set
386 : // const MonotonicTimePoint now = MonotonicTimePoint::now();
387 : //
388 : // if (DCPS_debug_level >= 5) {
389 : // GuidConverter converter(subscription_id_);
390 : // ACE_DEBUG((LM_DEBUG,
391 : // ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
392 : // ACE_TEXT("starting/resetting liveliness timer for reader %C\n"),
393 : // OPENDDS_STRING(converter).c_str()));
394 : // }
395 : //
396 : // handle_timeout(now, this);
397 : // }
398 :
399 : // else - no timer needed when LIVELINESS.lease_duration is INFINITE
400 :
401 0 : }
402 : //
403 : // We no longer hold the publication_handle_lock_.
404 : //
405 :
406 : //
407 : // We only do the following processing for readers that are *not*
408 : // readers of Builtin Topics.
409 : //
410 0 : if (!is_bit_) {
411 :
412 0 : const DDS::InstanceHandle_t handle = participant_servant_->assign_handle(writer.writerId);
413 :
414 : //
415 : // We acquire the publication_handle_lock_ for the remainder of our
416 : // processing.
417 : //
418 : {
419 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
420 :
421 : // This insertion is idempotent.
422 0 : id_to_handle_map_.insert(
423 0 : RepoIdToHandleMap::value_type(writer.writerId, handle));
424 :
425 0 : if (DCPS_debug_level > 4) {
426 0 : ACE_DEBUG((LM_DEBUG,
427 : ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
428 : ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"),
429 : LogGuid(writer.writerId).c_str(),
430 : handle));
431 : }
432 :
433 : // We need to adjust these after the insertions have all completed
434 : // since insertions are not guaranteed to increase the number of
435 : // currently matched publications.
436 0 : int matchedPublications = static_cast<int>(id_to_handle_map_.size());
437 : subscription_match_status_.current_count_change
438 0 : = matchedPublications - subscription_match_status_.current_count;
439 0 : subscription_match_status_.current_count = matchedPublications;
440 :
441 0 : ++subscription_match_status_.total_count;
442 0 : ++subscription_match_status_.total_count_change;
443 :
444 0 : subscription_match_status_.last_publication_handle = handle;
445 :
446 : // set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
447 :
448 :
449 0 : if (listener_.in()) {
450 0 : listener_->on_recorder_matched(
451 : this,
452 0 : subscription_match_status_);
453 :
454 : // TBD - why does the spec say to change this but not change
455 : // the ChangeFlagStatus after a listener call?
456 :
457 : // Client will look at it so next time it looks the change should be 0
458 0 : subscription_match_status_.total_count_change = 0;
459 0 : subscription_match_status_.current_count_change = 0;
460 : }
461 :
462 : // notify_status_condition();
463 0 : }
464 :
465 : {
466 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
467 0 : ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
468 :
469 0 : if (!writers_.count(writer.writerId)) {
470 0 : return;
471 : }
472 :
473 0 : writers_[writer.writerId]->handle(handle);
474 0 : }
475 : }
476 :
477 : // if (monitor_) {
478 : // monitor_->report();
479 : // }
480 : }
481 :
482 : void
483 0 : RecorderImpl::remove_associations(const WriterIdSeq& writers,
484 : bool notify_lost)
485 : {
486 : DBG_ENTRY_LVL("RecorderImpl", "remove_associations", 6);
487 0 : if (writers.length() == 0) {
488 0 : return;
489 : }
490 :
491 0 : if (DCPS_debug_level >= 4) {
492 0 : ACE_DEBUG((LM_DEBUG,
493 : ACE_TEXT("(%P|%t) RecorderImpl::remove_associations: ")
494 : ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
495 : is_bit_,
496 : LogGuid(subscription_id_).c_str(),
497 : LogGuid(writers[0]).c_str(),
498 : writers.length()));
499 : }
500 0 : if (!get_deleted()) {
501 : // stop pending associations for these writer ids
502 0 : stop_associating(writers.get_buffer(), writers.length());
503 : }
504 :
505 0 : remove_associations_i(writers, notify_lost);
506 : }
507 :
508 : void
509 0 : RecorderImpl::remove_associations_i(const WriterIdSeq& writers,
510 : bool notify_lost)
511 : {
512 : DBG_ENTRY_LVL("RecorderImpl", "remove_associations_i", 6);
513 :
514 0 : if (writers.length() == 0) {
515 0 : return;
516 : }
517 :
518 0 : if (DCPS_debug_level >= 4) {
519 0 : ACE_DEBUG((LM_DEBUG,
520 : ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
521 : ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
522 : is_bit_,
523 : LogGuid(subscription_id_).c_str(),
524 : LogGuid(writers[0]).c_str(),
525 : writers.length()));
526 : }
527 0 : DDS::InstanceHandleSeq handles;
528 :
529 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
530 :
531 : // This is used to hold the list of writers which were actually
532 : // removed, which is a proper subset of the writers which were
533 : // requested to be removed.
534 0 : WriterIdSeq updated_writers;
535 :
536 : CORBA::ULong wr_len;
537 :
538 : //Remove the writers from writer list. If the supplied writer
539 : //is not in the cached writers list then it is already removed.
540 : //We just need remove the writers in the list that have not been
541 : //removed.
542 : {
543 0 : ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
544 :
545 0 : wr_len = writers.length();
546 :
547 0 : for (CORBA::ULong i = 0; i < wr_len; i++) {
548 0 : GUID_t writer_id = writers[i];
549 :
550 : #ifndef OPENDDS_SAFETY_PROFILE
551 0 : if (dt_map_.erase(writer_id) == 0) {
552 0 : if (DCPS_debug_level >= 4) {
553 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) RecorderImpl::remove_associations_i: -"
554 : "failed to find writer_id in the DynamicTypeByPubId map.\n"));
555 : }
556 : }
557 : #endif
558 :
559 0 : WriterMapType::iterator it = writers_.find(writer_id);
560 0 : if (it != writers_.end()) {
561 0 : it->second->removed();
562 : }
563 :
564 0 : if (writers_.erase(writer_id) == 0) {
565 0 : if (DCPS_debug_level >= 4) {
566 0 : ACE_DEBUG((LM_DEBUG,
567 : ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
568 : ACE_TEXT("the writer local %C was already removed.\n"),
569 : LogGuid(writer_id).c_str()));
570 : }
571 :
572 : } else {
573 0 : push_back(updated_writers, writer_id);
574 : }
575 : }
576 0 : }
577 :
578 0 : wr_len = updated_writers.length();
579 :
580 : // Return now if the supplied writers have been removed already.
581 0 : if (wr_len == 0) {
582 0 : return;
583 : }
584 :
585 0 : if (!is_bit_) {
586 : // The writer should be in the id_to_handle map at this time. Note
587 : // it if it not there.
588 0 : lookup_instance_handles(updated_writers, handles);
589 :
590 0 : for (CORBA::ULong i = 0; i < wr_len; ++i) {
591 0 : id_to_handle_map_.erase(updated_writers[i]);
592 : }
593 : }
594 0 : for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
595 0 : disassociate(updated_writers[i]);
596 : }
597 :
598 : // Mirror the add_associations SUBSCRIPTION_MATCHED_STATUS processing.
599 0 : if (!is_bit_) {
600 : // Derive the change in the number of publications writing to this reader.
601 0 : int matchedPublications = static_cast<int>(id_to_handle_map_.size());
602 : subscription_match_status_.current_count_change
603 0 : = matchedPublications - subscription_match_status_.current_count;
604 :
605 : // Only process status if the number of publications has changed.
606 0 : if (subscription_match_status_.current_count_change != 0) {
607 0 : subscription_match_status_.current_count = matchedPublications;
608 : /// Section 7.1.4.1: total_count will not decrement.
609 :
610 : /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
611 : subscription_match_status_.last_publication_handle
612 0 : = handles[ wr_len - 1];
613 :
614 : // set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
615 :
616 : // DDS::DataReaderListener_var listener
617 : // = listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
618 :
619 0 : if (listener_.in()) {
620 0 : listener_->on_recorder_matched(
621 : this,
622 0 : subscription_match_status_);
623 :
624 : // Client will look at it so next time it looks the change should be 0
625 0 : subscription_match_status_.total_count_change = 0;
626 0 : subscription_match_status_.current_count_change = 0;
627 : }
628 :
629 : // notify_status_condition();
630 : }
631 : }
632 :
633 : // If this remove_association is invoked when the InfoRepo
634 : // detects a lost writer then make a callback to notify
635 : // subscription lost.
636 0 : if (notify_lost) {
637 0 : notify_subscription_lost(handles);
638 : }
639 :
640 : // if (monitor_) {
641 : // monitor_->report();
642 : // }
643 :
644 0 : for (unsigned int i = 0; i < handles.length(); ++i) {
645 0 : participant_servant_->return_handle(handles[i]);
646 : }
647 0 : }
648 :
649 : void
650 0 : RecorderImpl::remove_all_associations()
651 : {
652 : DBG_ENTRY_LVL("RecorderImpl","remove_all_associations",6);
653 :
654 0 : OpenDDS::DCPS::WriterIdSeq writers;
655 : int size;
656 :
657 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
658 :
659 : {
660 0 : ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, writers_lock_);
661 :
662 0 : size = static_cast<int>(writers_.size());
663 0 : writers.length(size);
664 :
665 0 : WriterMapType::iterator curr_writer = writers_.begin();
666 0 : WriterMapType::iterator end_writer = writers_.end();
667 :
668 0 : int i = 0;
669 :
670 0 : while (curr_writer != end_writer) {
671 0 : writers[i++] = curr_writer->first;
672 0 : ++curr_writer;
673 : }
674 0 : }
675 :
676 : try {
677 0 : CORBA::Boolean dont_notify_lost = false;
678 :
679 0 : if (0 < size) {
680 0 : remove_associations(writers, dont_notify_lost);
681 : }
682 :
683 0 : } catch (const CORBA::Exception&) {
684 0 : }
685 :
686 0 : transport_stop();
687 0 : }
688 :
689 : void
690 0 : RecorderImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
691 : {
692 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex,
693 : guard,
694 : publication_handle_lock_);
695 :
696 0 : if (requested_incompatible_qos_status_.total_count == status.total_count) {
697 : // This test should make the method idempotent.
698 0 : return;
699 : }
700 :
701 : // set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
702 : // true);
703 :
704 : // copy status and increment change
705 0 : requested_incompatible_qos_status_.total_count = status.total_count;
706 0 : requested_incompatible_qos_status_.total_count_change +=
707 0 : status.count_since_last_send;
708 0 : requested_incompatible_qos_status_.last_policy_id =
709 0 : status.last_policy_id;
710 0 : requested_incompatible_qos_status_.policies = status.policies;
711 :
712 : // if (!CORBA::is_nil(listener.in())) {
713 : // listener->on_requested_incompatible_qos(this,
714 : // requested_incompatible_qos_status_);
715 : //
716 : // // TBD - why does the spec say to change total_count_change but not
717 : // // change the ChangeFlagStatus after a listener call?
718 : //
719 : // // client just looked at it so next time it looks the
720 : // // change should be 0
721 : // requested_incompatible_qos_status_.total_count_change = 0;
722 : // }
723 : //
724 : // notify_status_condition();
725 0 : }
726 :
727 : void
728 0 : RecorderImpl::signal_liveliness(const GUID_t& remote_participant)
729 : {
730 0 : GUID_t prefix = remote_participant;
731 0 : prefix.entityId = EntityId_t();
732 :
733 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
734 :
735 : typedef std::pair<GUID_t, RcHandle<WriterInfo> > WriterSetElement;
736 : typedef OPENDDS_VECTOR(WriterSetElement) WriterSet;
737 0 : WriterSet writers;
738 :
739 : {
740 0 : ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, writers_lock_);
741 0 : for (WriterMapType::iterator pos = writers_.lower_bound(prefix),
742 0 : limit = writers_.end();
743 0 : pos != limit && equal_guid_prefixes(pos->first, prefix);
744 0 : ++pos) {
745 0 : writers.push_back(std::make_pair(pos->first, pos->second));
746 : }
747 0 : }
748 :
749 0 : const MonotonicTimePoint now = MonotonicTimePoint::now();
750 0 : for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
751 0 : pos != limit;
752 0 : ++pos) {
753 0 : pos->second->received_activity(now);
754 : }
755 0 : }
756 :
757 0 : DDS::ReturnCode_t RecorderImpl::set_qos(
758 : const DDS::SubscriberQos & subscriber_qos,
759 : const DDS::DataReaderQos & qos)
760 : {
761 : OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(subscriber_qos, DDS::RETCODE_UNSUPPORTED);
762 :
763 0 : if (Qos_Helper::valid(subscriber_qos) && Qos_Helper::consistent(subscriber_qos)) {
764 0 : if (subqos_ != subscriber_qos) {
765 : // for the not changeable qos, it can be changed before enable
766 0 : if (!Qos_Helper::changeable(subqos_, subscriber_qos) && enabled_) {
767 0 : return DDS::RETCODE_IMMUTABLE_POLICY;
768 :
769 : } else {
770 0 : subqos_ = subscriber_qos;
771 : }
772 : }
773 : } else {
774 0 : return DDS::RETCODE_INCONSISTENT_POLICY;
775 : }
776 :
777 : OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
778 : OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
779 : OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
780 :
781 0 : if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
782 0 : if (qos_ == qos)
783 0 : return DDS::RETCODE_OK;
784 :
785 0 : if (!Qos_Helper::changeable(qos_, qos) && is_enabled()) {
786 0 : return DDS::RETCODE_IMMUTABLE_POLICY;
787 :
788 : } else {
789 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
790 : const bool status =
791 0 : disco->update_subscription_qos(
792 0 : participant_servant_->get_domain_id(),
793 0 : participant_servant_->get_id(),
794 0 : subscription_id_,
795 : qos,
796 : subscriber_qos);
797 0 : if (!status) {
798 0 : if (log_level >= LogLevel::Notice) {
799 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: RecorderImpl::set_qos: qos not updated\n"));
800 : }
801 0 : return DDS::RETCODE_ERROR;
802 : }
803 0 : }
804 :
805 0 : qos_ = qos;
806 0 : subqos_ = subscriber_qos;
807 :
808 0 : return DDS::RETCODE_OK;
809 :
810 : } else {
811 0 : return DDS::RETCODE_INCONSISTENT_POLICY;
812 : }
813 : }
814 :
815 : DDS::ReturnCode_t
816 0 : RecorderImpl::get_qos(
817 : DDS::SubscriberQos & subscriber_qos,
818 : DDS::DataReaderQos & qos)
819 : {
820 0 : qos = passed_qos_;
821 0 : subscriber_qos = subqos_;
822 0 : return DDS::RETCODE_OK;
823 : }
824 :
825 : DDS::ReturnCode_t
826 0 : RecorderImpl::set_listener(const RecorderListener_rch& a_listener,
827 : DDS::StatusMask mask)
828 : {
829 0 : listener_mask_ = mask;
830 : //note: OK to duplicate a nil object ref
831 0 : listener_ = a_listener;
832 0 : return DDS::RETCODE_OK;
833 : }
834 :
835 : RecorderListener_rch
836 0 : RecorderImpl::get_listener()
837 : {
838 0 : return listener_;
839 : }
840 :
841 : void
842 0 : RecorderImpl::lookup_instance_handles(const WriterIdSeq& ids,
843 : DDS::InstanceHandleSeq & hdls)
844 : {
845 0 : CORBA::ULong const num_wrts = ids.length();
846 :
847 0 : if (DCPS_debug_level > 9) {
848 0 : OPENDDS_STRING separator = "";
849 0 : OPENDDS_STRING buffer;
850 :
851 0 : for (CORBA::ULong i = 0; i < num_wrts; ++i) {
852 0 : buffer += separator + LogGuid(ids[i]).conv_;
853 0 : separator = ", ";
854 : }
855 :
856 0 : ACE_DEBUG((LM_DEBUG,
857 : ACE_TEXT("(%P|%t) RecorderImpl::lookup_instance_handles: ")
858 : ACE_TEXT("searching for handles for writer Ids: %C.\n"),
859 : buffer.c_str()));
860 0 : }
861 :
862 0 : hdls.length(num_wrts);
863 :
864 0 : for (CORBA::ULong i = 0; i < num_wrts; ++i) {
865 0 : hdls[i] = participant_servant_->lookup_handle(ids[i]);
866 : }
867 0 : }
868 :
869 : DDS::ReturnCode_t
870 0 : RecorderImpl::enable()
871 : {
872 0 : if (DCPS_debug_level >= 2) {
873 0 : ACE_DEBUG((LM_DEBUG,
874 : ACE_TEXT("(%P|%t) RecorderImpl::enable\n")));
875 : }
876 : //According spec:
877 : // - Calling enable on an already enabled Entity returns OK and has no
878 : // effect.
879 : // - Calling enable on an Entity whose factory is not enabled will fail
880 : // and return PRECONDITION_NOT_MET.
881 :
882 0 : if (is_enabled()) {
883 0 : return DDS::RETCODE_OK;
884 : }
885 :
886 0 : set_enabled();
887 :
888 : // if (topic_servant_ && !transport_disabled_) {
889 0 : if (topic_servant_) {
890 0 : if (DCPS_debug_level >= 2) {
891 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) RecorderImpl::enable: enable_transport\n"));
892 : }
893 :
894 : try {
895 0 : enable_transport(qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS,
896 0 : qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
897 0 : } catch (const Transport::Exception&) {
898 0 : if (log_level >= LogLevel::Warning) {
899 0 : ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: RecorderImpl::enable: Transport Exception\n"));
900 : }
901 0 : return DDS::RETCODE_ERROR;
902 0 : }
903 :
904 0 : const TransportLocatorSeq& trans_conf_info = connection_info();
905 :
906 0 : CORBA::String_var filterClassName = "";
907 0 : CORBA::String_var filterExpression = "";
908 0 : DDS::StringSeq exprParams;
909 :
910 : Discovery_rch disco =
911 0 : TheServiceParticipant->get_discovery(domain_id_);
912 :
913 0 : DCPS::set_reader_effective_data_rep_qos(qos_.representation.value);
914 0 : if (!topic_servant_->check_data_representation(qos_.representation.value, false)) {
915 0 : return DDS::RETCODE_ERROR;
916 : }
917 :
918 0 : if (DCPS_debug_level >= 2) {
919 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) RecorderImpl::enable: add_subscription\n"));
920 : }
921 :
922 0 : XTypes::TypeInformation type_info;
923 :
924 : subscription_id_ =
925 0 : disco->add_subscription(domain_id_,
926 0 : participant_servant_->get_id(),
927 0 : topic_servant_->get_id(),
928 0 : rchandle_from(this),
929 0 : qos_,
930 : trans_conf_info,
931 0 : subqos_,
932 : filterClassName,
933 : filterExpression,
934 : exprParams,
935 : type_info);
936 :
937 0 : if (subscription_id_ == OpenDDS::DCPS::GUID_UNKNOWN) {
938 0 : if (log_level >= LogLevel::Warning) {
939 0 : ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: RecorderImpl::enable: "
940 : "add_subscription returned invalid id\n"));
941 : }
942 0 : return DDS::RETCODE_ERROR;
943 : }
944 0 : }
945 :
946 0 : return DDS::RETCODE_OK;
947 : }
948 :
949 : DDS::InstanceHandle_t
950 0 : RecorderImpl::get_instance_handle()
951 : {
952 0 : return get_entity_instance_handle(subscription_id_, rchandle_from(participant_servant_));
953 : }
954 :
955 : void
956 0 : RecorderImpl::register_for_writer(const GUID_t& participant,
957 : const GUID_t& readerid,
958 : const GUID_t& writerid,
959 : const TransportLocatorSeq& locators,
960 : DiscoveryListener* listener)
961 : {
962 0 : TransportClient::register_for_writer(participant, readerid, writerid, locators, listener);
963 0 : }
964 :
965 : void
966 0 : RecorderImpl::unregister_for_writer(const GUID_t& participant,
967 : const GUID_t& readerid,
968 : const GUID_t& writerid)
969 : {
970 0 : TransportClient::unregister_for_writer(participant, readerid, writerid);
971 0 : }
972 :
973 : #if !defined (DDS_HAS_MINIMUM_BIT)
974 : DDS::ReturnCode_t
975 0 : RecorderImpl::repoid_to_bit_key(const GUID_t& id,
976 : DDS::BuiltinTopicKey_t& key)
977 : {
978 0 : const DDS::InstanceHandle_t publication_handle = participant_servant_->lookup_handle(id);
979 :
980 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
981 : guard,
982 : publication_handle_lock_,
983 : DDS::RETCODE_ERROR);
984 :
985 0 : DDS::PublicationBuiltinTopicDataSeq data;
986 :
987 0 : DDS::ReturnCode_t const ret = instance_handle_to_bit_data<DDS::PublicationBuiltinTopicDataDataReader_var>(
988 : participant_servant_,
989 : BUILT_IN_PUBLICATION_TOPIC,
990 : publication_handle,
991 : data);
992 :
993 0 : if (ret == DDS::RETCODE_OK) {
994 0 : key = data[0].key;
995 : }
996 :
997 0 : return ret;
998 0 : }
999 : #endif // !defined (DDS_HAS_MINIMUM_BIT)
1000 :
1001 : #ifndef OPENDDS_SAFETY_PROFILE
1002 0 : DDS::DynamicData_ptr RecorderImpl::get_dynamic_data(const RawDataSample& sample)
1003 : {
1004 0 : const Encoding enc(sample.encoding_kind_, sample.header_.byte_order_ ? ENDIAN_LITTLE : ENDIAN_BIG);
1005 0 : const DynamicTypeByPubId::const_iterator dt_found = dt_map_.find(sample.publication_id_);
1006 0 : if (dt_found == dt_map_.end()) {
1007 0 : if (log_level >= LogLevel::Error) {
1008 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RecorderImpl::get_dynamic_data: "
1009 : "failed to find GUID: %C in DynamicTypeByPubId.\n", LogGuid(sample.publication_id_).c_str()));
1010 : }
1011 0 : return 0;
1012 : }
1013 :
1014 0 : DDS::DynamicType_var dt = dt_found->second;
1015 0 : XTypes::DynamicDataXcdrReadImpl* dd = new XTypes::DynamicDataXcdrReadImpl(sample.sample_.get(), enc, dt);
1016 0 : DDS::DynamicData_var dd_var = dd;
1017 0 : if (!dd->check_xcdr1_mutable(dt)) {
1018 0 : if (log_level >= LogLevel::Notice) {
1019 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: RecorderImpl::get_dynamic_data: "
1020 : "Encountered unsupported combination of XCDR1 encoding and mutable extensibility.\n"));
1021 : }
1022 0 : return 0;
1023 : }
1024 0 : return dd_var._retn();
1025 0 : }
1026 : #endif
1027 :
1028 : } // namespace DCPS
1029 : } // namespace
1030 :
1031 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|