Line data Source code
1 : /*
2 : * Distributed under the OpenDDS License.
3 : * See: http://www.opendds.org/license.html
4 : */
5 :
6 : #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
7 :
8 : #include "DataReaderImpl.h"
9 :
10 : #include "SubscriptionInstance.h"
11 : #include "ReceivedDataElementList.h"
12 : #include "DomainParticipantImpl.h"
13 : #include "Service_Participant.h"
14 : #include "Qos_Helper.h"
15 : #include "FeatureDisabledQosCheck.h"
16 : #include "GuidConverter.h"
17 : #include "TopicImpl.h"
18 : #include "Serializer.h"
19 : #include "SubscriberImpl.h"
20 : #include "Transient_Kludge.h"
21 : #include "Util.h"
22 : #include "DCPS_Utils.h"
23 : #include "QueryConditionImpl.h"
24 : #include "ReadConditionImpl.h"
25 : #include "MonitorFactory.h"
26 : #include "transport/framework/EntryExit.h"
27 : #include "transport/framework/TransportExceptions.h"
28 : #include "SafetyProfileStreams.h"
29 : #include "TypeSupportImpl.h"
30 : #include "XTypes/TypeObject.h"
31 : #ifndef DDS_HAS_MINIMUM_BIT
32 : # include "BuiltInTopicUtils.h"
33 : #endif
34 :
35 : #ifndef DDS_HAS_MINIMUM_BIT
36 : # include <dds/DdsDcpsCoreTypeSupportC.h>
37 : #endif
38 : #include <dds/DdsDcpsCoreC.h>
39 : #include <dds/DdsDcpsGuidTypeSupportImpl.h>
40 :
41 : #include <ace/Reactor.h>
42 : #include <ace/Auto_Ptr.h>
43 : #include <ace/OS_NS_sys_time.h>
44 :
45 : #include <cstdio>
46 : #include <stdexcept>
47 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
48 : # include <sstream>
49 : #endif
50 :
51 : #ifndef __ACE_INLINE__
52 : # include "DataReaderImpl.inl"
53 : #endif
54 :
55 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
56 :
57 : namespace OpenDDS {
58 : namespace DCPS {
59 :
// Default constructor. Puts every member into its initial state using
// defaults obtained from TheServiceParticipant (initial reader and
// subscriber QoS, chunk count, reactor/timer, monitor factory). The topic,
// participant and subscriber are not known here; init() supplies them.
60 0 : DataReaderImpl::DataReaderImpl()
61 0 : : has_subscription_id_(false)
62 0 : , subscription_id_mutex_()
63 0 : , subscription_id_condition_(subscription_id_mutex_)
64 0 : , qos_(TheServiceParticipant->initial_DataReaderQos())
65 0 : , reverse_sample_lock_(sample_lock_)
66 0 : , topic_servant_(0)
67 0 : , type_support_(0)
68 0 : , topic_id_(GUID_UNKNOWN)
69 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
70 0 : , is_exclusive_ownership_(false)
71 : #endif
72 0 : , coherent_(false)
73 0 : , subqos_(TheServiceParticipant->initial_SubscriberQos())
74 0 : , topic_desc_(0)
75 0 : , listener_mask_(DEFAULT_STATUS_MASK)
76 0 : , domain_id_(0)
77 0 : , end_historic_sweeper_(make_rch<EndHistoricSamplesMissedSweeper>(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this))
78 0 : , n_chunks_(TheServiceParticipant->n_chunks())
79 0 : , reactor_(0)
80 0 : , liveliness_timer_(make_rch<LivelinessTimer>(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this))
81 0 : , last_deadline_missed_total_count_(0)
82 0 : , deadline_queue_enabled_(false)
83 0 : , deadline_task_(make_rch<DRISporadicTask>(TheServiceParticipant->time_source(), TheServiceParticipant->interceptor(), rchandle_from(this), &DataReaderImpl::deadline_task))
84 0 : , is_bit_(false)
85 0 : , always_get_history_(false)
86 0 : , statistics_enabled_(false)
87 0 : , raw_latency_buffer_size_(0)
88 0 : , raw_latency_buffer_type_(DataCollector<double>::KeepOldest)
89 0 : , transport_disabled_(false)
90 0 : , mb_alloc_(DEFAULT_TRANSPORT_RECEIVE_BUFFERS)
91 : {
// reactor_ was initialised to 0 in the list above; it is replaced here by
// the Service Participant's timer.
92 0 : reactor_ = TheServiceParticipant->timer();
93 :
// Every communication status starts with zeroed counters and a nil
// "last" handle (or NOT_REJECTED / policy id 0 where applicable).
94 0 : liveliness_changed_status_.alive_count = 0;
95 0 : liveliness_changed_status_.not_alive_count = 0;
96 0 : liveliness_changed_status_.alive_count_change = 0;
97 0 : liveliness_changed_status_.not_alive_count_change = 0;
98 0 : liveliness_changed_status_.last_publication_handle =
99 : DDS::HANDLE_NIL;
100 :
101 0 : requested_deadline_missed_status_.total_count = 0;
102 0 : requested_deadline_missed_status_.total_count_change = 0;
103 0 : requested_deadline_missed_status_.last_instance_handle =
104 : DDS::HANDLE_NIL;
105 :
106 0 : requested_incompatible_qos_status_.total_count = 0;
107 0 : requested_incompatible_qos_status_.total_count_change = 0;
108 0 : requested_incompatible_qos_status_.last_policy_id = 0;
109 0 : requested_incompatible_qos_status_.policies.length(0);
110 :
111 0 : subscription_match_status_.total_count = 0;
112 0 : subscription_match_status_.total_count_change = 0;
113 0 : subscription_match_status_.current_count = 0;
114 0 : subscription_match_status_.current_count_change = 0;
115 0 : subscription_match_status_.last_publication_handle =
116 : DDS::HANDLE_NIL;
117 :
118 0 : sample_lost_status_.total_count = 0;
119 0 : sample_lost_status_.total_count_change = 0;
120 :
121 0 : sample_rejected_status_.total_count = 0;
122 0 : sample_rejected_status_.total_count_change = 0;
123 0 : sample_rejected_status_.last_reason = DDS::NOT_REJECTED;
124 0 : sample_rejected_status_.last_instance_handle = DDS::HANDLE_NIL;
125 :
126 0 : this->budget_exceeded_status_.total_count = 0;
127 0 : this->budget_exceeded_status_.total_count_change = 0;
128 0 : this->budget_exceeded_status_.last_instance_handle = DDS::HANDLE_NIL;
129 :
// Ask the configured monitor factory for this reader's monitors. The code
// that uses monitor_ elsewhere tests it before calling report(), so the
// factory is presumably allowed to return a nil monitor.
130 0 : monitor_.reset(TheServiceParticipant->monitor_factory_->create_data_reader_monitor(this));
131 0 : periodic_monitor_.reset(TheServiceParticipant->monitor_factory_->create_data_reader_periodic_monitor(this));
132 0 : }
133 :
134 : // This method is called when there are no longer any reference to the
135 : // the servant.
136 0 : DataReaderImpl::~DataReaderImpl()
137 : {
138 : DBG_ENTRY_LVL("DataReaderImpl", "~DataReaderImpl", 6);
139 :
140 0 : deadline_task_->cancel();
141 :
142 : #ifndef OPENDDS_SAFETY_PROFILE
143 0 : RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
144 0 : if (participant) {
145 0 : XTypes::TypeLookupService_rch type_lookup_service = participant->get_type_lookup_service();
146 0 : if (type_lookup_service) {
147 0 : type_lookup_service->remove_guid_from_dynamic_map(subscription_id_);
148 : }
149 0 : }
150 : #endif
151 0 : }
152 :
153 : // this method is called when delete_datareader is called.
154 : void
155 0 : DataReaderImpl::cleanup()
156 : {
157 : // As first step set our listener to nill which will prevent us from calling
158 : // back onto the listener at the moment the related DDS entity has been
159 : // deleted
160 0 : set_listener(0, NO_STATUS_MASK);
161 :
162 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
163 0 : OwnershipManagerPtr owner_manager = this->ownership_manager();
164 0 : if (owner_manager) {
165 0 : owner_manager->unregister_reader(topic_servant_->type_name(), this);
166 : }
167 : #endif
168 :
169 0 : topic_servant_ = 0;
170 :
171 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
172 : {
173 0 : ACE_Guard<ACE_Thread_Mutex> guard(content_filtered_topic_mutex_);
174 0 : content_filtered_topic_ = 0;
175 0 : }
176 : #endif
177 :
178 : #ifndef OPENDDS_NO_MULTI_TOPIC
179 0 : multi_topic_ = 0;
180 : #endif
181 :
182 0 : }
183 :
184 0 : void DataReaderImpl::init(
185 : TopicDescriptionImpl* topic_desc,
186 : const DDS::DataReaderQos &qos,
187 : DDS::DataReaderListener_ptr listener,
188 : const DDS::StatusMask & mask,
189 : DomainParticipantImpl* participant,
190 : SubscriberImpl* subscriber)
191 : {
192 0 : topic_desc_ = DDS::TopicDescription::_duplicate(topic_desc);
193 0 : if (TopicImpl* topic = dynamic_cast<TopicImpl*>(topic_desc)) {
194 0 : topic_servant_ = topic;
195 0 : type_support_ = dynamic_cast<TypeSupportImpl*>(topic->get_type_support());
196 0 : topic_id_ = topic->get_id();
197 : }
198 :
199 : #ifndef DDS_HAS_MINIMUM_BIT
200 0 : CORBA::String_var topic_name = topic_desc->get_name();
201 0 : CORBA::String_var topic_type_name = topic_desc->get_type_name();
202 0 : is_bit_ = topicIsBIT(topic_name, topic_type_name);
203 : #endif // !defined (DDS_HAS_MINIMUM_BIT)
204 :
205 0 : qos_ = qos;
206 0 : passed_qos_ = qos;
207 :
208 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
209 0 : is_exclusive_ownership_ = this->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS;
210 : #endif
211 :
212 0 : set_listener(listener, mask);
213 :
214 : // Only store the participant pointer, since it is our "grand"
215 : // parent, we will exist as long as it does
216 0 : participant_servant_ = *participant;
217 :
218 0 : domain_id_ = participant->get_domain_id();
219 :
220 0 : subscriber_servant_ = rchandle_from(subscriber);
221 :
222 0 : if (subscriber->get_qos(this->subqos_) != ::DDS::RETCODE_OK) {
223 0 : ACE_DEBUG((LM_WARNING,
224 : ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::init() - ")
225 : ACE_TEXT("failed to get SubscriberQos\n")));
226 : }
227 0 : }
228 :
229 : DDS::InstanceHandle_t
230 0 : DataReaderImpl::get_instance_handle()
231 : {
232 0 : const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
233 0 : return get_entity_instance_handle(subscription_id_, participant);
234 0 : }
235 :
// Records a newly matched writer and asks the transport layer to associate
// with it.
//
// yourId  GUID of this reader; adopted as subscription_id_ if none is set yet
// writer  description of the remote writer (id, QoS, transport information)
// active  passed through unchanged to associate()
//
// Does nothing (apart from optional logging) if the reader is deleted.
236 : void
237 0 : DataReaderImpl::add_association(const GUID_t& yourId,
238 : const WriterAssociation& writer,
239 : bool active)
240 : {
241 0 : if (DCPS_debug_level) {
242 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association - ")
243 : ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
244 : LogGuid(yourId).c_str(),
245 : LogGuid(writer.writerId).c_str()));
246 : }
247 :
248 0 : if (get_deleted()) {
249 0 : if (DCPS_debug_level) {
250 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association")
251 : ACE_TEXT(" This is a deleted datareader, ignoring add.\n")));
252 : }
253 0 : return;
254 : }
255 :
256 : // We are being called back from the repository before we are done
257 : // processing after our call to the repository that caused this call
258 : // (from the repository) to be made.
// In that case subscription_id_ is still unknown: take it from yourId and
// wake every thread waiting on subscription_id_condition_.
259 : {
260 0 : ACE_Guard<ACE_Thread_Mutex> guard(subscription_id_mutex_);
261 0 : if (GUID_UNKNOWN == subscription_id_) {
262 0 : subscription_id_ = yourId;
263 0 : has_subscription_id_ = true;
264 0 : subscription_id_condition_.notify_all();
265 : }
266 0 : }
267 :
268 : // For each writer in the list of writers to associate with, we
269 : // create a WriterInfo and a WriterStats object and store them in
270 : // our internal maps.
271 : //
// writers_lock_ is held (write) for this whole block; statistics_lock_ is
// taken inside it. ACE_WRITE_GUARD returns from the function if the lock
// cannot be acquired.
272 : {
273 :
274 0 : ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
275 :
276 0 : const GUID_t& writer_id = writer.writerId;
277 0 : WriterInfo_rch info = make_rch<WriterInfo>(rchandle_from<WriterInfoListener>(this), writer_id, writer.writerQos);
278 0 : std::pair<WriterMapType::iterator, bool> bpair = writers_.insert(
279 : // This insertion is idempotent.
280 0 : WriterMapType::value_type(
281 : writer_id,
282 : info));
283 :
284 : // Schedule timer if necessary
285 : // - only need to check reader qos - we know the writer must be >= reader
// Note: the flag is set on the freshly created `info`, which is the object
// stored in writers_ only when the insertion above actually took place.
286 0 : if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
287 0 : info->waiting_for_end_historic_samples(true);
288 : }
289 :
290 : {
291 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(statistics_lock_);
292 0 : statistics_.insert(
293 0 : StatsMapType::value_type(
294 : writer_id,
295 0 : WriterStats(raw_latency_buffer_size_, raw_latency_buffer_type_)));
296 0 : }
297 :
298 : // If this is a durable reader
// (placeholder only: the branch below is intentionally empty for now)
299 0 : if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
300 : // TODO schedule timer for removing flag from writers
301 : }
302 :
303 0 : if (DCPS_debug_level > 4) {
304 0 : ACE_DEBUG((LM_DEBUG,
305 : "(%P|%t) DataReaderImpl::add_association: "
306 : "inserted writer %C.return %d\n",
307 : LogGuid(writer_id).c_str(), bpair.second));
308 :
309 0 : WriterMapType::iterator iter = writers_.find(writer_id);
310 0 : if (iter != writers_.end()) {
311 : // Debug-only confirmation that the writer is now present in
312 : // writers_: it was either inserted just above or had already been
313 : // associated with this datareader.
314 0 : ACE_DEBUG((LM_DEBUG,
315 : ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ")
316 : ACE_TEXT("reader %C is associated with writer %C.\n"),
317 : LogGuid(get_guid()).c_str(),
318 : LogGuid(writer_id).c_str()));
319 : }
320 : }
321 0 : }
322 :
323 : // Propagate the add_associations processing down into the Transport
324 : // layer here. This will establish the transport support and reserve
325 : // usage of an existing connection or initiate creation of a new
326 : // connection if no suitable connection is available.
327 0 : AssociationData data;
328 0 : data.remote_id_ = writer.writerId;
329 0 : data.remote_data_ = writer.writerTransInfo;
330 0 : data.discovery_locator_ = writer.writerDiscInfo;
331 0 : data.participant_discovered_at_ = writer.participantDiscoveredAt;
332 0 : data.remote_transport_context_ = writer.transportContext;
333 0 : data.publication_transport_priority_ =
334 0 : writer.writerQos.transport_priority.value;
335 0 : data.remote_reliable_ =
336 0 : (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
337 0 : data.remote_durable_ =
338 0 : (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
339 :
// On success notify the e_ASSOCIATED observer, if any; a failure is only
// logged (at any non-zero debug level) and is not reported to the caller.
340 0 : if (associate(data, active)) {
341 0 : const Observer_rch observer = get_observer(Observer::e_ASSOCIATED);
342 0 : if (observer) {
343 0 : observer->on_associated(this, data.remote_id_);
344 : }
345 0 : } else {
346 0 : if (DCPS_debug_level) {
347 0 : ACE_ERROR((LM_ERROR,
348 : ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ")
349 : ACE_TEXT("ERROR: transport layer failed to associate.\n")));
350 : }
351 : }
352 0 : }
353 :
// Completion callback for a transport-level association attempt.
//
// flags      result bits; processing continues only if ASSOC_OK is set
// remote_id  GUID of the writer the association was attempted with
//
// On success: (re)starts liveliness checking when a lease duration is
// configured, assigns an instance handle to the writer and, for readers
// that are not built-in topic readers, updates SUBSCRIPTION_MATCHED status,
// calls the listener and stores the handle in the writer's WriterInfo.
354 : void
355 0 : DataReaderImpl::transport_assoc_done(int flags, const GUID_t& remote_id)
356 : {
357 0 : if (!(flags & ASSOC_OK)) {
358 0 : if (DCPS_debug_level) {
359 0 : ACE_ERROR((LM_ERROR,
360 : ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
361 : ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
362 : LogGuid(remote_id).c_str()));
363 : }
364 0 : return;
365 : }
366 :
367 : // LIVELINESS policy timers are managed here.
368 0 : if (!liveliness_lease_duration_.is_zero()) {
369 0 : if (DCPS_debug_level >= 5) {
370 0 : ACE_DEBUG((LM_DEBUG,
371 : ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
372 : ACE_TEXT("starting/resetting liveliness timer for reader %C\n"),
373 : LogGuid(get_guid()).c_str()));
374 : }
375 : // this call will start the timer if it is not already set
376 0 : liveliness_timer_->check_liveliness();
377 : }
378 :
// Without a live participant no handle can be assigned; nothing more to do.
379 0 : const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
380 :
381 0 : if (!participant)
382 0 : return;
383 :
384 0 : const DDS::InstanceHandle_t handle = participant->assign_handle(remote_id);
385 :
386 0 : if (!is_bit_) {
387 : // publication_handle_lock_ is held for the status update and the
388 : // listener callback in the block below; it is released at the block's end.
389 : {
390 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
391 :
392 : // This insertion is idempotent.
393 0 : publication_id_to_handle_map_.insert(RepoIdToHandleMap::value_type(remote_id, handle));
394 :
395 0 : if (DCPS_debug_level > 4) {
396 0 : ACE_DEBUG((LM_DEBUG,
397 : ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
398 : ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"),
399 : LogGuid(remote_id).c_str(),
400 : handle));
401 : }
402 :
403 : // We need to adjust these after the insertions have all completed
404 : // since insertions are not guaranteed to increase the number of
405 : // currently matched publications.
406 0 : const int matchedPublications = static_cast<int>(publication_id_to_handle_map_.size());
407 0 : subscription_match_status_.current_count_change =
408 0 : matchedPublications - subscription_match_status_.current_count;
409 0 : subscription_match_status_.current_count = matchedPublications;
410 :
// Unlike current_count, the totals are incremented on every call that
// reaches this point, even if the map insertion above was a no-op.
411 0 : ++subscription_match_status_.total_count;
412 0 : ++subscription_match_status_.total_count_change;
413 :
414 0 : subscription_match_status_.last_publication_handle = handle;
415 :
416 0 : set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
417 :
418 : DDS::DataReaderListener_var listener =
419 0 : listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
420 :
// The listener is invoked while publication_handle_lock_ is still held.
421 0 : if (!CORBA::is_nil(listener)) {
422 0 : listener->on_subscription_matched(this, subscription_match_status_);
423 :
424 : // TBD - why does the spec say to change this but not change
425 : // the ChangeFlagStatus after a listener call?
426 :
427 : // Client will look at it so next time it looks the change should be 0
428 0 : subscription_match_status_.total_count_change = 0;
429 0 : subscription_match_status_.current_count_change = 0;
430 : }
431 :
432 0 : notify_status_condition();
433 0 : }
434 :
// Store the handle in the writer's WriterInfo. Lock order here is
// sample_lock_ first, then writers_lock_. If the writer is no longer
// known, the function returns here and the monitor report below is skipped.
435 : {
436 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
437 0 : ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
438 :
439 0 : if (!writers_.count(remote_id)) {
440 0 : return;
441 : }
442 0 : writers_[remote_id]->handle(handle);
443 0 : }
444 : }
445 :
446 0 : if (monitor_) {
447 0 : monitor_->report();
448 : }
449 0 : }
450 :
451 : void
452 0 : DataReaderImpl::remove_associations(const WriterIdSeq& writers,
453 : bool notify_lost)
454 : {
455 : DBG_ENTRY_LVL("DataReaderImpl", "remove_associations", 6);
456 :
457 0 : if (writers.length() == 0) {
458 0 : return;
459 : }
460 :
461 0 : const Observer_rch observer = get_observer(Observer::e_DISASSOCIATED);
462 0 : if (observer) {
463 0 : for (CORBA::ULong i = 0; i < writers.length(); ++i) {
464 0 : observer->on_disassociated(this, writers[i]);
465 : }
466 : }
467 :
468 0 : if (DCPS_debug_level >= 1) {
469 0 : ACE_DEBUG((LM_DEBUG,
470 : ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations: ")
471 : ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
472 : is_bit_,
473 : LogGuid(get_guid()).c_str(),
474 : LogGuid(writers[0]).c_str(),
475 : writers.length()));
476 : }
477 0 : if (!get_deleted()) {
478 : // stop pending associations for these writer ids
479 0 : this->stop_associating(writers.get_buffer(), writers.length());
480 :
481 : {
482 0 : CORBA::ULong wr_len = writers.length();
483 0 : ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
484 :
485 0 : for (CORBA::ULong i = 0; i < wr_len; i++) {
486 0 : const GUID_t writer_id = writers[i];
487 : {
488 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(statistics_lock_);
489 0 : statistics_.erase(writer_id);
490 0 : }
491 : }
492 0 : }
493 : }
494 :
495 0 : remove_associations_i(writers, notify_lost);
496 0 : }
497 :
// Does the actual work of removing writer associations.
//
// writers      ids of the writers to remove; an empty sequence is a no-op
// notify_lost  when true, notify_subscription_lost() is called with the
//              handles of the writers that were really removed
//
// Steps: resume sample processing for each writer, erase the writers from
// writers_, tell each removed WriterInfo it was removed, drop their
// publication handles, disassociate them at the transport level, update the
// SUBSCRIPTION_MATCHED status (non-built-in readers only) and report to the
// monitor.
498 : void
499 0 : DataReaderImpl::remove_associations_i(const WriterIdSeq& writers,
500 : bool notify_lost)
501 : {
502 : DBG_ENTRY_LVL("DataReaderImpl", "remove_associations_i", 6);
503 :
504 0 : if (writers.length() == 0) {
505 0 : return;
506 : }
507 :
508 0 : if (DCPS_debug_level >= 1) {
509 0 : ACE_DEBUG((LM_DEBUG,
510 : ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
511 : ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
512 : is_bit_,
513 : LogGuid(get_guid()).c_str(),
514 : LogGuid(writers[0]).c_str(),
515 : writers.length()));
516 : }
517 0 : DDS::InstanceHandleSeq handles;
518 :
519 0 : CORBA::ULong wr_len = writers.length();
520 :
521 : // Flush historic samples and/or allow in-progress delivery of historic samples to complete
522 0 : for (CORBA::ULong i = 0; i < wr_len; i++) {
523 0 : resume_sample_processing(writers[i]);
524 : }
525 :
526 : // This is used to hold the list of writers which were actually
527 : // removed, which is a subset (possibly all) of the writers which were
528 : // requested to be removed.
529 0 : WriterIdSeq updated_writers;
530 0 : WriterMapType removed_writers;
531 :
532 : // Remove the writers from the writer list. If a supplied writer
533 : // is not in the cached writers list then it was already removed.
534 : // Only the writers that are still present need to be
535 : // removed here.
536 : {
537 0 : ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
538 :
539 0 : for (CORBA::ULong i = 0; i < wr_len; i++) {
540 0 : const GUID_t writer_id = writers[i];
541 :
542 0 : WriterMapType::iterator it = this->writers_.find(writer_id);
543 :
// Keep a reference to the WriterInfo so removed() can be called on it
// after writers_lock_ has been released.
544 0 : if (it != this->writers_.end()) {
545 0 : removed_writers.insert(*it);
546 0 : end_historic_sweeper_->cancel_timer(it->second);
547 : }
548 :
549 0 : if (this->writers_.erase(writer_id) == 0) {
550 0 : if (DCPS_debug_level >= 1) {
551 0 : ACE_DEBUG((LM_DEBUG,
552 : ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
553 : ACE_TEXT("the writer local %C was already removed.\n"),
554 : LogGuid(writer_id).c_str()));
555 : }
556 :
557 : } else {
558 0 : push_back(updated_writers, writer_id);
559 : }
560 : }
561 0 : }
562 :
// writers_lock_ is no longer held from here on.
563 0 : for (WriterMapType::iterator it = removed_writers.begin(); it != removed_writers.end(); ++it) {
564 0 : it->second->removed();
565 : }
566 0 : removed_writers.clear();
567 :
// From here on wr_len counts the writers that were really removed.
568 0 : wr_len = updated_writers.length();
569 :
570 : // Return now if the supplied writers have been removed already.
571 0 : if (wr_len == 0) {
572 0 : return;
573 : }
574 :
575 0 : if (!is_bit_) {
576 : // The writer should be in the id_to_handle map at this time.
// The handles are looked up before the map entries are erased below.
577 0 : this->lookup_instance_handles(updated_writers, handles);
578 :
579 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(publication_handle_lock_);
580 :
581 0 : for (CORBA::ULong i = 0; i < wr_len; ++i) {
582 0 : publication_id_to_handle_map_.erase(updated_writers[i]);
583 : }
584 0 : }
585 :
586 0 : for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
587 : {
588 0 : this->disassociate(updated_writers[i]);
589 : }
590 : }
591 :
592 : // Mirror the add_associations SUBSCRIPTION_MATCHED_STATUS processing.
593 0 : if (!this->is_bit_) {
594 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(publication_handle_lock_);
595 :
596 : // Derive the change in the number of publications writing to this reader.
597 0 : int matchedPublications = static_cast<int>(this->publication_id_to_handle_map_.size());
598 : this->subscription_match_status_.current_count_change
599 0 : = matchedPublications - this->subscription_match_status_.current_count;
600 :
601 : // Only process status if the number of publications has changed.
602 0 : if (this->subscription_match_status_.current_count_change != 0) {
603 0 : this->subscription_match_status_.current_count = matchedPublications;
604 :
605 : /// Section 7.1.4.1: total_count will not decrement.
606 :
607 : /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
// NOTE(review): this indexes handles with wr_len - 1 on the assumption
// that lookup_instance_handles() stored one handle per updated writer;
// that function is not visible here - confirm it cannot leave handles
// shorter than wr_len.
608 : this->subscription_match_status_.last_publication_handle
609 0 : = handles[ wr_len - 1];
610 :
611 0 : set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
612 :
613 : DDS::DataReaderListener_var listener
614 0 : = listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
615 :
616 0 : if (!CORBA::is_nil(listener.in())) {
617 0 : listener->on_subscription_matched(this, this->subscription_match_status_);
618 :
619 : // Client will look at it so next time it looks the change should be 0
620 0 : this->subscription_match_status_.total_count_change = 0;
621 0 : this->subscription_match_status_.current_count_change = 0;
622 : }
623 0 : notify_status_condition();
624 0 : }
625 0 : }
626 :
627 : // If this remove_association is invoked when the InfoRepo
628 : // detects a lost writer then make a callback to notify
629 : // subscription lost.
// For built-in topic readers `handles` was never filled and is empty here.
630 0 : if (notify_lost) {
631 0 : this->notify_subscription_lost(handles);
632 : }
633 :
634 0 : if (this->monitor_) {
635 0 : this->monitor_->report();
636 : }
637 0 : }
638 :
639 : void
640 0 : DataReaderImpl::remove_all_associations()
641 : {
642 : DBG_ENTRY_LVL("DataReaderImpl","remove_all_associations",6);
643 0 : stop_associating();
644 :
645 0 : OpenDDS::DCPS::WriterIdSeq writers;
646 : int size;
647 :
648 : {
649 0 : ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
650 :
651 0 : size = static_cast<int>(writers_.size());
652 0 : writers.length(size);
653 :
654 0 : WriterMapType::iterator curr_writer = writers_.begin();
655 0 : WriterMapType::iterator end_writer = writers_.end();
656 :
657 0 : int i = 0;
658 :
659 0 : while (curr_writer != end_writer) {
660 0 : writers[i++] = curr_writer->first;
661 0 : ++curr_writer;
662 : }
663 0 : }
664 :
665 : try {
666 0 : if (0 < size) {
667 0 : remove_associations(writers, false);
668 : }
669 0 : } catch (const CORBA::Exception&) {
670 0 : ACE_DEBUG((LM_WARNING,
671 : ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::remove_all_associations() - ")
672 : ACE_TEXT("caught exception from remove_associations.\n")));
673 0 : }
674 :
675 0 : transport_stop();
676 0 : }
677 :
678 : void
679 0 : DataReaderImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
680 : {
681 : DDS::DataReaderListener_var listener =
682 0 : listener_for(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS);
683 :
684 0 : if (this->requested_incompatible_qos_status_.total_count == status.total_count) {
685 : // This test should make the method idempotent.
686 0 : return;
687 : }
688 :
689 0 : set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
690 : true);
691 :
692 : // copy status and increment change
693 0 : requested_incompatible_qos_status_.total_count = status.total_count;
694 0 : requested_incompatible_qos_status_.total_count_change +=
695 0 : status.count_since_last_send;
696 0 : requested_incompatible_qos_status_.last_policy_id =
697 0 : status.last_policy_id;
698 0 : requested_incompatible_qos_status_.policies = status.policies;
699 :
700 0 : if (!CORBA::is_nil(listener.in())) {
701 0 : listener->on_requested_incompatible_qos(this, requested_incompatible_qos_status_);
702 :
703 : // TBD - why does the spec say to change total_count_change but not
704 : // change the ChangeFlagStatus after a listener call?
705 :
706 : // client just looked at it so next time it looks the
707 : // change should be 0
708 0 : requested_incompatible_qos_status_.total_count_change = 0;
709 : }
710 :
711 0 : notify_status_condition();
712 0 : }
713 :
714 : void
715 0 : DataReaderImpl::signal_liveliness(const GUID_t& remote_participant)
716 : {
717 0 : GUID_t prefix = remote_participant;
718 0 : prefix.entityId = EntityId_t();
719 :
720 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
721 :
722 : typedef std::pair<GUID_t, WriterInfo_rch> RepoWriterPair;
723 : typedef OPENDDS_VECTOR(RepoWriterPair) WriterSet;
724 0 : WriterSet writers;
725 :
726 : {
727 0 : ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
728 0 : for (WriterMapType::iterator pos = writers_.lower_bound(prefix),
729 0 : limit = writers_.end();
730 0 : pos != limit && equal_guid_prefixes(pos->first, prefix);
731 0 : ++pos) {
732 0 : writers.push_back(std::make_pair(pos->first, pos->second));
733 : }
734 0 : }
735 :
736 0 : const MonotonicTimePoint when = MonotonicTimePoint::now();
737 0 : for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
738 0 : pos != limit;
739 0 : ++pos) {
740 0 : pos->second->received_activity(when);
741 : }
742 :
743 0 : if (!writers.empty()) {
744 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
745 0 : for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
746 0 : pos != limit;
747 0 : ++pos) {
748 0 : for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
749 0 : iter != instances_.end();
750 0 : ++iter) {
751 0 : SubscriptionInstance_rch ptr = iter->second;
752 0 : ptr->instance_state_->lively(pos->first);
753 0 : }
754 : }
755 0 : }
756 0 : }
757 :
758 0 : DDS::ReadCondition_ptr DataReaderImpl::create_readcondition(
759 : DDS::SampleStateMask sample_states,
760 : DDS::ViewStateMask view_states,
761 : DDS::InstanceStateMask instance_states)
762 : {
763 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 0);
764 0 : DDS::ReadCondition_var rc = new ReadConditionImpl(this, sample_states,
765 0 : view_states, instance_states);
766 0 : read_conditions_.insert(rc);
767 0 : return rc._retn();
768 0 : }
769 :
770 : #ifndef OPENDDS_NO_QUERY_CONDITION
771 0 : DDS::QueryCondition_ptr DataReaderImpl::create_querycondition(
772 : DDS::SampleStateMask sample_states,
773 : DDS::ViewStateMask view_states,
774 : DDS::InstanceStateMask instance_states,
775 : const char* query_expression,
776 : const DDS::StringSeq& query_parameters)
777 : {
778 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_, 0);
779 : try {
780 0 : DDS::QueryCondition_var qc = new QueryConditionImpl(this, sample_states,
781 0 : view_states, instance_states, query_expression);
782 0 : if (qc->set_query_parameters(query_parameters) != DDS::RETCODE_OK) {
783 0 : return 0;
784 : }
785 0 : DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(qc);
786 0 : read_conditions_.insert(rc);
787 0 : return qc._retn();
788 0 : } catch (const std::exception& e) {
789 0 : if (DCPS_debug_level) {
790 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ")
791 : ACE_TEXT("DataReaderImpl::create_querycondition - %C\n"),
792 : e.what()));
793 : }
794 0 : }
795 0 : return 0;
796 0 : }
797 : #endif
798 :
799 0 : bool DataReaderImpl::has_readcondition(DDS::ReadCondition_ptr a_condition)
800 : {
801 : //sample lock already held
802 0 : DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
803 0 : return read_conditions_.find(rc) != read_conditions_.end();
804 0 : }
805 :
806 0 : DDS::ReturnCode_t DataReaderImpl::delete_readcondition(
807 : DDS::ReadCondition_ptr a_condition)
808 : {
809 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
810 : DDS::RETCODE_OUT_OF_RESOURCES);
811 0 : DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
812 0 : return read_conditions_.erase(rc)
813 0 : ? DDS::RETCODE_OK : DDS::RETCODE_PRECONDITION_NOT_MET;
814 0 : }
815 :
816 0 : DDS::ReturnCode_t DataReaderImpl::delete_contained_entities()
817 : {
818 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_,
819 : DDS::RETCODE_OUT_OF_RESOURCES);
820 0 : read_conditions_.clear();
821 0 : return DDS::RETCODE_OK;
822 0 : }
823 :
824 0 : DDS::ReturnCode_t DataReaderImpl::set_qos(const DDS::DataReaderQos& qos)
825 : {
826 : OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
827 : OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
828 : OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
829 :
830 0 : DDS::DataReaderQos new_qos = qos;
831 0 : new_qos.representation.value = qos_.representation.value;
832 0 : if (Qos_Helper::valid(new_qos) && Qos_Helper::consistent(new_qos)) {
833 :
834 0 : if (qos_ == new_qos)
835 0 : return DDS::RETCODE_OK;
836 :
837 0 : if (enabled_) {
838 0 : if (!Qos_Helper::changeable(qos_, new_qos)) {
839 0 : return DDS::RETCODE_IMMUTABLE_POLICY;
840 :
841 : } else {
842 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
843 0 : DDS::SubscriberQos subscriberQos;
844 :
845 0 : RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
846 0 : bool status = false;
847 0 : if (subscriber) {
848 0 : subscriber->get_qos(subscriberQos);
849 : status =
850 0 : disco->update_subscription_qos(
851 : domain_id_,
852 0 : dp_id_,
853 0 : subscription_id_,
854 : new_qos,
855 : subscriberQos);
856 : }
857 0 : if (!status) {
858 0 : ACE_ERROR_RETURN((LM_ERROR,
859 : ACE_TEXT("(%P|%t) DataReaderImpl::set_qos, ")
860 : ACE_TEXT("qos not updated.\n")),
861 : DDS::RETCODE_ERROR);
862 : }
863 0 : }
864 : }
865 :
866 0 : qos_change(new_qos);
867 0 : qos_ = new_qos;
868 0 : passed_qos_ = qos;
869 :
870 0 : const Observer_rch observer = get_observer(Observer::e_QOS_CHANGED);
871 0 : if (observer) {
872 0 : observer->on_qos_changed(this);
873 : }
874 :
875 0 : return DDS::RETCODE_OK;
876 :
877 0 : } else {
878 0 : return DDS::RETCODE_INCONSISTENT_POLICY;
879 : }
880 0 : }
881 :
882 0 : void DataReaderImpl::qos_change(const DDS::DataReaderQos & qos)
883 : {
884 : // Reset the deadline timer if the period has changed.
885 0 : if (qos_.deadline.period.sec != qos.deadline.period.sec ||
886 0 : qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
887 0 : if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC &&
888 0 : qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
889 0 : deadline_period_ = TimeDuration(qos.deadline.period);
890 0 : deadline_queue_enabled_ = true;
891 0 : } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC &&
892 0 : qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
893 0 : cancel_all_deadlines();
894 0 : deadline_queue_enabled_ = false;
895 : } else {
896 0 : reset_deadline_period(TimeDuration(qos.deadline.period));
897 : }
898 : }
899 0 : }
900 :
901 : DDS::ReturnCode_t
902 0 : DataReaderImpl::get_qos(
903 : DDS::DataReaderQos & qos)
904 : {
905 0 : qos = passed_qos_;
906 0 : return DDS::RETCODE_OK;
907 : }
908 :
909 0 : DDS::ReturnCode_t DataReaderImpl::set_listener(
910 : DDS::DataReaderListener_ptr a_listener,
911 : DDS::StatusMask mask)
912 : {
913 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
914 0 : listener_mask_ = mask;
915 : //note: OK to duplicate a nil object ref
916 0 : listener_ = DDS::DataReaderListener::_duplicate(a_listener);
917 0 : return DDS::RETCODE_OK;
918 0 : }
919 :
920 0 : DDS::DataReaderListener_ptr DataReaderImpl::get_listener()
921 : {
922 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
923 0 : return DDS::DataReaderListener::_duplicate(listener_.in());
924 0 : }
925 :
926 0 : DataReaderListener_ptr DataReaderImpl::get_ext_listener()
927 : {
928 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
929 0 : return DataReaderListener::_narrow(listener_.in());
930 0 : }
931 :
932 0 : DDS::TopicDescription_ptr DataReaderImpl::get_topicdescription()
933 : {
934 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
935 : {
936 0 : ACE_Guard<ACE_Thread_Mutex> guard(content_filtered_topic_mutex_);
937 0 : if (content_filtered_topic_) {
938 0 : return DDS::TopicDescription::_duplicate(content_filtered_topic_.get());
939 : }
940 0 : }
941 : #endif
942 0 : return DDS::TopicDescription::_duplicate(topic_desc_.in());
943 : }
944 :
945 0 : DDS::Subscriber_ptr DataReaderImpl::get_subscriber()
946 : {
947 0 : return get_subscriber_servant()._retn();
948 : }
949 :
950 : DDS::ReturnCode_t
951 0 : DataReaderImpl::get_sample_rejected_status(
952 : DDS::SampleRejectedStatus & status)
953 : {
954 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
955 :
956 0 : set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, false);
957 0 : status = sample_rejected_status_;
958 0 : sample_rejected_status_.total_count_change = 0;
959 0 : return DDS::RETCODE_OK;
960 0 : }
961 :
962 : DDS::ReturnCode_t
963 0 : DataReaderImpl::get_liveliness_changed_status(
964 : DDS::LivelinessChangedStatus & status)
965 : {
966 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
967 :
968 0 : set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS,
969 : false);
970 0 : status = liveliness_changed_status_;
971 :
972 0 : liveliness_changed_status_.alive_count_change = 0;
973 0 : liveliness_changed_status_.not_alive_count_change = 0;
974 :
975 0 : return DDS::RETCODE_OK;
976 0 : }
977 :
978 : DDS::ReturnCode_t
979 0 : DataReaderImpl::get_requested_deadline_missed_status(
980 : DDS::RequestedDeadlineMissedStatus & status)
981 : {
982 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
983 :
984 0 : set_status_changed_flag(DDS::REQUESTED_DEADLINE_MISSED_STATUS,
985 : false);
986 :
987 0 : this->requested_deadline_missed_status_.total_count_change =
988 0 : this->requested_deadline_missed_status_.total_count
989 0 : - this->last_deadline_missed_total_count_;
990 :
991 : // DDS::RequestedDeadlineMissedStatus::last_instance_handle field
992 : // is updated by the RequestedDeadlineWatchdog.
993 :
994 : // Update for next status check.
995 0 : this->last_deadline_missed_total_count_ =
996 0 : this->requested_deadline_missed_status_.total_count;
997 :
998 0 : status = requested_deadline_missed_status_;
999 :
1000 0 : return DDS::RETCODE_OK;
1001 0 : }
1002 :
1003 : DDS::ReturnCode_t
1004 0 : DataReaderImpl::get_requested_incompatible_qos_status(
1005 : DDS::RequestedIncompatibleQosStatus & status)
1006 : {
1007 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(publication_handle_lock_);
1008 :
1009 0 : set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS, false);
1010 0 : status = requested_incompatible_qos_status_;
1011 0 : requested_incompatible_qos_status_.total_count_change = 0;
1012 :
1013 0 : return DDS::RETCODE_OK;
1014 0 : }
1015 :
1016 : DDS::ReturnCode_t
1017 0 : DataReaderImpl::get_subscription_matched_status(
1018 : DDS::SubscriptionMatchedStatus & status)
1019 : {
1020 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(publication_handle_lock_);
1021 :
1022 0 : set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, false);
1023 0 : status = subscription_match_status_;
1024 0 : subscription_match_status_.total_count_change = 0;
1025 0 : subscription_match_status_.current_count_change = 0;
1026 :
1027 0 : return DDS::RETCODE_OK;
1028 0 : }
1029 :
1030 : DDS::ReturnCode_t
1031 0 : DataReaderImpl::get_sample_lost_status(
1032 : DDS::SampleLostStatus & status)
1033 : {
1034 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> justMe(this->sample_lock_);
1035 :
1036 0 : set_status_changed_flag(DDS::SAMPLE_LOST_STATUS, false);
1037 0 : status = sample_lost_status_;
1038 0 : sample_lost_status_.total_count_change = 0;
1039 0 : return DDS::RETCODE_OK;
1040 0 : }
1041 :
1042 : DDS::ReturnCode_t
1043 0 : DataReaderImpl::wait_for_historical_data(
1044 : const DDS::Duration_t & /* max_wait */)
1045 : {
1046 : // Add your implementation here
1047 0 : return DDS::RETCODE_OK;
1048 : }
1049 :
1050 : DDS::ReturnCode_t
1051 0 : DataReaderImpl::get_matched_publications(
1052 : DDS::InstanceHandleSeq & publication_handles)
1053 : {
1054 0 : if (!enabled_) {
1055 0 : ACE_ERROR_RETURN((LM_ERROR,
1056 : ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::get_matched_publications: ")
1057 : ACE_TEXT(" Entity is not enabled.\n")),
1058 : DDS::RETCODE_NOT_ENABLED);
1059 : }
1060 :
1061 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
1062 : guard,
1063 : publication_handle_lock_,
1064 : DDS::RETCODE_ERROR);
1065 :
1066 : // Copy out the handles for the current set of publications.
1067 0 : int index = 0;
1068 0 : publication_handles.length(static_cast<CORBA::ULong>(this->publication_id_to_handle_map_.size()));
1069 :
1070 0 : for (RepoIdToHandleMap::iterator
1071 0 : current = this->publication_id_to_handle_map_.begin();
1072 0 : current != this->publication_id_to_handle_map_.end();
1073 0 : ++current, ++index) {
1074 0 : publication_handles[index] = current->second;
1075 : }
1076 :
1077 0 : return DDS::RETCODE_OK;
1078 0 : }
1079 :
1080 : #if !defined (DDS_HAS_MINIMUM_BIT)
1081 : DDS::ReturnCode_t
1082 0 : DataReaderImpl::get_matched_publication_data(
1083 : DDS::PublicationBuiltinTopicData & publication_data,
1084 : DDS::InstanceHandle_t publication_handle)
1085 : {
1086 0 : if (!enabled_) {
1087 0 : ACE_ERROR_RETURN((LM_ERROR,
1088 : ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::")
1089 : ACE_TEXT("get_matched_publication_data: ")
1090 : ACE_TEXT("Entity is not enabled.\n")),
1091 : DDS::RETCODE_NOT_ENABLED);
1092 : }
1093 :
1094 0 : RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
1095 :
1096 0 : if (!participant)
1097 0 : return DDS::RETCODE_ERROR;
1098 :
1099 0 : DDS::PublicationBuiltinTopicDataSeq data;
1100 0 : const DDS::ReturnCode_t ret = instance_handle_to_bit_data<DDS::PublicationBuiltinTopicDataDataReader_var>(
1101 : participant.in(),
1102 : BUILT_IN_PUBLICATION_TOPIC,
1103 : publication_handle,
1104 : data);
1105 :
1106 0 : if (ret == DDS::RETCODE_OK) {
1107 0 : publication_data = data[0];
1108 : }
1109 :
1110 0 : return ret;
1111 0 : }
1112 : #endif // !defined (DDS_HAS_MINIMUM_BIT)
1113 :
1114 : DDS::ReturnCode_t
1115 0 : DataReaderImpl::enable()
1116 : {
1117 : // According to spec:
1118 : // - Calling enable on an already enabled Entity has no effect and returns OK.
1119 : // - Calling enable on an Entity whose factory is not enabled will fail
1120 : // and return PRECONDITION_NOT_MET.
1121 :
1122 0 : if (this->is_enabled()) {
1123 0 : return DDS::RETCODE_OK;
1124 : }
1125 :
1126 0 : RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
1127 0 : if (!subscriber) {
1128 0 : return DDS::RETCODE_ERROR;
1129 : }
1130 :
1131 0 : if (!subscriber->is_enabled()) {
1132 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
1133 : }
1134 :
1135 0 : if (topic_servant_ && !topic_servant_->is_enabled()) {
1136 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
1137 : }
1138 :
1139 0 : RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
1140 0 : if (participant) {
1141 0 : dp_id_ = participant->get_id();
1142 : }
1143 :
1144 0 : if (topic_servant_) {
1145 0 : set_reader_effective_data_rep_qos(qos_.representation.value);
1146 0 : if (!topic_servant_->check_data_representation(qos_.representation.value, false)) {
1147 0 : return DDS::RETCODE_ERROR;
1148 : }
1149 : }
1150 :
1151 0 : if (qos_.history.kind == DDS::KEEP_ALL_HISTORY_QOS) {
1152 : // The spec says qos_.history.depth is "has no effect"
1153 : // when history.kind = KEEP_ALL so use max_samples_per_instance
1154 0 : depth_ = qos_.resource_limits.max_samples_per_instance;
1155 :
1156 : } else { // qos_.history.kind == DDS::KEEP_LAST_HISTORY_QOS
1157 0 : depth_ = qos_.history.depth;
1158 : }
1159 :
1160 0 : if (depth_ == DDS::LENGTH_UNLIMITED) {
1161 : // DDS::LENGTH_UNLIMITED is negative so make it a positive
1162 : // value that is, for all intents and purposes, unlimited
1163 : // and we can use it for comparisons.
1164 : // WARNING: The client risks running out of memory in this case.
1165 0 : depth_ = ACE_INT32_MAX;
1166 : }
1167 :
1168 0 : if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
1169 0 : n_chunks_ = qos_.resource_limits.max_samples;
1170 : }
1171 :
1172 : //else using value from Service_Participant
1173 :
1174 : // enable the type specific part of this DataReader
1175 0 : this->enable_specific();
1176 :
1177 : //Note: the QoS used to set n_chunks_ is Changeable=No so
1178 : // it is OK that we cannot change the size of our allocators.
1179 0 : rd_allocator_.reset(new ReceivedDataAllocator(n_chunks_));
1180 :
1181 0 : if (DCPS_debug_level >= 2)
1182 0 : ACE_DEBUG((LM_DEBUG,"(%P|%t) DataReaderImpl::enable"
1183 : " Cached_Allocator_With_Overflow %x with %d chunks\n",
1184 : rd_allocator_.get(), n_chunks_));
1185 :
1186 0 : if ((qos_.liveliness.lease_duration.sec !=
1187 0 : DDS::DURATION_INFINITE_SEC) &&
1188 0 : (qos_.liveliness.lease_duration.nanosec !=
1189 : DDS::DURATION_INFINITE_NSEC)) {
1190 0 : liveliness_lease_duration_ = TimeDuration(qos_.liveliness.lease_duration);
1191 : }
1192 :
1193 : // Setup the requested deadline watchdog if the configured deadline
1194 : // period is not the default (infinite).
1195 0 : DDS::Duration_t const deadline_period = this->qos_.deadline.period;
1196 :
1197 0 : if (!deadline_queue_enabled_
1198 0 : && (deadline_period.sec != DDS::DURATION_INFINITE_SEC
1199 0 : || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC)) {
1200 0 : deadline_period_ = TimeDuration(qos_.deadline.period);
1201 0 : deadline_queue_enabled_ = true;
1202 : }
1203 :
1204 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
1205 0 : disco->pre_reader(this);
1206 :
1207 0 : this->set_enabled();
1208 :
1209 0 : if (topic_servant_ && !transport_disabled_) {
1210 : try {
1211 0 : this->enable_transport(this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS,
1212 0 : this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
1213 0 : } catch (const Transport::Exception&) {
1214 0 : ACE_ERROR((LM_ERROR,
1215 : ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::enable, ")
1216 : ACE_TEXT("Transport Exception.\n")));
1217 0 : return DDS::RETCODE_ERROR;
1218 0 : }
1219 :
1220 0 : const DDS::ReturnCode_t setup_deserialization_result = setup_deserialization();
1221 0 : if (setup_deserialization_result != DDS::RETCODE_OK) {
1222 0 : return setup_deserialization_result;
1223 : }
1224 :
1225 0 : const TransportLocatorSeq& trans_conf_info = connection_info();
1226 :
1227 0 : CORBA::String_var filterClassName = "";
1228 0 : CORBA::String_var filterExpression = "";
1229 0 : DDS::StringSeq exprParams;
1230 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
1231 : {
1232 0 : ACE_Guard<ACE_Thread_Mutex> guard(content_filtered_topic_mutex_);
1233 0 : if (content_filtered_topic_) {
1234 0 : filterClassName = content_filtered_topic_->get_filter_class_name();
1235 0 : filterExpression = content_filtered_topic_->get_filter_expression();
1236 0 : content_filtered_topic_->get_expression_parameters(exprParams);
1237 : }
1238 0 : }
1239 : #endif
1240 :
1241 0 : DDS::SubscriberQos sub_qos;
1242 0 : subscriber->get_qos(sub_qos);
1243 :
1244 : TypeSupportImpl* const typesupport =
1245 0 : dynamic_cast<TypeSupportImpl*>(topic_servant_->get_type_support());
1246 0 : if (!typesupport) {
1247 0 : return DDS::RETCODE_ERROR;
1248 : }
1249 :
1250 0 : XTypes::TypeInformation type_info;
1251 0 : typesupport->to_type_info(type_info);
1252 :
1253 0 : XTypes::TypeLookupService_rch type_lookup_service = participant->get_type_lookup_service();
1254 0 : typesupport->add_types(type_lookup_service);
1255 :
1256 0 : install_type_support(typesupport);
1257 :
1258 : const GUID_t subscription_id =
1259 0 : disco->add_subscription(domain_id_,
1260 0 : dp_id_,
1261 0 : topic_servant_->get_id(),
1262 0 : rchandle_from(this),
1263 0 : qos_,
1264 : trans_conf_info,
1265 : sub_qos,
1266 : filterClassName,
1267 : filterExpression,
1268 : exprParams,
1269 : type_info);
1270 :
1271 : #if defined(OPENDDS_SECURITY)
1272 : {
1273 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, DDS::RETCODE_ERROR);
1274 0 : security_config_ = participant->get_security_config();
1275 0 : dynamic_type_ = typesupport->get_type();
1276 0 : }
1277 : #endif
1278 :
1279 : {
1280 0 : ACE_Guard<ACE_Thread_Mutex> guard(subscription_id_mutex_);
1281 0 : subscription_id_ = subscription_id;
1282 0 : has_subscription_id_ = true;
1283 0 : subscription_id_condition_.notify_all();
1284 0 : }
1285 :
1286 0 : if (subscription_id == GUID_UNKNOWN) {
1287 0 : if (DCPS_debug_level >= 1) {
1288 0 : ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: DataReaderImpl::enable: "
1289 : "add_subscription failed\n"));
1290 : }
1291 0 : return DDS::RETCODE_ERROR;
1292 : }
1293 :
1294 0 : if (DCPS_debug_level >= 2) {
1295 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DataReaderImpl::enable: "
1296 : "got GUID %C, subscribed to topic name \"%C\" type \"%C\"\n",
1297 : LogGuid(get_guid()).c_str(),
1298 : topic_servant_->topic_name(), topic_servant_->type_name()));
1299 : }
1300 0 : }
1301 :
1302 0 : DDS::ReturnCode_t return_value = DDS::RETCODE_OK;
1303 0 : if (topic_servant_) {
1304 0 : const CORBA::String_var name = topic_servant_->get_name();
1305 0 : return_value = subscriber->reader_enabled(name.in(), this);
1306 :
1307 0 : if (this->monitor_) {
1308 0 : this->monitor_->report();
1309 : }
1310 0 : }
1311 :
1312 0 : if (return_value == DDS::RETCODE_OK) {
1313 0 : const Observer_rch observer = get_observer(Observer::e_ENABLED);
1314 0 : if (observer) {
1315 0 : observer->on_enabled(this);
1316 : }
1317 0 : }
1318 :
1319 0 : return return_value;
1320 0 : }
1321 :
1322 : void
1323 0 : DataReaderImpl::writer_activity(const DataSampleHeader& header)
1324 : {
1325 : // caller should have the sample_lock_ !!!
1326 :
1327 0 : WriterInfo_rch writer;
1328 :
1329 : // The received_activity() has to be called outside the writers_lock_
1330 : // because it probably acquire writers_lock_ read lock recursively
1331 : // (in handle_timeout). This could cause deadlock when there are writers
1332 : // waiting.
1333 : {
1334 0 : ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
1335 0 : WriterMapType::iterator iter = writers_.find(header.publication_id_);
1336 :
1337 0 : if (iter != writers_.end()) {
1338 0 : writer = iter->second;
1339 :
1340 0 : } else if (DCPS_debug_level > 4) {
1341 : // This may not be an error since it could happen that the sample
1342 : // is delivered to the datareader after the write is dis-associated
1343 : // with this datareader.
1344 0 : ACE_DEBUG((LM_DEBUG,
1345 : ACE_TEXT("(%P|%t) DataReaderImpl::writer_activity: ")
1346 : ACE_TEXT("reader %C is not associated with writer %C.\n"),
1347 : LogGuid(get_guid()).c_str(),
1348 : LogGuid(header.publication_id_).c_str()));
1349 : }
1350 0 : }
1351 :
1352 0 : if (!writer.is_nil()) {
1353 0 : writer->received_activity(MonotonicTimePoint::now());
1354 :
1355 0 : if ((header.message_id_ == SAMPLE_DATA) ||
1356 0 : (header.message_id_ == INSTANCE_REGISTRATION) ||
1357 0 : (header.message_id_ == UNREGISTER_INSTANCE) ||
1358 0 : (header.message_id_ == DISPOSE_INSTANCE) ||
1359 0 : (header.message_id_ == DISPOSE_UNREGISTER_INSTANCE)) {
1360 :
1361 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1362 0 : if (header.coherent_change_) {
1363 0 : writer->add_coherent_samples(header.sequence_);
1364 : }
1365 : #endif
1366 : }
1367 : }
1368 0 : }
1369 :
1370 : void
1371 0 : DataReaderImpl::data_received(const ReceivedDataSample& sample)
1372 : {
1373 : DBG_ENTRY_LVL("DataReaderImpl","data_received",6);
1374 :
1375 0 : DDS::InstanceHandle_t publication_handle = DDS::HANDLE_NIL;
1376 : {
1377 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
1378 0 : RepoIdToHandleMap::const_iterator pos = publication_id_to_handle_map_.find(sample.header_.publication_id_);
1379 0 : if (pos != publication_id_to_handle_map_.end()) {
1380 0 : publication_handle = pos->second;
1381 : }
1382 0 : }
1383 :
1384 : // ensure some other thread is not changing the sample container
1385 : // or statuses related to samples.
1386 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
1387 :
1388 0 : if (get_deleted()) return;
1389 :
1390 0 : if (DCPS_debug_level > 9) {
1391 0 : ACE_DEBUG((LM_DEBUG,
1392 : ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
1393 : ACE_TEXT("%C received sample: %C.\n"),
1394 : LogGuid(get_guid()).c_str(),
1395 : to_string(sample.header_).c_str()));
1396 : }
1397 :
1398 0 : const ValueDispatcher* vd = get_value_dispatcher();
1399 0 : const Observer_rch observer = get_observer(Observer::e_SAMPLE_RECEIVED);
1400 :
1401 0 : RcHandle<MessageHolder> real_data;
1402 0 : SubscriptionInstance_rch instance;
1403 0 : switch (sample.header_.message_id_) {
1404 0 : case SAMPLE_DATA:
1405 : case INSTANCE_REGISTRATION: {
1406 0 : if (!check_historic(sample)) break;
1407 :
1408 0 : DataSampleHeader const & header = sample.header_;
1409 :
1410 0 : this->writer_activity(header);
1411 :
1412 : // Verify data has not exceeded its lifespan.
1413 0 : if (this->filter_sample(header)) break;
1414 :
1415 : // This adds the reader to the set/list of readers with data.
1416 0 : RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
1417 0 : if (subscriber) {
1418 0 : subscriber->data_received(this);
1419 : }
1420 :
1421 : // Only gather statistics about real samples, not registration data, etc.
1422 0 : if (header.message_id_ == SAMPLE_DATA) {
1423 0 : this->process_latency(sample);
1424 : }
1425 :
1426 : // This also adds to the sample container and makes any callbacks
1427 : // and condition modifications.
1428 :
1429 0 : bool is_new_instance = false;
1430 0 : bool filtered = false;
1431 0 : if (sample.header_.key_fields_only_) {
1432 0 : dds_demarshal(sample, publication_handle, instance, is_new_instance, filtered, KEY_ONLY_MARSHALING, false);
1433 : } else {
1434 0 : real_data = dds_demarshal(sample, publication_handle, instance, is_new_instance, filtered, FULL_MARSHALING, observer && vd);
1435 : }
1436 :
1437 : // Per sample logging
1438 0 : if (DCPS_debug_level >= 8) {
1439 0 : ACE_DEBUG((LM_DEBUG,
1440 : ACE_TEXT("(%P|%t) DataReaderImpl::data_received: reader %C writer %C ")
1441 : ACE_TEXT("instance %d is_new_instance %d filtered %d\n"),
1442 : LogGuid(get_guid()).c_str(),
1443 : LogGuid(header.publication_id_).c_str(),
1444 : instance ? instance->instance_handle_ : 0,
1445 : is_new_instance, filtered));
1446 : }
1447 :
1448 0 : if (filtered) break; // sample filtered from instance
1449 :
1450 0 : if (instance) accept_sample_processing(instance, header, is_new_instance);
1451 0 : }
1452 0 : break;
1453 :
1454 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1455 0 : case END_COHERENT_CHANGES: {
1456 0 : CoherentChangeControl control;
1457 :
1458 0 : this->writer_activity(sample.header_);
1459 :
1460 0 : Message_Block_Ptr payload(sample.data(&mb_alloc_));
1461 : Serializer serializer(
1462 : payload.get(), Encoding::KIND_UNALIGNED_CDR,
1463 0 : sample.header_.byte_order_ ? ENDIAN_LITTLE : ENDIAN_BIG);
1464 0 : if (!(serializer >> control)) {
1465 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) DataReaderImpl::data_received ")
1466 : ACE_TEXT("deserialization coherent change control failed.\n")));
1467 0 : return;
1468 : }
1469 :
1470 0 : if (DCPS_debug_level > 0) {
1471 0 : std::stringstream buffer;
1472 0 : buffer << control << std::endl;
1473 :
1474 0 : ACE_DEBUG((LM_DEBUG,
1475 : ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
1476 : ACE_TEXT("END_COHERENT_CHANGES %C\n"),
1477 : buffer.str().c_str()));
1478 0 : }
1479 :
1480 0 : WriterInfo_rch writer;
1481 : {
1482 0 : ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
1483 :
1484 : WriterMapType::iterator it =
1485 0 : this->writers_.find(sample.header_.publication_id_);
1486 :
1487 0 : if (it == this->writers_.end()) {
1488 0 : ACE_DEBUG((LM_WARNING,
1489 : ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::data_received() - ")
1490 : ACE_TEXT(" subscription %C failed to find ")
1491 : ACE_TEXT(" publication data for %C!\n"),
1492 : LogGuid(get_guid()).c_str(),
1493 : LogGuid(sample.header_.publication_id_).c_str()));
1494 0 : return;
1495 : }
1496 : else {
1497 0 : writer = it->second;
1498 : }
1499 0 : it->second->set_group_info(control);
1500 0 : }
1501 :
1502 0 : if (this->verify_coherent_changes_completion(writer.in())) {
1503 0 : this->notify_read_conditions();
1504 : }
1505 0 : }
1506 0 : break;
1507 : #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
1508 :
1509 0 : case DATAWRITER_LIVELINESS: {
1510 0 : if (DCPS_debug_level >= 4) {
1511 0 : ACE_DEBUG((LM_DEBUG,
1512 : ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
1513 : ACE_TEXT("reader %C got datawriter liveliness from writer %C\n"),
1514 : LogGuid(get_guid()).c_str(),
1515 : LogGuid(sample.header_.publication_id_).c_str()));
1516 : }
1517 0 : this->writer_activity(sample.header_);
1518 :
1519 : // tell all instances they got a liveliness message
1520 : {
1521 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
1522 0 : for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
1523 0 : iter != instances_.end();
1524 0 : ++iter) {
1525 0 : if (iter->second->instance_state_->writes_instance(sample.header_.publication_id_)) {
1526 0 : iter->second->instance_state_->lively(sample.header_.publication_id_);
1527 : }
1528 : }
1529 0 : }
1530 :
1531 : }
1532 0 : break;
1533 :
1534 0 : case DISPOSE_INSTANCE: {
1535 0 : if (!check_historic(sample)) break;
1536 0 : this->writer_activity(sample.header_);
1537 0 : SubscriptionInstance_rch instance;
1538 :
1539 0 : if (deadline_queue_enabled_) {
1540 : // Find the instance first for timer cancellation since
1541 : // the instance may be deleted during dispose and can
1542 : // not be accessed.
1543 0 : ReceivedDataSample dup(sample);
1544 0 : this->lookup_instance(dup, instance);
1545 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1546 0 : OwnershipManagerPtr owner_manager = this->ownership_manager();
1547 :
1548 0 : if (! this->is_exclusive_ownership_
1549 0 : || (owner_manager
1550 0 : && (instance)
1551 0 : && (owner_manager->is_owner(instance->instance_handle_,
1552 0 : sample.header_.publication_id_)))) {
1553 : #endif
1554 0 : cancel_deadline(instance);
1555 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1556 : }
1557 : #endif
1558 0 : }
1559 0 : instance.reset();
1560 0 : this->dispose_unregister(sample, publication_handle, instance);
1561 0 : }
1562 0 : this->notify_read_conditions();
1563 0 : break;
1564 :
1565 0 : case UNREGISTER_INSTANCE: {
1566 0 : if (!check_historic(sample)) break;
1567 0 : this->writer_activity(sample.header_);
1568 0 : SubscriptionInstance_rch instance;
1569 :
1570 0 : if (deadline_queue_enabled_) {
1571 : // Find the instance first for timer cancellation since
1572 : // the instance may be deleted during dispose and can
1573 : // not be accessed.
1574 0 : ReceivedDataSample dup(sample);
1575 0 : this->lookup_instance(dup, instance);
1576 0 : if (instance) {
1577 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1578 0 : if (! this->is_exclusive_ownership_
1579 0 : || (this->is_exclusive_ownership_
1580 0 : && instance->instance_state_->is_last(sample.header_.publication_id_))) {
1581 : #endif
1582 0 : cancel_deadline(instance);
1583 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1584 : }
1585 : #endif
1586 : }
1587 0 : }
1588 0 : instance.reset();
1589 0 : this->dispose_unregister(sample, publication_handle, instance);
1590 0 : }
1591 0 : this->notify_read_conditions();
1592 0 : break;
1593 :
1594 0 : case DISPOSE_UNREGISTER_INSTANCE: {
1595 0 : if (!check_historic(sample)) break;
1596 0 : this->writer_activity(sample.header_);
1597 0 : SubscriptionInstance_rch instance;
1598 :
1599 0 : if (deadline_queue_enabled_) {
1600 : // Find the instance first for timer cancellation since
1601 : // the instance may be deleted during dispose and can
1602 : // not be accessed.
1603 0 : ReceivedDataSample dup(sample);
1604 0 : this->lookup_instance(dup, instance);
1605 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1606 0 : OwnershipManagerPtr owner_manager = this->ownership_manager();
1607 0 : if (! this->is_exclusive_ownership_
1608 0 : || (owner_manager
1609 0 : && (instance)
1610 0 : && (owner_manager->is_owner (instance->instance_handle_,
1611 0 : sample.header_.publication_id_)))
1612 0 : || (is_exclusive_ownership_
1613 0 : && (instance)
1614 0 : && instance->instance_state_->is_last(sample.header_.publication_id_))) {
1615 : #endif
1616 0 : if (instance) {
1617 0 : cancel_deadline(instance);
1618 : }
1619 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1620 : }
1621 : #endif
1622 0 : }
1623 0 : instance.reset();
1624 0 : this->dispose_unregister(sample, publication_handle, instance);
1625 0 : }
1626 0 : this->notify_read_conditions();
1627 0 : break;
1628 :
1629 0 : case END_HISTORIC_SAMPLES: {
1630 0 : if (sample.header_.message_length_ >= sizeof(GUID_t)) {
1631 0 : Message_Block_Ptr payload(sample.data(&mb_alloc_));
1632 0 : Serializer ser(payload.get(), Encoding::KIND_UNALIGNED_CDR);
1633 0 : GUID_t readerId = GUID_UNKNOWN;
1634 0 : if (!(ser >> readerId)) {
1635 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) DataReaderImpl::data_received ")
1636 : ACE_TEXT("deserialization reader failed.\n")));
1637 0 : return;
1638 : }
1639 0 : const GUID_t repo_id(get_guid());
1640 0 : if (readerId != GUID_UNKNOWN && readerId != repo_id) {
1641 0 : break; // not our message
1642 : }
1643 0 : }
1644 0 : if (DCPS_debug_level > 4) {
1645 0 : ACE_DEBUG((LM_INFO, "(%P|%t) Received END_HISTORIC_SAMPLES control message\n"));
1646 : }
1647 : // Going to acquire writers lock, release samples lock
1648 0 : guard.release();
1649 0 : resume_sample_processing(sample.header_.publication_id_);
1650 0 : if (DCPS_debug_level > 4) {
1651 0 : ACE_DEBUG((
1652 : LM_INFO,
1653 : "(%P|%t) Resumed sample processing for durable writer %C\n",
1654 : LogGuid(sample.header_.publication_id_).c_str()));
1655 : }
1656 0 : break;
1657 : }
1658 :
1659 0 : default:
1660 0 : ACE_ERROR((LM_ERROR,
1661 : "(%P|%t) ERROR: DataReaderImpl::data_received"
1662 : "unexpected message_id = %d\n",
1663 : sample.header_.message_id_));
1664 0 : break;
1665 : }
1666 :
1667 0 : if (observer && real_data && vd) {
1668 : const DDS::Time_t timestamp = {
1669 0 : sample.header_.source_timestamp_sec_,
1670 0 : sample.header_.source_timestamp_nanosec_
1671 0 : };
1672 0 : Observer::Sample s(instance ? instance->instance_handle_ : DDS::HANDLE_NIL, sample.header_.instance_state(), timestamp, sample.header_.sequence_, real_data->get(), *vd);
1673 0 : observer->on_sample_received(this, s);
1674 : }
1675 0 : }
1676 :
1677 : RcHandle<EntityImpl>
1678 0 : DataReaderImpl::parent() const
1679 : {
1680 0 : return subscriber_servant_.lock();
1681 : }
1682 :
1683 : bool
1684 0 : DataReaderImpl::check_transport_qos(const TransportInst& ti)
1685 : {
1686 0 : if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
1687 0 : return ti.is_reliable();
1688 : }
1689 0 : return true;
1690 : }
1691 :
1692 0 : void DataReaderImpl::notify_read_conditions()
1693 : {
1694 : //sample lock is already held
1695 0 : ReadConditionSet local_read_conditions = read_conditions_;
1696 0 : ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
1697 :
1698 0 : for (ReadConditionSet::iterator it = local_read_conditions.begin(),
1699 0 : end = local_read_conditions.end(); it != end; ++it) {
1700 0 : ConditionImpl* ci = dynamic_cast<ConditionImpl*>(it->in());
1701 0 : if (ci) {
1702 0 : ci->signal_all();
1703 : } else {
1704 0 : ACE_ERROR((LM_ERROR,
1705 : ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::notify_read_conditions: ")
1706 : ACE_TEXT("Failed to obtain ConditionImpl - can't notify.\n")));
1707 : }
1708 : }
1709 0 : }
1710 :
1711 : RcHandle<SubscriberImpl>
1712 0 : DataReaderImpl::get_subscriber_servant()
1713 : {
1714 0 : return subscriber_servant_.lock();
1715 : }
1716 :
1717 0 : bool DataReaderImpl::have_sample_states(
1718 : DDS::SampleStateMask sample_states) const
1719 : {
1720 : //!!!caller should have acquired sample_lock_
1721 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> instance_guard(instances_lock_);
1722 0 : return lookup_matching_instances(sample_states, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE).size();
1723 0 : }
1724 :
1725 : bool
1726 0 : DataReaderImpl::have_view_states(DDS::ViewStateMask view_states) const
1727 : {
1728 : //!!!caller should have acquired sample_lock_
1729 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> instance_guard(instances_lock_);
1730 0 : return lookup_matching_instances(DDS::ANY_SAMPLE_STATE, view_states, DDS::ANY_INSTANCE_STATE).size();
1731 0 : }
1732 :
1733 0 : bool DataReaderImpl::have_instance_states(
1734 : DDS::InstanceStateMask instance_states) const
1735 : {
1736 : //!!!caller should have acquired sample_lock_
1737 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> instance_guard(instances_lock_);
1738 0 : return lookup_matching_instances(DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, instance_states).size();
1739 0 : }
1740 :
1741 : /// Fold-in the three separate loops of have_sample_states(),
1742 : /// have_view_states(), and have_instance_states(). Takes the sample_lock_.
1743 0 : bool DataReaderImpl::contains_sample(DDS::SampleStateMask sample_states,
1744 : DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
1745 : {
1746 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> sample_guard(sample_lock_);
1747 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> instance_guard(instances_lock_);
1748 :
1749 0 : return lookup_matching_instances(sample_states, view_states, instance_states).size();
1750 0 : }
1751 :
1752 : DDS::DataReaderListener_ptr
1753 0 : DataReaderImpl::listener_for(DDS::StatusKind kind)
1754 : {
1755 : // per 2.1.4.3.1 Listener Access to Plain Communication Status
1756 : // use this entities factory if listener is mask not enabled
1757 : // for this kind.
1758 0 : RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
1759 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
1760 0 : if (subscriber && (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0)) {
1761 0 : g.release();
1762 0 : return subscriber->listener_for(kind);
1763 :
1764 : } else {
1765 0 : return DDS::DataReaderListener::_duplicate(listener_.in());
1766 : }
1767 0 : }
1768 :
1769 0 : void DataReaderImpl::sample_info(DDS::SampleInfo & sample_info,
1770 : const ReceivedDataElement *ptr)
1771 : {
1772 :
1773 0 : sample_info.sample_rank = 0;
1774 :
1775 : // generation_rank =
1776 : // (MRSIC.disposed_generation_count +
1777 : // MRSIC.no_writers_generation_count)
1778 : // - (S.disposed_generation_count +
1779 : // S.no_writers_generation_count)
1780 : //
1781 0 : sample_info.generation_rank =
1782 0 : (sample_info.disposed_generation_count +
1783 0 : sample_info.no_writers_generation_count) -
1784 0 : sample_info.generation_rank;
1785 :
1786 : // absolute_generation_rank =
1787 : // (MRS.disposed_generation_count +
1788 : // MRS.no_writers_generation_count)
1789 : // - (S.disposed_generation_count +
1790 : // S.no_writers_generation_count)
1791 : //
1792 0 : sample_info.absolute_generation_rank =
1793 0 : (static_cast<CORBA::Long>(ptr->disposed_generation_count_) +
1794 0 : static_cast<CORBA::Long>(ptr->no_writers_generation_count_)) -
1795 0 : sample_info.absolute_generation_rank;
1796 :
1797 0 : sample_info.opendds_reserved_publication_seq = ptr->sequence_.getValue();
1798 0 : }
1799 :
1800 0 : CORBA::Long DataReaderImpl::total_samples() const
1801 : {
1802 : //!!!caller should have acquired sample_lock_
1803 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_,0);
1804 :
1805 0 : CORBA::Long count(0);
1806 :
1807 0 : for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
1808 0 : iter != instances_.end();
1809 0 : ++iter) {
1810 0 : SubscriptionInstance_rch ptr = iter->second;
1811 :
1812 0 : count += static_cast<CORBA::Long>(ptr->rcvd_samples_.size());
1813 0 : }
1814 :
1815 0 : return count;
1816 0 : }
1817 :
1818 : void
1819 0 : DataReaderImpl::LivelinessTimer::check_liveliness()
1820 : {
1821 0 : execute_or_enqueue(make_rch<CheckLivelinessCommand>(this));
1822 0 : }
1823 :
1824 : int
1825 0 : DataReaderImpl::LivelinessTimer::handle_timeout(const ACE_Time_Value& tv,
1826 : const void * /*arg*/)
1827 : {
1828 0 : ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
1829 :
1830 0 : check_liveliness_i(false, MonotonicTimePoint(tv));
1831 0 : return 0;
1832 0 : }
1833 :
1834 : void
1835 0 : DataReaderImpl::LivelinessTimer::check_liveliness_i(bool cancel,
1836 : const MonotonicTimePoint& now)
1837 : {
1838 : // Working copy of the active timer Id.
1839 :
1840 0 : RcHandle<DataReaderImpl> data_reader = data_reader_.lock();
1841 0 : if (! data_reader) {
1842 0 : this->reactor()->purge_pending_notifications(this);
1843 0 : return;
1844 : }
1845 :
1846 0 : long local_timer_id = liveliness_timer_id_;
1847 0 : bool timer_was_reset = false;
1848 :
1849 0 : if (local_timer_id != -1 && cancel) {
1850 0 : if (DCPS_debug_level >= 5) {
1851 0 : ACE_DEBUG((LM_DEBUG,
1852 : ACE_TEXT("(%P|%t) DataReaderImpl::LivelinessTimer::check_liveliness_i: ")
1853 : ACE_TEXT(" canceling timer for reader %C.\n"),
1854 : LogGuid(data_reader->get_guid()).c_str()));
1855 : }
1856 :
1857 : // called from add_associations and there is already a timer
1858 : // so cancel the existing timer.
1859 0 : if (this->reactor()->cancel_timer(local_timer_id) == -1) {
1860 : // this could fail because the reactor's call and
1861 : // the add_associations' call to this could overlap
1862 : // so it is not a failure.
1863 0 : ACE_DEBUG((LM_DEBUG,
1864 : ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::LivelinessTimer::check_liveliness_i: ")
1865 : ACE_TEXT(" %p.\n"), ACE_TEXT("cancel_timer")));
1866 : }
1867 :
1868 0 : timer_was_reset = true;
1869 : }
1870 :
1871 : // Used after the lock scope ends.
1872 0 : MonotonicTimePoint smallest(MonotonicTimePoint::max_value);
1873 0 : int alive_writers = 0;
1874 :
1875 : // This is a bit convoluted. The reasoning goes as follows:
1876 : // 1) We grab the current timer Id value when we enter the method.
1877 : // 2) We *might* cancel the timer if it is active.
1878 : // 3) The timer *might* be rescheduled while we do not hold the sample lock.
1879 : // 4) If we (or another thread) canceled the timer that we can tell, then
1880 : // 5) we should clear the Id value,
1881 : // 6) unless it has been rescheduled.
1882 : // We are using a changed timer Id value as a proxy for having been
1883 : // rescheduled.
1884 0 : if( timer_was_reset && (liveliness_timer_id_ == local_timer_id)) {
1885 0 : liveliness_timer_id_ = -1;
1886 : }
1887 :
1888 : // Iterate over each writer to this reader
1889 : {
1890 0 : ACE_READ_GUARD(ACE_RW_Thread_Mutex,
1891 : read_guard,
1892 : data_reader->writers_lock_);
1893 0 : WriterMapType writers = data_reader->writers_;
1894 0 : read_guard.release();
1895 :
1896 0 : for (WriterMapType::iterator iter = writers.begin();
1897 0 : iter != writers.end();
1898 0 : ++iter) {
1899 : // deal with possibly not being alive or
1900 : // tell when it will not be alive next (if no activity)
1901 0 : const MonotonicTimePoint next_absolute(iter->second->check_activity(now));
1902 0 : if (!next_absolute.is_max()) {
1903 0 : alive_writers++;
1904 0 : smallest = std::min(smallest, next_absolute);
1905 : }
1906 0 : }
1907 0 : }
1908 :
1909 0 : if (!alive_writers) {
1910 : // no live writers so no need to schedule a timer
1911 : // but be sure we don't try to cancel the timer later.
1912 0 : liveliness_timer_id_ = -1;
1913 : }
1914 :
1915 0 : if (DCPS_debug_level >= 5) {
1916 0 : ACE_DEBUG((LM_DEBUG,
1917 : ACE_TEXT("(%P|%t) DataReaderImpl::LivelinessTimer::check_liveliness_i: ")
1918 : ACE_TEXT("reader %C has %d live writers; from_reactor=%d\n"),
1919 : LogGuid(data_reader->get_guid()).c_str(),
1920 : alive_writers,
1921 : !cancel));
1922 : }
1923 :
1924 : // Call into the reactor after releasing the sample lock.
1925 0 : if (alive_writers) {
1926 : // compare the time now with the earliest(smallest) deadline we found
1927 0 : TimeDuration relative;
1928 0 : if (now < smallest) {
1929 0 : relative = smallest - now;
1930 : } else {
1931 0 : relative = TimeDuration(0, 1); // ASAP
1932 : }
1933 0 : liveliness_timer_id_ = this->reactor()->schedule_timer(this, 0, relative.value());
1934 :
1935 0 : if (liveliness_timer_id_ == -1) {
1936 0 : ACE_ERROR((LM_ERROR,
1937 : ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::LivelinessTimer::check_liveliness_i: ")
1938 : ACE_TEXT(" %p.\n"), ACE_TEXT("schedule_timer")));
1939 : }
1940 0 : }
1941 0 : }
1942 :
1943 : void
1944 0 : DataReaderImpl::release_instance(DDS::InstanceHandle_t handle)
1945 : {
1946 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1947 0 : OwnershipManagerPtr owner_manager = this->ownership_manager();
1948 0 : if (owner_manager) {
1949 0 : owner_manager->remove_writers(handle);
1950 : }
1951 : #endif
1952 :
1953 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
1954 0 : SubscriptionInstance_rch instance = this->get_handle_instance(handle);
1955 :
1956 0 : if (!instance) {
1957 0 : ACE_ERROR((LM_ERROR, "(%P|%t) DataReaderImpl::release_instance "
1958 : "could not find the instance by handle 0x%x\n", handle));
1959 0 : return;
1960 : }
1961 :
1962 0 : this->purge_data(instance);
1963 :
1964 : {
1965 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
1966 0 : instances_.erase(handle);
1967 0 : }
1968 :
1969 0 : this->release_instance_i(handle);
1970 0 : if (this->monitor_) {
1971 0 : this->monitor_->report();
1972 : }
1973 0 : }
1974 :
1975 : void
1976 0 : DataReaderImpl::state_updated(DDS::InstanceHandle_t handle)
1977 : {
1978 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
1979 0 : state_updated_i(handle);
1980 0 : }
1981 :
1982 0 : OpenDDS::DCPS::WriterStats::WriterStats(
1983 : int amount,
1984 0 : DataCollector<double>::OnFull type) : stats_(amount, type)
1985 : {
1986 0 : }
1987 :
1988 0 : void OpenDDS::DCPS::WriterStats::add_stat(const TimeDuration& delay)
1989 : {
1990 0 : double datum = static_cast<double>(delay.value().sec());
1991 0 : datum += delay.value().usec() / 1000000.0;
1992 0 : this->stats_.add(datum);
1993 0 : }
1994 :
1995 0 : OpenDDS::DCPS::LatencyStatistics OpenDDS::DCPS::WriterStats::get_stats() const
1996 : {
1997 : LatencyStatistics value;
1998 :
1999 0 : value.publication = GUID_UNKNOWN;
2000 0 : value.n = this->stats_.n();
2001 0 : value.maximum = this->stats_.maximum();
2002 0 : value.minimum = this->stats_.minimum();
2003 0 : value.mean = this->stats_.mean();
2004 0 : value.variance = this->stats_.var();
2005 :
2006 0 : return value;
2007 : }
2008 :
2009 0 : void OpenDDS::DCPS::WriterStats::reset_stats()
2010 : {
2011 0 : this->stats_.reset();
2012 0 : }
2013 :
2014 : #ifndef OPENDDS_SAFETY_PROFILE
2015 0 : std::ostream& OpenDDS::DCPS::WriterStats::raw_data(std::ostream& str) const
2016 : {
2017 0 : str << std::dec << this->stats_.size()
2018 0 : << " samples out of " << this->stats_.n() << std::endl;
2019 0 : return str << this->stats_;
2020 : }
2021 : #endif //OPENDDS_SAFETY_PROFILE
2022 :
2023 : void
2024 0 : DataReaderImpl::writer_removed(WriterInfo& info)
2025 : {
2026 0 : const GUID_t info_writer_id = info.writer_id();
2027 :
2028 0 : if (DCPS_debug_level >= 5) {
2029 0 : ACE_DEBUG((LM_DEBUG,
2030 : ACE_TEXT("(%P|%t) DataReaderImpl::writer_removed: ")
2031 : ACE_TEXT("reader %C from writer %C.\n"),
2032 : LogGuid(get_guid()).c_str(),
2033 : LogGuid(info_writer_id).c_str()));
2034 : }
2035 :
2036 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
2037 0 : OwnershipManagerPtr owner_manager = this->ownership_manager();
2038 0 : if (owner_manager) {
2039 0 : owner_manager->remove_writer(info_writer_id);
2040 0 : info.clear_owner_evaluated();
2041 : }
2042 : #endif
2043 :
2044 : {
2045 0 : DDS::InstanceHandle_t publication_handle = DDS::HANDLE_NIL;
2046 : {
2047 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
2048 0 : RepoIdToHandleMap::const_iterator pos = publication_id_to_handle_map_.find(info_writer_id);
2049 0 : if (pos != publication_id_to_handle_map_.end()) {
2050 0 : publication_handle = pos->second;
2051 : }
2052 0 : }
2053 :
2054 0 : bool liveliness_changed = false;
2055 :
2056 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
2057 :
2058 0 : const WriterInfo::WriterState info_state = info.state();
2059 :
2060 0 : if (info_state == WriterInfo::ALIVE) {
2061 0 : --liveliness_changed_status_.alive_count;
2062 0 : --liveliness_changed_status_.alive_count_change;
2063 0 : liveliness_changed = true;
2064 : }
2065 :
2066 0 : if (info_state == WriterInfo::DEAD) {
2067 0 : --liveliness_changed_status_.not_alive_count;
2068 0 : --liveliness_changed_status_.not_alive_count_change;
2069 0 : liveliness_changed = true;
2070 : }
2071 :
2072 0 : liveliness_changed_status_.last_publication_handle = info.handle();
2073 0 : instances_liveliness_update(info_writer_id, publication_handle);
2074 :
2075 0 : if (liveliness_changed) {
2076 0 : set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
2077 0 : this->notify_liveliness_change();
2078 : }
2079 0 : }
2080 0 : }
2081 :
2082 : void
2083 0 : DataReaderImpl::writer_became_alive(WriterInfo& info, const MonotonicTimePoint& /* when */)
2084 : {
2085 0 : const GUID_t info_writer_id = info.writer_id();
2086 :
2087 0 : if (DCPS_debug_level >= 5) {
2088 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_alive: ")
2089 : ACE_TEXT("reader %C from writer %C previous state %C.\n"),
2090 : LogGuid(get_guid()).c_str(),
2091 : LogGuid(info_writer_id).c_str(),
2092 : info.get_state_str()));
2093 : }
2094 :
2095 : // NOTE: each instance will change to ALIVE_STATE when they receive a sample
2096 :
2097 0 : const WriterInfo::WriterState info_state = info.state();
2098 :
2099 : {
2100 0 : bool liveliness_changed = false;
2101 :
2102 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
2103 :
2104 0 : if (info_state != WriterInfo::ALIVE) {
2105 0 : liveliness_changed_status_.alive_count++;
2106 0 : liveliness_changed_status_.alive_count_change++;
2107 0 : liveliness_changed = true;
2108 : }
2109 :
2110 0 : if (info_state == WriterInfo::DEAD) {
2111 0 : liveliness_changed_status_.not_alive_count--;
2112 0 : liveliness_changed_status_.not_alive_count_change--;
2113 : }
2114 :
2115 0 : if (liveliness_changed_status_.alive_count < 0) {
2116 0 : ACE_ERROR((LM_ERROR,
2117 : ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
2118 : ACE_TEXT("invalid liveliness_changed_status alive count - %d.\n"),
2119 : liveliness_changed_status_.alive_count));
2120 0 : return;
2121 : }
2122 :
2123 0 : if (liveliness_changed_status_.not_alive_count < 0) {
2124 0 : ACE_ERROR((LM_ERROR,
2125 : ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
2126 : ACE_TEXT("invalid liveliness_changed_status not alive count - %d.\n"),
2127 : liveliness_changed_status_.not_alive_count));
2128 0 : return;
2129 : }
2130 :
2131 0 : liveliness_changed_status_.last_publication_handle = info.handle();
2132 :
2133 : // Change the state to ALIVE since handle_timeout may call writer_became_dead
2134 : // which need the current state info.
2135 0 : info.state(WriterInfo::ALIVE);
2136 :
2137 0 : if (this->monitor_) {
2138 0 : this->monitor_->report();
2139 : }
2140 :
2141 : // Call listener only when there are liveliness status changes.
2142 0 : if (liveliness_changed) {
2143 0 : set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
2144 0 : this->notify_liveliness_change();
2145 : }
2146 0 : }
2147 :
2148 : // this call will start the liveliness timer if it is not already set
2149 0 : liveliness_timer_->check_liveliness();
2150 : }
2151 :
2152 : void
2153 0 : DataReaderImpl::writer_became_dead(WriterInfo& info)
2154 : {
2155 0 : const GUID_t info_writer_id = info.writer_id();
2156 :
2157 0 : if (DCPS_debug_level >= 5) {
2158 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_dead: ")
2159 : ACE_TEXT("reader %C from writer %C previous state %C.\n"),
2160 : LogGuid(get_guid()).c_str(),
2161 : LogGuid(info_writer_id).c_str(),
2162 : info.get_state_str()));
2163 : }
2164 :
2165 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
2166 0 : OwnershipManagerPtr owner_manager = this->ownership_manager();
2167 0 : if (owner_manager) {
2168 0 : owner_manager->remove_writer(info_writer_id);
2169 0 : info.clear_owner_evaluated();
2170 : }
2171 : #endif
2172 :
2173 0 : bool liveliness_changed = false;
2174 :
2175 0 : const WriterInfo::WriterState info_state = info.state();
2176 :
2177 0 : DDS::InstanceHandle_t publication_handle = DDS::HANDLE_NIL;
2178 : {
2179 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
2180 0 : RepoIdToHandleMap::const_iterator pos = publication_id_to_handle_map_.find(info_writer_id);
2181 0 : if (pos != publication_id_to_handle_map_.end()) {
2182 0 : publication_handle = pos->second;
2183 : }
2184 0 : }
2185 :
2186 : {
2187 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
2188 :
2189 0 : if (info_state != WriterInfo::DEAD) {
2190 0 : ++liveliness_changed_status_.not_alive_count;
2191 0 : ++liveliness_changed_status_.not_alive_count_change;
2192 0 : liveliness_changed = true;
2193 : }
2194 :
2195 0 : if (info_state == WriterInfo::ALIVE) {
2196 0 : --liveliness_changed_status_.alive_count;
2197 0 : --liveliness_changed_status_.alive_count_change;
2198 : }
2199 :
2200 0 : if (liveliness_changed_status_.alive_count < 0) {
2201 0 : ACE_ERROR((LM_ERROR,
2202 : ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
2203 : ACE_TEXT("invalid liveliness_changed_status alive count - %d.\n"),
2204 : liveliness_changed_status_.alive_count));
2205 0 : return;
2206 : }
2207 :
2208 0 : if (liveliness_changed_status_.not_alive_count < 0) {
2209 0 : ACE_ERROR((LM_ERROR,
2210 : ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
2211 : ACE_TEXT("invalid liveliness_changed_status not alive count - %d.\n"),
2212 : liveliness_changed_status_.not_alive_count));
2213 0 : return;
2214 : }
2215 :
2216 0 : liveliness_changed_status_.last_publication_handle = info.handle();
2217 :
2218 0 : info.state(WriterInfo::DEAD);
2219 :
2220 0 : if (this->monitor_) {
2221 0 : this->monitor_->report();
2222 : }
2223 :
2224 0 : instances_liveliness_update(info_writer_id, publication_handle);
2225 :
2226 : // Call listener only when there are liveliness status changes.
2227 0 : if (liveliness_changed) {
2228 0 : set_status_changed_flag(DDS::LIVELINESS_CHANGED_STATUS, true);
2229 0 : this->notify_liveliness_change();
2230 : }
2231 0 : }
2232 0 : }
2233 :
2234 : void
2235 0 : DataReaderImpl::instances_liveliness_update(const GUID_t& writer,
2236 : DDS::InstanceHandle_t publication_handle)
2237 : {
2238 : // sample_lock_ must be held.
2239 0 : InstanceSet localinsts;
2240 : {
2241 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
2242 0 : if (instances_.size() == 0) {
2243 0 : return;
2244 : }
2245 0 : for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
2246 0 : iter != instances_.end(); ++iter) {
2247 0 : if (iter->second->instance_state_->writes_instance(writer)) {
2248 0 : localinsts.insert(iter->first);
2249 : }
2250 : }
2251 0 : }
2252 :
2253 0 : for (InstanceSet::iterator iter = localinsts.begin(); iter != localinsts.end(); ++iter) {
2254 0 : set_instance_state_i(*iter, publication_handle, DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, SystemTimePoint::now(), writer);
2255 : }
2256 0 : }
2257 :
2258 :
2259 : void
2260 0 : DataReaderImpl::set_sample_lost_status(
2261 : const DDS::SampleLostStatus& status)
2262 : {
2263 : //!!!caller should have acquired sample_lock_
2264 0 : sample_lost_status_ = status;
2265 0 : }
2266 :
2267 : void
2268 0 : DataReaderImpl::set_sample_rejected_status(
2269 : const DDS::SampleRejectedStatus& status)
2270 : {
2271 : //!!!caller should have acquired sample_lock_
2272 0 : sample_rejected_status_ = status;
2273 0 : }
2274 :
2275 0 : void DataReaderImpl::dispose_unregister(const ReceivedDataSample&,
2276 : DDS::InstanceHandle_t,
2277 : SubscriptionInstance_rch&)
2278 : {
2279 0 : if (DCPS_debug_level > 0) {
2280 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DataReaderImpl::dispose_unregister()\n"));
2281 : }
2282 0 : }
2283 :
2284 0 : void DataReaderImpl::process_latency(const ReceivedDataSample& sample)
2285 : {
2286 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(statistics_lock_);
2287 0 : StatsMapType::iterator location = this->statistics_.find(sample.header_.publication_id_);
2288 :
2289 0 : if (location != this->statistics_.end()) {
2290 0 : const DDS::Duration_t zero = { DDS::DURATION_ZERO_SEC, DDS::DURATION_ZERO_NSEC };
2291 :
2292 : // Only when the user has specified a latency budget or statistics
2293 : // are enabled we need to calculate our latency
2294 0 : if ((this->statistics_enabled()) ||
2295 0 : (this->qos_.latency_budget.duration > zero)) {
2296 : const DDS::Time_t timestamp = {
2297 0 : sample.header_.source_timestamp_sec_,
2298 0 : sample.header_.source_timestamp_nanosec_
2299 0 : };
2300 0 : const TimeDuration latency = SystemTimePoint::now() - SystemTimePoint(timestamp);
2301 :
2302 0 : if (this->statistics_enabled()) {
2303 0 : location->second.add_stat(latency);
2304 : }
2305 :
2306 0 : if (DCPS_debug_level > 9) {
2307 0 : ACE_DEBUG((LM_DEBUG,
2308 : ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ")
2309 : ACE_TEXT("measured latency of %C for current sample.\n"),
2310 : latency.str().c_str()));
2311 : }
2312 :
2313 0 : if (this->qos_.latency_budget.duration > zero) {
2314 : // Check latency against the budget.
2315 0 : if (latency > TimeDuration(this->qos_.latency_budget.duration)) {
2316 0 : this->notify_latency(sample.header_.publication_id_);
2317 : }
2318 : }
2319 0 : }
2320 0 : } else if (DCPS_debug_level > 0) {
2321 : /// NB: This message is generated contemporaneously with a similar
2322 : /// message from writer_activity(). That message is not marked
2323 : /// as an error, so we follow that lead and leave this as an
2324 : /// informational message, guarded by debug level. This seems
2325 : /// to be due to late samples (samples delivered after an
2326 : /// association has been torn down). We may want to promote this
2327 : /// to a warning if other conditions causing this symptom are
2328 : /// discovered.
2329 0 : ACE_DEBUG((LM_DEBUG,
2330 : ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ")
2331 : ACE_TEXT("reader %C is not associated with writer %C (late sample?).\n"),
2332 : LogGuid(get_guid()).c_str(),
2333 : LogGuid(sample.header_.publication_id_).c_str()));
2334 : }
2335 0 : }
2336 :
2337 0 : void DataReaderImpl::notify_latency(GUID_t writer)
2338 : {
2339 : // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
2340 : // is given to this DataReader then narrow() fails.
2341 0 : DataReaderListener_var listener = get_ext_listener();
2342 :
2343 0 : if (!CORBA::is_nil(listener.in())) {
2344 0 : WriterIdSeq writerIds;
2345 0 : writerIds.length(1);
2346 0 : writerIds[ 0] = writer;
2347 :
2348 0 : DDS::InstanceHandleSeq handles;
2349 0 : this->lookup_instance_handles(writerIds, handles);
2350 :
2351 0 : if (handles.length() >= 1) {
2352 0 : this->budget_exceeded_status_.last_instance_handle = handles[ 0];
2353 :
2354 : } else {
2355 0 : this->budget_exceeded_status_.last_instance_handle = -1;
2356 : }
2357 :
2358 0 : ++this->budget_exceeded_status_.total_count;
2359 0 : ++this->budget_exceeded_status_.total_count_change;
2360 :
2361 0 : listener->on_budget_exceeded(this, this->budget_exceeded_status_);
2362 :
2363 0 : this->budget_exceeded_status_.total_count_change = 0;
2364 0 : }
2365 0 : }
2366 :
2367 : #ifndef OPENDDS_SAFETY_PROFILE
2368 : void
2369 0 : DataReaderImpl::get_latency_stats(
2370 : OpenDDS::DCPS::LatencyStatisticsSeq & stats)
2371 : {
2372 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(statistics_lock_);
2373 0 : stats.length(static_cast<CORBA::ULong>(this->statistics_.size()));
2374 0 : int index = 0;
2375 :
2376 0 : for (StatsMapType::const_iterator current = this->statistics_.begin();
2377 0 : current != this->statistics_.end();
2378 0 : ++current, ++index) {
2379 0 : stats[ index] = current->second.get_stats();
2380 0 : stats[ index].publication = current->first;
2381 : }
2382 0 : }
2383 : #endif
2384 :
2385 : void
2386 0 : DataReaderImpl::reset_latency_stats()
2387 : {
2388 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(statistics_lock_);
2389 0 : for (StatsMapType::iterator current = this->statistics_.begin();
2390 0 : current != this->statistics_.end();
2391 0 : ++current) {
2392 0 : current->second.reset_stats();
2393 : }
2394 0 : }
2395 :
2396 : CORBA::Boolean
2397 0 : DataReaderImpl::statistics_enabled()
2398 : {
2399 0 : return statistics_enabled_;
2400 : }
2401 :
2402 : void
2403 0 : DataReaderImpl::statistics_enabled(
2404 : CORBA::Boolean statistics_enabled)
2405 : {
2406 0 : statistics_enabled_ = statistics_enabled;
2407 0 : }
2408 :
2409 : void
2410 0 : DataReaderImpl::prepare_to_delete()
2411 : {
2412 0 : const Observer_rch observer = get_observer(Observer::e_DELETED);
2413 0 : if (observer) {
2414 0 : observer->on_deleted(this);
2415 : }
2416 :
2417 0 : this->set_deleted(true);
2418 0 : this->stop_associating();
2419 0 : this->send_final_acks();
2420 0 : subscription_id_condition_.notify_all();
2421 0 : }
2422 :
2423 : SubscriptionInstance_rch
2424 0 : DataReaderImpl::get_handle_instance(DDS::InstanceHandle_t handle)
2425 : {
2426 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, SubscriptionInstance_rch());
2427 :
2428 0 : SubscriptionInstanceMapType::iterator iter = instances_.find(handle);
2429 0 : if (iter == instances_.end()) {
2430 0 : ACE_DEBUG((LM_WARNING,
2431 : ACE_TEXT("(%P|%t) WARNING: ")
2432 : ACE_TEXT("DataReaderImpl::get_handle_instance: ")
2433 : ACE_TEXT("lookup for 0x%x failed\n"),
2434 : handle));
2435 0 : return SubscriptionInstance_rch();
2436 : } // if (0 != instances_.find(handle, instance))
2437 :
2438 0 : return iter->second;
2439 0 : }
2440 :
2441 : DDS::InstanceHandle_t
2442 0 : DataReaderImpl::get_next_handle(const DDS::BuiltinTopicKey_t& key)
2443 : {
2444 0 : RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
2445 0 : if (!participant)
2446 0 : return DDS::HANDLE_NIL;
2447 :
2448 0 : if (is_bit()) {
2449 0 : const GUID_t id = bit_key_to_guid(key);
2450 0 : return participant->assign_handle(id);
2451 :
2452 : } else {
2453 0 : return participant->assign_handle();
2454 : }
2455 0 : }
2456 :
2457 0 : void DataReaderImpl::return_handle(DDS::InstanceHandle_t handle)
2458 : {
2459 0 : const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
2460 0 : if (participant) {
2461 0 : participant->return_handle(handle);
2462 : }
2463 0 : }
2464 :
2465 : void
2466 0 : DataReaderImpl::notify_subscription_disconnected(const WriterIdSeq& pubids)
2467 : {
2468 : DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_disconnected",6);
2469 :
2470 : // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
2471 : // is given to this DataReader then narrow() fails.
2472 0 : DataReaderListener_var the_listener = get_ext_listener();
2473 :
2474 0 : if (!CORBA::is_nil(the_listener.in())) {
2475 0 : SubscriptionLostStatus status;
2476 :
2477 : // Since this callback may come after remove_association which removes
2478 : // the writer from id_to_handle map, we can ignore this error.
2479 0 : this->lookup_instance_handles(pubids, status.publication_handles);
2480 0 : the_listener->on_subscription_disconnected(this, status);
2481 0 : }
2482 0 : }
2483 :
2484 : void
2485 0 : DataReaderImpl::notify_subscription_reconnected(const WriterIdSeq& pubids)
2486 : {
2487 : DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_reconnected",6);
2488 :
2489 0 : if (!this->is_bit_) {
2490 : // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
2491 : // is given to this DataReader then narrow() fails.
2492 0 : DataReaderListener_var the_listener = get_ext_listener();
2493 :
2494 0 : if (!CORBA::is_nil(the_listener.in())) {
2495 0 : SubscriptionLostStatus status;
2496 :
2497 : // If it's reconnected then the reader should be in id_to_handle
2498 0 : this->lookup_instance_handles(pubids, status.publication_handles);
2499 :
2500 0 : the_listener->on_subscription_reconnected(this, status);
2501 0 : }
2502 0 : }
2503 0 : }
2504 :
2505 : void
2506 0 : DataReaderImpl::notify_subscription_lost(const DDS::InstanceHandleSeq& handles)
2507 : {
2508 : DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6);
2509 :
2510 0 : if (!this->is_bit_) {
2511 : // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
2512 : // is given to this DataReader then narrow() fails.
2513 0 : DataReaderListener_var the_listener = get_ext_listener();
2514 :
2515 0 : if (!CORBA::is_nil(the_listener.in())) {
2516 0 : SubscriptionLostStatus status;
2517 :
2518 0 : CORBA::ULong len = handles.length();
2519 0 : status.publication_handles.length(len);
2520 :
2521 0 : for (CORBA::ULong i = 0; i < len; ++ i) {
2522 0 : status.publication_handles[i] = handles[i];
2523 : }
2524 :
2525 0 : the_listener->on_subscription_lost(this, status);
2526 0 : }
2527 0 : }
2528 0 : }
2529 :
2530 : void
2531 0 : DataReaderImpl::notify_subscription_lost(const WriterIdSeq& pubids)
2532 : {
2533 : DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6);
2534 :
2535 : // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
2536 : // is given to this DataReader then narrow() fails.
2537 0 : DataReaderListener_var the_listener = get_ext_listener();
2538 :
2539 0 : if (!CORBA::is_nil(the_listener.in())) {
2540 0 : SubscriptionLostStatus status;
2541 :
2542 : // Since this callback may come after remove_association which removes
2543 : // the writer from id_to_handle map, we can ignore this error.
2544 0 : this->lookup_instance_handles(pubids, status.publication_handles);
2545 0 : the_listener->on_subscription_lost(this, status);
2546 0 : }
2547 0 : }
2548 :
2549 :
2550 : void
2551 0 : DataReaderImpl::lookup_instance_handles(const WriterIdSeq& ids,
2552 : DDS::InstanceHandleSeq & hdls)
2553 : {
2554 0 : CORBA::ULong const num_wrts = ids.length();
2555 :
2556 0 : if (DCPS_debug_level > 9) {
2557 0 : const char* separator = "";
2558 0 : OPENDDS_STRING guids;
2559 :
2560 0 : for (CORBA::ULong i = 0; i < num_wrts; ++i) {
2561 0 : guids += separator;
2562 0 : guids += LogGuid(ids[i]).conv_;
2563 0 : separator = ", ";
2564 : }
2565 :
2566 0 : ACE_DEBUG((LM_DEBUG,
2567 : ACE_TEXT("(%P|%t) DataReaderImpl::lookup_instance_handles: ")
2568 : ACE_TEXT("searching for handles for writer Ids: %C.\n"),
2569 : guids.c_str()));
2570 0 : }
2571 :
2572 0 : hdls.length(num_wrts);
2573 :
2574 0 : RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
2575 0 : if (participant) {
2576 0 : for (CORBA::ULong i = 0; i < num_wrts; ++i) {
2577 0 : hdls[i] = participant->lookup_handle(ids[i]);
2578 : }
2579 : }
2580 0 : }
2581 :
2582 : bool
2583 0 : DataReaderImpl::filter_sample(const DataSampleHeader& header)
2584 : {
2585 0 : const SystemTimePoint now = SystemTimePoint::now();
2586 :
2587 : // Expire historic data if QoS indicates VOLATILE.
2588 0 : if (!always_get_history_ && header.historic_sample_
2589 0 : && qos_.durability.kind == DDS::VOLATILE_DURABILITY_QOS) {
2590 0 : if (DCPS_debug_level >= 8) {
2591 0 : ACE_DEBUG((LM_DEBUG,
2592 : ACE_TEXT("(%P|%t) DataReaderImpl::filter_sample: ")
2593 : ACE_TEXT("Discarded historic data.\n")));
2594 : }
2595 :
2596 0 : return true; // Data filtered.
2597 : }
2598 :
2599 : // The LIFESPAN_DURATION_FLAG is set when sample data is sent
2600 : // with a non-default LIFESPAN duration value.
2601 0 : if (header.lifespan_duration_) {
2602 : // Finite lifespan. Check if data has expired.
2603 :
2604 : const DDS::Time_t expiration_dds_time = {
2605 0 : header.source_timestamp_sec_ + header.lifespan_duration_sec_,
2606 0 : header.source_timestamp_nanosec_ + header.lifespan_duration_nanosec_
2607 0 : };
2608 0 : const SystemTimePoint expiration_time(expiration_dds_time);
2609 :
2610 : // We assume that the publisher host's clock and subcriber host's
2611 : // clock are synchronized (allowed by the spec).
2612 0 : if (now >= expiration_time) {
2613 0 : if (DCPS_debug_level >= 8) {
2614 0 : const TimeDuration diff(now - expiration_time);
2615 0 : ACE_DEBUG((LM_DEBUG,
2616 : ACE_TEXT("(%P|%t) Received data ")
2617 : ACE_TEXT("expired by %d seconds, %d microseconds.\n"),
2618 : diff.value().sec(),
2619 : diff.value().usec()));
2620 0 : }
2621 :
2622 0 : return true; // Data filtered.
2623 : }
2624 0 : }
2625 :
2626 0 : return false;
2627 0 : }
2628 :
2629 : bool
2630 0 : DataReaderImpl::ownership_filter_instance(const SubscriptionInstance_rch& instance,
2631 : const GUID_t& pubid)
2632 : {
2633 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
2634 0 : if (this->is_exclusive_ownership_) {
2635 :
2636 0 : ACE_WRITE_GUARD_RETURN(ACE_RW_Thread_Mutex, write_guard, writers_lock_, true);
2637 0 : WriterMapType::iterator iter = writers_.find(pubid);
2638 :
2639 0 : if (iter == writers_.end()) {
2640 0 : if (DCPS_debug_level > 4) {
2641 : // This may not be an error since it could happen that the sample
2642 : // is delivered to the datareader after the write is dis-associated
2643 : // with this datareader.
2644 0 : ACE_DEBUG((LM_DEBUG,
2645 : ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
2646 : ACE_TEXT("reader %C is not associated with writer %C.\n"),
2647 : LogGuid(get_guid()).c_str(),
2648 : LogGuid(pubid).c_str()));
2649 : }
2650 0 : return true;
2651 : }
2652 :
2653 :
2654 : // Evaulate the owner of the instance if not selected and filter
2655 : // current message if it's not from owner writer.
2656 0 : if ( instance->instance_state_->get_owner() == GUID_UNKNOWN
2657 0 : || ! iter->second->is_owner_evaluated(instance->instance_handle_)) {
2658 0 : OwnershipManagerPtr owner_manager = this->ownership_manager();
2659 :
2660 0 : bool is_owner = owner_manager && owner_manager->select_owner (
2661 0 : instance->instance_handle_,
2662 0 : iter->second->writer_id(),
2663 0 : iter->second->writer_qos_ownership_strength(),
2664 0 : instance->instance_state_);
2665 0 : iter->second->set_owner_evaluated(instance->instance_handle_, true);
2666 :
2667 0 : if (! is_owner) {
2668 0 : if (DCPS_debug_level >= 1) {
2669 0 : ACE_DEBUG((LM_DEBUG,
2670 : ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
2671 : ACE_TEXT("reader %C writer %C is not elected as owner %C\n"),
2672 : LogGuid(get_guid()).c_str(),
2673 : LogGuid(pubid).c_str(),
2674 : LogGuid(instance->instance_state_->get_owner()).c_str()));
2675 : }
2676 0 : return true;
2677 : }
2678 0 : }
2679 0 : else if (! (instance->instance_state_->get_owner() == pubid)) {
2680 0 : if (DCPS_debug_level >= 1) {
2681 0 : ACE_DEBUG((LM_DEBUG,
2682 : ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
2683 : ACE_TEXT("reader %C writer %C is not owner %C\n"),
2684 : LogGuid(get_guid()).c_str(),
2685 : LogGuid(pubid).c_str(),
2686 : LogGuid(instance->instance_state_->get_owner()).c_str()));
2687 : }
2688 0 : return true;
2689 : }
2690 0 : }
2691 : #else
2692 : ACE_UNUSED_ARG(pubid);
2693 : ACE_UNUSED_ARG(instance);
2694 : #endif
2695 0 : return false;
2696 : }
2697 :
2698 : bool
2699 0 : DataReaderImpl::time_based_filter_instance(const SubscriptionInstance_rch& instance,
2700 : MonotonicTimePoint& now,
2701 : MonotonicTimePoint& deadline)
2702 : {
2703 0 : now = MonotonicTimePoint::now();
2704 0 : const TimeDuration minimum_separation(qos_.time_based_filter.minimum_separation);
2705 :
2706 : // TIME_BASED_FILTER processing; expire data samples
2707 : // if minimum separation is not met for instance.
2708 0 : if (!minimum_separation.is_zero()) {
2709 0 : if (now - instance->last_accepted_ < minimum_separation) {
2710 0 : deadline = now + minimum_separation;
2711 0 : return true; // Data filtered.
2712 : }
2713 : }
2714 :
2715 0 : instance->last_accepted_ = now;
2716 :
2717 0 : return false;
2718 0 : }
2719 :
2720 0 : bool DataReaderImpl::is_bit() const
2721 : {
2722 0 : return this->is_bit_;
2723 : }
2724 :
2725 : bool
2726 0 : DataReaderImpl::has_zero_copies()
2727 : {
2728 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
2729 : guard,
2730 : this->sample_lock_,
2731 : true /* assume we have loans */);
2732 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, true);
2733 :
2734 0 : for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
2735 0 : iter != instances_.end();
2736 0 : ++iter) {
2737 0 : SubscriptionInstance_rch ptr = iter->second;
2738 :
2739 0 : if (ptr->rcvd_samples_.has_zero_copies()) {
2740 0 : return true;
2741 : }
2742 0 : }
2743 :
2744 0 : return false;
2745 0 : }
2746 :
2747 0 : void DataReaderImpl::notify_liveliness_change()
2748 : {
2749 : // sample_lock_ must be held.
2750 : // N.B. writers_lock_ should already be acquired when
2751 : // this method is called.
2752 :
2753 : DDS::DataReaderListener_var listener
2754 0 : = listener_for(DDS::LIVELINESS_CHANGED_STATUS);
2755 :
2756 0 : if (!CORBA::is_nil(listener.in())) {
2757 0 : const DDS::LivelinessChangedStatus status = liveliness_changed_status_;
2758 0 : liveliness_changed_status_.alive_count_change = 0;
2759 0 : liveliness_changed_status_.not_alive_count_change = 0;
2760 0 : ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
2761 0 : listener->on_liveliness_changed(this, status);
2762 0 : }
2763 0 : notify_status_condition();
2764 :
2765 0 : if (DCPS_debug_level > 9) {
2766 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
2767 0 : OPENDDS_STRING output_str;
2768 0 : output_str += "subscription ";
2769 0 : output_str += LogGuid(get_guid()).conv_;
2770 0 : output_str += ", listener at: 0x";
2771 0 : output_str += to_dds_string(this->listener_.in());
2772 :
2773 0 : for (WriterMapType::iterator current = this->writers_.begin();
2774 0 : current != this->writers_.end();
2775 0 : ++current) {
2776 0 : const GUID_t id = current->first;
2777 0 : output_str += "\n\tNOTIFY: writer[ ";
2778 0 : output_str += LogGuid(id).conv_;
2779 0 : output_str += "] == ";
2780 0 : output_str += current->second->get_state_str();
2781 : }
2782 :
2783 0 : ACE_DEBUG((LM_DEBUG,
2784 : ACE_TEXT("(%P|%t) DataReaderImpl::notify_liveliness_change: ")
2785 : ACE_TEXT("listener at 0x%x, mask 0x%x.\n")
2786 : ACE_TEXT("\tNOTIFY: %C\n"),
2787 : listener.in(),
2788 : listener_mask_,
2789 : output_str.c_str()));
2790 0 : }
2791 0 : }
2792 :
2793 0 : void DataReaderImpl::post_read_or_take()
2794 : {
2795 0 : set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
2796 0 : RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
2797 0 : if (subscriber) {
2798 0 : subscriber->set_status_changed_flag(
2799 : DDS::DATA_ON_READERS_STATUS, false);
2800 : }
2801 0 : }
2802 :
2803 : ACE_Reactor_Timer_Interface*
2804 0 : DataReaderImpl::get_reactor()
2805 : {
2806 0 : return this->reactor_;
2807 : }
2808 :
2809 : OpenDDS::DCPS::GUID_t
2810 0 : DataReaderImpl::get_topic_id()
2811 : {
2812 0 : return topic_id_;
2813 : }
2814 :
2815 : OpenDDS::DCPS::GUID_t
2816 0 : DataReaderImpl::get_dp_id()
2817 : {
2818 0 : return dp_id_;
2819 : }
2820 :
2821 : void
2822 0 : DataReaderImpl::get_instance_handles(InstanceHandleVec& instance_handles)
2823 : {
2824 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
2825 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
2826 :
2827 0 : for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
2828 0 : end = instances_.end(); iter != end; ++iter) {
2829 0 : instance_handles.push_back(iter->first);
2830 : }
2831 0 : }
2832 :
2833 : void
2834 0 : DataReaderImpl::get_writer_states(WriterStatePairVec& writer_states)
2835 : {
2836 0 : ACE_READ_GUARD(ACE_RW_Thread_Mutex,
2837 : read_guard,
2838 : this->writers_lock_);
2839 0 : for (WriterMapType::iterator iter = writers_.begin();
2840 0 : iter != writers_.end();
2841 0 : ++iter) {
2842 0 : writer_states.push_back(WriterStatePair(iter->first,
2843 0 : iter->second->state()));
2844 : }
2845 0 : }
2846 :
2847 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
2848 : void
2849 0 : DataReaderImpl::update_ownership_strength(const GUID_t& pub_id,
2850 : const CORBA::Long& ownership_strength)
2851 : {
2852 0 : ACE_READ_GUARD(ACE_RW_Thread_Mutex,
2853 : read_guard,
2854 : this->writers_lock_);
2855 0 : for (WriterMapType::iterator iter = writers_.begin();
2856 0 : iter != writers_.end();
2857 0 : ++iter) {
2858 0 : if (iter->second->writer_id() == pub_id) {
2859 0 : if (ownership_strength != iter->second->writer_qos_ownership_strength()) {
2860 0 : if (DCPS_debug_level >= 1) {
2861 0 : ACE_DEBUG((LM_DEBUG,
2862 : ACE_TEXT("(%P|%t) DataReaderImpl::update_ownership_strength - ")
2863 : ACE_TEXT("local %C update remote %C strength from %d to %d\n"),
2864 : LogGuid(get_guid()).c_str(),
2865 : LogGuid(pub_id).c_str(),
2866 : iter->second->writer_qos_ownership_strength(), ownership_strength));
2867 : }
2868 0 : iter->second->writer_qos_ownership_strength(ownership_strength);
2869 0 : iter->second->clear_owner_evaluated();
2870 : }
2871 0 : break;
2872 : }
2873 : }
2874 0 : }
2875 : #endif
2876 :
2877 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
2878 0 : bool DataReaderImpl::verify_coherent_changes_completion(WriterInfo* writer)
2879 : {
2880 0 : Coherent_State state = COMPLETED;
2881 0 : bool accept_here = true;
2882 :
2883 0 : const GUID_t writer_id = writer->writer_id();
2884 0 : const GUID_t publisher_id = writer->publisher_id();
2885 :
2886 0 : if (subqos_.presentation.access_scope != ::DDS::INSTANCE_PRESENTATION_QOS &&
2887 0 : subqos_.presentation.coherent_access) {
2888 : // verify current coherent changes from single writer
2889 0 : state = writer->coherent_change_received();
2890 0 : if (writer->group_coherent()) { // GROUP coherent any state
2891 0 : RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
2892 0 : if (subscriber && state != NOT_COMPLETED_YET) {
2893 : // verify if all readers received complete coherent changes in a group.
2894 0 : subscriber->coherent_change_received(publisher_id, this, state);
2895 0 : accept_here = false; // coherent_change_received does that itself
2896 : }
2897 0 : } else if (state != NOT_COMPLETED_YET) { // TOPIC coherent with final state
2898 0 : if (state == REJECTED) {
2899 0 : reject_coherent(writer_id, publisher_id);
2900 : }
2901 0 : writer->reset_coherent_info();
2902 : }
2903 : }
2904 :
2905 0 : if (state == COMPLETED && accept_here) {
2906 0 : accept_coherent(writer_id, publisher_id);
2907 0 : coherent_changes_completed(this);
2908 : }
2909 :
2910 0 : return state == COMPLETED;
2911 : }
2912 :
2913 :
2914 0 : void DataReaderImpl::accept_coherent(const GUID_t& writer_id,
2915 : const GUID_t& publisher_id)
2916 : {
2917 0 : if (::OpenDDS::DCPS::DCPS_debug_level > 0) {
2918 0 : ACE_DEBUG((LM_DEBUG,
2919 : ACE_TEXT("(%P|%t) DataReaderImpl::accept_coherent()")
2920 : ACE_TEXT(" reader %C writer %C publisher %C\n"),
2921 : LogGuid(get_guid()).c_str(),
2922 : LogGuid(writer_id).c_str(),
2923 : LogGuid(publisher_id).c_str()));
2924 : }
2925 0 : SubscriptionInstanceSet localsubs;
2926 : {
2927 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
2928 0 : for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
2929 0 : iter != this->instances_.end(); ++iter) {
2930 0 : localsubs.insert(iter->second);
2931 : }
2932 0 : }
2933 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
2934 0 : for (SubscriptionInstanceSet::iterator iter = localsubs.begin();
2935 0 : iter != localsubs.end(); iter++) {
2936 0 : (*iter)->rcvd_strategy_->accept_coherent(writer_id, publisher_id);
2937 : }
2938 0 : }
2939 :
2940 :
2941 0 : void DataReaderImpl::reject_coherent(const GUID_t& writer_id,
2942 : const GUID_t& publisher_id)
2943 : {
2944 0 : if (::OpenDDS::DCPS::DCPS_debug_level > 0) {
2945 0 : ACE_DEBUG((LM_DEBUG,
2946 : ACE_TEXT("(%P|%t) DataReaderImpl::reject_coherent()")
2947 : ACE_TEXT(" reader %C writer %C publisher %C\n"),
2948 : LogGuid(get_guid()).c_str(),
2949 : LogGuid(writer_id).c_str(),
2950 : LogGuid(publisher_id).c_str()));
2951 : }
2952 :
2953 0 : SubscriptionInstanceSet localsubs;
2954 : {
2955 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
2956 0 : for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
2957 0 : iter != this->instances_.end(); ++iter) {
2958 0 : localsubs.insert(iter->second);
2959 : }
2960 0 : }
2961 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
2962 0 : for (SubscriptionInstanceSet::iterator iter = localsubs.begin();
2963 0 : iter != localsubs.end(); iter++) {
2964 0 : (*iter)->rcvd_strategy_->reject_coherent(writer_id, publisher_id);
2965 : }
2966 0 : this->reset_coherent_info(writer_id, publisher_id);
2967 0 : }
2968 :
2969 :
2970 0 : void DataReaderImpl::reset_coherent_info(const GUID_t& writer_id,
2971 : const GUID_t& publisher_id)
2972 : {
2973 0 : ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
2974 :
2975 0 : WriterMapType::iterator itEnd = this->writers_.end();
2976 0 : for (WriterMapType::iterator it = this->writers_.begin();
2977 0 : it != itEnd; ++it) {
2978 0 : if (it->second->writer_id() == writer_id
2979 0 : && it->second->publisher_id() == publisher_id) {
2980 0 : it->second->reset_coherent_info();
2981 : }
2982 : }
2983 0 : }
2984 :
2985 :
2986 : void
2987 0 : DataReaderImpl::coherent_change_received(const GUID_t& publisher_id, Coherent_State& result)
2988 : {
2989 0 : ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
2990 :
2991 0 : result = COMPLETED;
2992 0 : for (WriterMapType::iterator iter = writers_.begin();
2993 0 : iter != writers_.end();
2994 0 : ++iter) {
2995 :
2996 0 : if (iter->second->publisher_id() == publisher_id) {
2997 0 : const Coherent_State state = iter->second->coherent_change_received();
2998 0 : if (state == NOT_COMPLETED_YET) {
2999 0 : result = NOT_COMPLETED_YET;
3000 0 : break;
3001 : }
3002 0 : else if (state == REJECTED) {
3003 0 : result = REJECTED;
3004 : }
3005 : }
3006 : }
3007 0 : }
3008 :
3009 :
3010 : void
3011 0 : DataReaderImpl::coherent_changes_completed(DataReaderImpl* reader)
3012 : {
3013 0 : RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
3014 0 : if (!subscriber) {
3015 0 : return;
3016 : }
3017 :
3018 0 : subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, true);
3019 0 : this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, true);
3020 :
3021 : ::DDS::SubscriberListener_var sub_listener =
3022 0 : subscriber->listener_for(::DDS::DATA_ON_READERS_STATUS);
3023 0 : if (!CORBA::is_nil(sub_listener.in()))
3024 : {
3025 0 : if (!is_bit()) {
3026 0 : this->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
3027 0 : subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
3028 0 : if (reader == this) {
3029 : // Release the sample_lock before listener callback.
3030 0 : ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
3031 0 : sub_listener->on_data_on_readers(subscriber.in());
3032 0 : }
3033 : } else {
3034 0 : TheServiceParticipant->job_queue()->enqueue(make_rch<OnDataOnReaders>(subscriber, sub_listener, rchandle_from(this), reader == this, true));
3035 : }
3036 : }
3037 : else
3038 : {
3039 0 : subscriber->notify_status_condition();
3040 :
3041 : ::DDS::DataReaderListener_var listener =
3042 0 : this->listener_for (::DDS::DATA_AVAILABLE_STATUS);
3043 :
3044 0 : if (!CORBA::is_nil(listener.in()))
3045 : {
3046 0 : if (!is_bit()) {
3047 0 : set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
3048 0 : subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
3049 0 : if (reader == this) {
3050 : // Release the sample_lock before listener callback.
3051 0 : ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
3052 0 : listener->on_data_available(this);
3053 0 : } else {
3054 0 : listener->on_data_available(this);
3055 : }
3056 : } else {
3057 0 : TheServiceParticipant->job_queue()->enqueue(make_rch<OnDataAvailable>(listener, rchandle_from(this), reader == this, true, true));
3058 : }
3059 : }
3060 : else
3061 : {
3062 0 : this->notify_status_condition();
3063 : }
3064 0 : }
3065 0 : }
3066 :
3067 :
3068 0 : void DataReaderImpl::begin_access()
3069 : {
3070 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
3071 0 : this->coherent_ = true;
3072 0 : }
3073 :
3074 :
3075 0 : void DataReaderImpl::end_access()
3076 : {
3077 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
3078 0 : this->coherent_ = false;
3079 0 : this->group_coherent_ordered_data_.reset();
3080 0 : this->post_read_or_take();
3081 0 : }
3082 :
3083 :
3084 0 : void DataReaderImpl::get_ordered_data(GroupRakeData& data,
3085 : DDS::SampleStateMask sample_states,
3086 : DDS::ViewStateMask view_states,
3087 : DDS::InstanceStateMask instance_states)
3088 : {
3089 0 : SubscriptionInstanceSet localsubs;
3090 : {
3091 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
3092 0 : for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
3093 0 : iter != instances_.end(); ++iter) {
3094 0 : localsubs.insert(iter->second);
3095 : }
3096 0 : }
3097 :
3098 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
3099 :
3100 0 : for (SubscriptionInstanceSet::iterator iter = localsubs.begin(); iter != localsubs.end(); ++iter) {
3101 0 : const SubscriptionInstance_rch inst = *iter;
3102 0 : if (inst->instance_state_->match(view_states, instance_states)) {
3103 0 : size_t i(0);
3104 0 : for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0);
3105 0 : item; item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
3106 0 : data.insert_sample(item, &inst->rcvd_samples_, *iter, ++i);
3107 0 : group_coherent_ordered_data_.insert_sample(item, &inst->rcvd_samples_, *iter, ++i);
3108 : }
3109 : }
3110 0 : }
3111 0 : }
3112 :
3113 : #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
3114 :
3115 : void
3116 0 : DataReaderImpl::set_subscriber_qos(
3117 : const DDS::SubscriberQos & qos)
3118 : {
3119 0 : this->subqos_ = qos;
3120 0 : }
3121 :
3122 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
3123 : void
3124 0 : DataReaderImpl::enable_filtering(ContentFilteredTopicImpl* cft)
3125 : {
3126 0 : cft->add_reader(*this);
3127 : {
3128 0 : ACE_Guard<ACE_Thread_Mutex> guard(content_filtered_topic_mutex_);
3129 0 : content_filtered_topic_ = cft;
3130 0 : }
3131 0 : }
3132 :
3133 : DDS::ContentFilteredTopic_ptr
3134 0 : DataReaderImpl::get_cf_topic() const
3135 : {
3136 0 : ACE_Guard<ACE_Thread_Mutex> guard(content_filtered_topic_mutex_);
3137 0 : return DDS::ContentFilteredTopic::_duplicate(content_filtered_topic_.get());
3138 0 : }
3139 : #endif
3140 :
3141 : #ifndef OPENDDS_NO_MULTI_TOPIC
3142 : void
3143 0 : DataReaderImpl::enable_multi_topic(MultiTopicImpl* mt)
3144 : {
3145 0 : multi_topic_ = mt;
3146 0 : }
3147 : #endif
3148 :
3149 : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
3150 :
3151 : void
3152 0 : DataReaderImpl::update_subscription_params(const DDS::StringSeq& params) const
3153 : {
3154 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
3155 0 : disco->update_subscription_params(domain_id_,
3156 0 : dp_id_,
3157 0 : subscription_id_,
3158 : params);
3159 0 : }
3160 : #endif
3161 :
3162 : void
3163 0 : DataReaderImpl::reset_ownership(::DDS::InstanceHandle_t instance)
3164 : {
3165 0 : ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
3166 0 : for (WriterMapType::iterator iter = writers_.begin();
3167 0 : iter != writers_.end();
3168 0 : ++iter) {
3169 0 : iter->second->set_owner_evaluated(instance, false);
3170 : }
3171 0 : }
3172 :
3173 : void
3174 0 : DataReaderImpl::resume_sample_processing(const GUID_t& pub_id)
3175 : {
3176 0 : WriterInfo_rch info;
3177 : {
3178 0 : ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
3179 0 : WriterMapType::iterator where = writers_.find(pub_id);
3180 0 : if (writers_.end() != where) {
3181 0 : info = where->second;
3182 : }
3183 0 : }
3184 :
3185 0 : if (info) {
3186 0 : OPENDDS_MAP(SequenceNumber, ReceivedDataSample) to_deliver;
3187 : // Stop filtering these
3188 0 : if (info->check_end_historic_samples(end_historic_sweeper_.in(), to_deliver)) {
3189 0 : deliver_historic(to_deliver);
3190 0 : info->finished_delivering_historic();
3191 : }
3192 0 : }
3193 0 : }
3194 :
3195 0 : bool DataReaderImpl::check_historic(const ReceivedDataSample& sample)
3196 : {
3197 0 : ACE_WRITE_GUARD_RETURN(ACE_RW_Thread_Mutex, write_guard, writers_lock_, true);
3198 0 : WriterMapType::iterator iter = writers_.find(sample.header_.publication_id_);
3199 0 : if (iter != writers_.end()) {
3200 0 : const SequenceNumber& seq = sample.header_.sequence_;
3201 0 : SequenceNumber last_historic_seq;
3202 0 : if (iter->second->check_historic(seq, sample, last_historic_seq)) {
3203 0 : return false;
3204 : }
3205 0 : if (last_historic_seq != SequenceNumber::SEQUENCENUMBER_UNKNOWN()
3206 0 : && !sample.header_.historic_sample_
3207 0 : && seq <= last_historic_seq) {
3208 : // this sample must have been seen before the END_HISTORIC_SAMPLES control msg
3209 0 : return false;
3210 : }
3211 : }
3212 0 : return true;
3213 0 : }
3214 :
3215 0 : void DataReaderImpl::deliver_historic(OPENDDS_MAP(SequenceNumber, ReceivedDataSample)& samples)
3216 : {
3217 : typedef OPENDDS_MAP(SequenceNumber, ReceivedDataSample)::iterator iter_t;
3218 0 : const iter_t end = samples.end();
3219 0 : for (iter_t iter = samples.begin(); iter != end; ++iter) {
3220 0 : iter->second.header_.historic_sample_ = true;
3221 0 : data_received(iter->second);
3222 : }
3223 0 : }
3224 :
3225 : void
3226 0 : DataReaderImpl::add_link(const DataLink_rch& link, const GUID_t& peer)
3227 : {
3228 0 : if (this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS) {
3229 :
3230 0 : ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, writers_lock_);
3231 :
3232 0 : WriterMapType::iterator it = writers_.find(peer);
3233 0 : if (it != writers_.end()) {
3234 : // Schedule timer if necessary
3235 : // - only need to check reader qos - we know the writer must be >= reader
3236 0 : end_historic_sweeper_->schedule_timer(it->second);
3237 : }
3238 0 : }
3239 0 : TransportClient::add_link(link, peer);
3240 0 : OPENDDS_STRING type;
3241 : {
3242 0 : TransportImpl_rch impl = link->impl();
3243 0 : if (impl) {
3244 0 : type = impl->transport_type();
3245 : }
3246 0 : }
3247 :
3248 0 : if (type == "rtps_udp" || type == "multicast") {
3249 0 : resume_sample_processing(peer);
3250 : }
3251 0 : }
3252 :
3253 : void
3254 0 : DataReaderImpl::register_for_writer(const GUID_t& participant,
3255 : const GUID_t& readerid,
3256 : const GUID_t& writerid,
3257 : const TransportLocatorSeq& locators,
3258 : DiscoveryListener* listener)
3259 : {
3260 0 : TransportClient::register_for_writer(participant, readerid, writerid, locators, listener);
3261 0 : }
3262 :
3263 : void
3264 0 : DataReaderImpl::unregister_for_writer(const GUID_t& participant,
3265 : const GUID_t& readerid,
3266 : const GUID_t& writerid)
3267 : {
3268 0 : TransportClient::unregister_for_writer(participant, readerid, writerid);
3269 0 : }
3270 :
3271 : void
3272 0 : DataReaderImpl::update_locators(const GUID_t& writerId,
3273 : const TransportLocatorSeq& locators)
3274 : {
3275 : {
3276 0 : ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, writers_lock_);
3277 0 : WriterMapType::const_iterator iter = writers_.find(writerId);
3278 0 : if (iter == writers_.end()) {
3279 0 : return;
3280 : }
3281 0 : }
3282 0 : TransportClient::update_locators(writerId, locators);
3283 : }
3284 :
3285 : WeakRcHandle<ICE::Endpoint>
3286 0 : DataReaderImpl::get_ice_endpoint()
3287 : {
3288 0 : return TransportClient::get_ice_endpoint();
3289 : }
3290 :
3291 0 : DDS::ReturnCode_t DataReaderImpl::setup_deserialization()
3292 : {
3293 0 : bool xcdr1_mutable = false;
3294 0 : bool illegal_unaligned = false;
3295 0 : for (CORBA::ULong i = 0; i < qos_.representation.value.length(); ++i) {
3296 : Encoding::Kind encoding_kind;
3297 0 : if (repr_to_encoding_kind(qos_.representation.value[i], encoding_kind)) {
3298 0 : if (encoding_kind == Encoding::KIND_XCDR1 && type_support_->max_extensibility() == MUTABLE) {
3299 0 : xcdr1_mutable = true;
3300 0 : } else if (encoding_kind == Encoding::KIND_UNALIGNED_CDR && cdr_encapsulation()) {
3301 0 : illegal_unaligned = true;
3302 : } else {
3303 0 : decoding_modes_.insert(encoding_kind);
3304 : }
3305 0 : } else if (DCPS_debug_level) {
3306 0 : ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: "
3307 : "DataReaderImpl::setup_deserialization: "
3308 : "Encountered unsupported or unknown data representation: %C\n",
3309 : repr_to_string(qos_.representation.value[i]).c_str()));
3310 : }
3311 : }
3312 0 : if (decoding_modes_.empty()) {
3313 0 : if (DCPS_debug_level) {
3314 0 : DCPS::String error_message;
3315 0 : if (xcdr1_mutable) {
3316 0 : error_message = " Unsupported combination of XCDR1 and mutable";
3317 0 : } else if (illegal_unaligned) {
3318 0 : error_message = " Unaligned CDR is not allowed in rtps_udp transport";
3319 : }
3320 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: "
3321 : "DataReaderImpl::setup_deserialization: "
3322 : "Could not find a valid data representation.%C\n",
3323 : error_message.c_str()));
3324 0 : }
3325 0 : return DDS::RETCODE_ERROR;
3326 : }
3327 0 : if (DCPS_debug_level >= 2) {
3328 0 : OPENDDS_STRING encodings;
3329 0 : EncodingKinds::iterator it = decoding_modes_.begin();
3330 0 : for (; it != decoding_modes_.end(); ++it) {
3331 0 : if (!encodings.empty()) {
3332 0 : encodings += ", ";
3333 : }
3334 0 : encodings += Encoding::kind_to_string(*it);
3335 : }
3336 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DataReaderImpl::setup_deserialization: "
3337 : "Setup successfully with the following data representation%C: %C\n",
3338 : encodings.size() != 1 ? "s" : "",
3339 : encodings.c_str()));
3340 0 : }
3341 :
3342 0 : return DDS::RETCODE_OK;
3343 : }
3344 :
3345 0 : void DataReaderImpl::accept_sample_processing(const SubscriptionInstance_rch& instance,
3346 : const DataSampleHeader& header,
3347 : bool is_new_instance)
3348 : {
3349 0 : bool accepted = true;
3350 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
3351 0 : bool verify_coherent = false;
3352 : #endif
3353 0 : WriterInfo_rch writer;
3354 :
3355 0 : if (header.publication_id_.entityId.entityKind != ENTITYKIND_OPENDDS_NIL_WRITER) {
3356 0 : ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, writers_lock_);
3357 :
3358 0 : WriterMapType::iterator where = writers_.find(header.publication_id_);
3359 :
3360 0 : if (where != writers_.end()) {
3361 0 : if (header.coherent_change_) {
3362 :
3363 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
3364 : // Received coherent change
3365 0 : where->second->coherent_change(header.group_coherent_, header.publisher_id_);
3366 0 : verify_coherent = true;
3367 : #endif
3368 0 : writer = where->second;
3369 : }
3370 : }
3371 : else {
3372 0 : ACE_DEBUG((LM_WARNING,
3373 : ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::accept_sample_processing - ")
3374 : ACE_TEXT("subscription %C failed to find ")
3375 : ACE_TEXT("publication data for %C.\n"),
3376 : LogGuid(get_guid()).c_str(),
3377 : LogGuid(header.publication_id_).c_str()));
3378 : }
3379 0 : }
3380 :
3381 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
3382 0 : if (verify_coherent) {
3383 0 : accepted = verify_coherent_changes_completion(writer.in());
3384 : }
3385 : #endif
3386 :
3387 0 : if (instance && deadline_queue_enabled_) {
3388 0 : instance->last_sample_tv_ = instance->cur_sample_tv_;
3389 0 : instance->cur_sample_tv_.set_to_now();
3390 :
3391 0 : if (is_new_instance) {
3392 0 : schedule_deadline(instance, false);
3393 : } else {
3394 0 : process_deadline(instance, MonotonicTimePoint::now(), false);
3395 : }
3396 : }
3397 :
3398 0 : if (accepted) {
3399 0 : notify_read_conditions();
3400 : }
3401 0 : }
3402 :
3403 : #if defined(OPENDDS_SECURITY)
3404 0 : DDS::Security::ParticipantCryptoHandle DataReaderImpl::get_crypto_handle() const
3405 : {
3406 0 : RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
3407 0 : return participant ? participant->crypto_handle() : DDS::HANDLE_NIL;
3408 0 : }
3409 : #endif
3410 :
3411 0 : EndHistoricSamplesMissedSweeper::EndHistoricSamplesMissedSweeper(ACE_Reactor* reactor,
3412 : ACE_thread_t owner,
3413 0 : DataReaderImpl* reader)
3414 : : ReactorInterceptor (reactor, owner)
3415 0 : , reader_(*reader)
3416 0 : { }
3417 :
3418 0 : EndHistoricSamplesMissedSweeper::~EndHistoricSamplesMissedSweeper()
3419 0 : { }
3420 :
3421 0 : void EndHistoricSamplesMissedSweeper::schedule_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
3422 : {
3423 0 : info->waiting_for_end_historic_samples(true);
3424 0 : execute_or_enqueue(make_rch<ScheduleCommand>(this, ref(info)));
3425 0 : }
3426 :
3427 0 : void EndHistoricSamplesMissedSweeper::cancel_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
3428 : {
3429 0 : info->waiting_for_end_historic_samples(false);
3430 0 : execute_or_enqueue(make_rch<CancelCommand>(this, ref(info)));
3431 0 : }
3432 :
3433 0 : int EndHistoricSamplesMissedSweeper::handle_timeout(
3434 : const ACE_Time_Value& ,
3435 : const void* arg)
3436 : {
3437 0 : ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
3438 :
3439 0 : WriterInfo* const info =
3440 : const_cast<WriterInfo*>(reinterpret_cast<const WriterInfo*>(arg));
3441 0 : const GUID_t pub_id = info->writer_id();
3442 :
3443 : {
3444 0 : ACE_Guard<ACE_Thread_Mutex> guard(this->mutex_);
3445 0 : info_set_.erase(rchandle_from(info));
3446 0 : }
3447 :
3448 0 : RcHandle<DataReaderImpl> reader = reader_.lock();
3449 0 : if (!reader)
3450 0 : return 0;
3451 :
3452 0 : if (DCPS_debug_level >= 1) {
3453 0 : ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::handle_timeout reader: %C waiting on writer: %C\n",
3454 : LogGuid(reader->get_guid()).c_str(),
3455 : LogGuid(pub_id).c_str()));
3456 : }
3457 :
3458 0 : reader->resume_sample_processing(pub_id);
3459 0 : return 0;
3460 0 : }
3461 :
3462 0 : void EndHistoricSamplesMissedSweeper::ScheduleCommand::execute()
3463 : {
3464 0 : static const ACE_Time_Value ten_seconds(10);
3465 0 : info_->schedule_historic_samples_timer(sweeper_, ten_seconds);
3466 0 : const bool insert_result = sweeper_->info_set_.insert(info_).second;
3467 :
3468 0 : if (insert_result && DCPS_debug_level) {
3469 0 : ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::ScheduleCommand::execute() - sweeper %@ is now scheduled\n", info_.in()));
3470 : }
3471 0 : }
3472 :
3473 0 : void EndHistoricSamplesMissedSweeper::CancelCommand::execute()
3474 : {
3475 0 : info_->cancel_historic_samples_timer(sweeper_);
3476 0 : const bool erase_result = sweeper_->info_set_.erase(info_) > 0;
3477 :
3478 0 : if (erase_result && DCPS_debug_level) {
3479 0 : ACE_DEBUG((LM_INFO, "(%P|%t) EndHistoricSamplesMissedSweeper::CancelCommand::execute() - sweeper %@ is no longer scheduled\n", info_.in()));
3480 : }
3481 0 : }
3482 :
3483 0 : void DataReaderImpl::transport_discovery_change()
3484 : {
3485 0 : populate_connection_info();
3486 0 : const TransportLocatorSeq& trans_conf_info = connection_info();
3487 0 : const GUID_t dp_id_copy = dp_id_;
3488 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
3489 0 : disco->update_subscription_locators(domain_id_,
3490 : dp_id_copy,
3491 0 : get_guid(),
3492 : trans_conf_info);
3493 0 : }
3494 :
3495 0 : void DataReaderImpl::OnDataOnReaders::execute()
3496 : {
3497 0 : RcHandle<SubscriberImpl> subscriber = subscriber_.lock();
3498 0 : RcHandle<DataReaderImpl> data_reader = data_reader_.lock();
3499 0 : if (!subscriber || !data_reader) {
3500 0 : return;
3501 : }
3502 :
3503 0 : if (set_reader_status_) {
3504 0 : data_reader->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
3505 : }
3506 0 : subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
3507 :
3508 0 : if (call_) {
3509 0 : sub_listener_->on_data_on_readers(subscriber.in());
3510 : }
3511 0 : }
3512 :
3513 0 : void DataReaderImpl::OnDataAvailable::execute()
3514 : {
3515 0 : RcHandle<DataReaderImpl> data_reader = data_reader_.lock();
3516 :
3517 0 : if (data_reader && set_reader_status_) {
3518 0 : data_reader->set_status_changed_flag(::DDS::DATA_AVAILABLE_STATUS, false);
3519 : }
3520 :
3521 0 : if (data_reader && set_subscriber_status_) {
3522 0 : RcHandle<SubscriberImpl> subscriber = data_reader->get_subscriber_servant();
3523 0 : if (subscriber) {
3524 0 : subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
3525 : }
3526 0 : }
3527 :
3528 0 : if (call_ && data_reader) {
3529 0 : listener_->on_data_available(data_reader.in());
3530 : }
3531 0 : }
3532 :
3533 0 : void DataReaderImpl::initialize_lookup_maps()
3534 : {
3535 : // These all start at 1 (0 mask is bogus) and include the full mask (any)
3536 0 : for (CORBA::ULong is = 1; is <= MAX_SAMPLE_STATE_MASK; ++is) {
3537 0 : for (CORBA::ULong iv = 1; iv <= MAX_VIEW_STATE_MASK; ++iv) {
3538 0 : for (CORBA::ULong ii = 1; ii <= MAX_INSTANCE_STATE_MASK; ++ii) {
3539 0 : combined_state_lookup_[to_combined_states(is, iv, ii)] = HandleSet();
3540 : }
3541 : }
3542 : }
3543 : // catch-all for "bogus" lookups
3544 0 : combined_state_lookup_[0] = HandleSet();
3545 0 : }
3546 :
3547 0 : void DataReaderImpl::update_lookup_maps(const SubscriptionInstanceMapType::iterator& input)
3548 : {
3549 0 : for (LookupMap::iterator it = combined_state_lookup_.begin(); it != combined_state_lookup_.end(); ++it) {
3550 0 : if (it->first == 0) continue;
3551 : CORBA::ULong sample_states, view_states, instance_states;
3552 0 : split_combined_states(it->first, sample_states, view_states, instance_states);
3553 0 : if (input->second->matches(sample_states, view_states, instance_states)) {
3554 0 : it->second.insert(input->first);
3555 : } else {
3556 0 : it->second.erase(input->first);
3557 : }
3558 : }
3559 0 : }
3560 :
// Erases the given instance handle from every set in combined_state_lookup_
// except the one stored under key 0.
//
// @param handle instance handle to remove.
3561 0 : void DataReaderImpl::remove_from_lookup_maps(DDS::InstanceHandle_t handle)
3562 : {
3563 0 : for (LookupMap::iterator it = combined_state_lookup_.begin(), the_end = combined_state_lookup_.end(); it != the_end; ++it) {
// The key-0 entry is skipped, mirroring update_lookup_maps() which never inserts into it.
3564 0 : if (it->first == 0) continue;
3565 0 : it->second.erase(handle);
3566 : }
3567 0 : }
3568 :
// Returns the set of instance handles stored in combined_state_lookup_ under
// the key produced by to_combined_states() for the given masks.
//
// @param sample_states   sample state mask.
// @param view_states     view state mask.
// @param instance_states instance state mask.
// @return reference to the stored HandleSet; it refers to a member of the map,
//         so it is only meaningful while that entry exists.
//
// NOTE(review): the only guard against a missing key is OPENDDS_ASSERT. If
// that macro compiles to nothing in some builds, a missing key would
// dereference end(). Presumably to_combined_states() always yields a key that
// initialize_lookup_maps() pre-populated (e.g. 0 for bogus masks) - confirm.
3569 0 : const DataReaderImpl::HandleSet& DataReaderImpl::lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const
3570 : {
3571 0 : const CORBA::ULong combined_states = to_combined_states(sample_states, view_states, instance_states);
3572 0 : LookupMap::const_iterator ci = combined_state_lookup_.find(combined_states);
3573 0 : OPENDDS_ASSERT(ci != combined_state_lookup_.end());
3574 0 : return ci->second;
3575 : }
3576 :
// Gives an instance a deadline if it does not currently have one.
//
// When instance->deadline_ is zero_value, the deadline is set to
// now + deadline_period_ and the instance is added to deadline_queue_.
// When the instance already has a deadline, nothing happens.
//
// @param instance     instance to schedule.
// @param timer_called true when invoked from the deadline timer path; in that
//                     case deadline_task_ is left untouched here, and
//                     deadline_task() re-arms it after draining the queue.
3577 0 : void DataReaderImpl::schedule_deadline(SubscriptionInstance_rch instance,
3578 : bool timer_called)
3579 : {
3580 : // Should be called with sample_lock_.
3581 0 : if (instance->deadline_ == MonotonicTimePoint::zero_value) {
3582 0 : instance->deadline_ = MonotonicTimePoint::now() + deadline_period_;
// Remember whether the queue was empty before this insertion.
3583 0 : const bool schedule = deadline_queue_.empty();
3584 0 : deadline_queue_.insert(std::make_pair(instance->deadline_, instance));
3585 0 : if (!timer_called) {
3586 0 : if (schedule) {
// Queue was empty: start the task.
3587 0 : deadline_task_->schedule(deadline_period_);
3588 0 : } else if (deadline_queue_.begin()->second == instance) {
3589 : // Moved to front.
// Restart the task now that this instance is the first queue entry.
3590 0 : deadline_task_->cancel();
3591 0 : deadline_task_->schedule(deadline_period_);
3592 : }
3593 : }
3594 : }
3595 0 : }
3596 :
// Removes an instance's pending deadline, if it has one.
//
// The instance's entry is searched for among the deadline_queue_ entries whose
// key equals instance->deadline_; the first match is erased and
// instance->deadline_ is reset to zero_value. deadline_task_ is not touched.
//
// @param instance instance whose deadline is cancelled.
3597 0 : void DataReaderImpl::cancel_deadline(SubscriptionInstance_rch instance)
3598 : {
3599 : // Should be called with sample_lock_.
3600 0 : if (instance->deadline_ != MonotonicTimePoint::zero_value) {
// Several instances may share the same deadline key, so scan the equal range.
3601 0 : for (DeadlineQueue::iterator pos = deadline_queue_.lower_bound(instance->deadline_), limit = deadline_queue_.upper_bound(instance->deadline_); pos != limit; ++pos) {
3602 0 : if (pos->second == instance) {
// Leave the loop immediately after erasing; pos must not be used again.
3603 0 : deadline_queue_.erase(pos);
3604 0 : break;
3605 : }
3606 : }
// Reset even if no queue entry was found.
3607 0 : instance->deadline_ = MonotonicTimePoint::zero_value;
3608 : }
3609 0 : }
3610 :
// Checks whether an instance missed its deadline, reports the miss when called
// from the timer, and then gives the instance a new deadline.
//
// Nothing happens when instance->deadline_ is zero_value.
//
// A deadline counts as missed when:
//   - no sample has been received (cur_sample_tv_ is zero), or
//   - on a timer call, now - cur_sample_tv_ >= deadline_period_, or
//   - on sample reception, cur_sample_tv_ - last_sample_tv_ > deadline_period_.
//
// Status counters, the status-changed flag, the listener upcall and
// notify_status_condition() are only performed when both missed and
// timer_called are true.
//
// @param instance     instance being checked.
// @param now          time used for the timer-path comparison.
// @param timer_called true when invoked from deadline_task(), false when
//                     invoked on sample reception.
3611 0 : void DataReaderImpl::process_deadline(SubscriptionInstance_rch instance,
3612 : const MonotonicTimePoint& now,
3613 : bool timer_called)
3614 : {
3615 : // Should be called with sample_lock_.
3616 :
3617 0 : if (instance->deadline_ != MonotonicTimePoint::zero_value) {
3618 0 : bool missed = false;
3619 :
3620 0 : if (instance->cur_sample_tv_.is_zero()) { // not received any sample.
3621 0 : missed = true;
3622 :
3623 0 : } else if (timer_called) { // handle_timeout is called
3624 0 : missed = (now - instance->cur_sample_tv_) >= deadline_period_;
3625 :
3626 : } else { // upon receiving sample.
3627 0 : missed = (instance->cur_sample_tv_ - instance->last_sample_tv_) > deadline_period_;
3628 : }
3629 :
3630 0 : if (missed) {
// sample_lock_ is an ACE_Recursive_Thread_Mutex, so this guard nests inside
// the caller's hold of the same lock.
3631 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, monitor, sample_lock_);
3632 : // Only update the status upon timer is called and not
3633 : // when receiving a sample after the interval.
3634 : // Otherwise the counter is doubled.
3635 0 : if (timer_called) {
3636 0 : ++requested_deadline_missed_status_.total_count;
3637 0 : requested_deadline_missed_status_.total_count_change =
3638 0 : requested_deadline_missed_status_.total_count - last_deadline_missed_total_count_;
3639 0 : requested_deadline_missed_status_.last_instance_handle = instance->instance_handle_;
3640 :
3641 0 : set_status_changed_flag(DDS::REQUESTED_DEADLINE_MISSED_STATUS, true);
3642 :
3643 0 : DDS::DataReaderListener_var listener = listener_for(DDS::REQUESTED_DEADLINE_MISSED_STATUS);
3644 :
3645 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
// For exclusive-ownership instances, drop the writers recorded for this
// instance in the ownership manager (if one is available).
3646 0 : if (instance->instance_state_->is_exclusive()) {
3647 0 : DataReaderImpl::OwnershipManagerPtr owner_manager = ownership_manager();
3648 0 : if (owner_manager)
3649 0 : owner_manager->remove_writers (instance->instance_handle_);
3650 0 : }
3651 : #endif
3652 :
3653 0 : if (!CORBA::is_nil(listener.in())) {
3654 : // Copy before releasing the lock.
3655 0 : DDS::RequestedDeadlineMissedStatus const status = requested_deadline_missed_status_;
3656 :
3657 : // Release the lock during the upcall.
3658 0 : ACE_GUARD(Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
3659 : // @todo Will this operation ever throw? If so we may want to
3660 : // catch all exceptions, and act accordingly.
3661 0 : listener->on_requested_deadline_missed(this, status);
3662 :
3663 : // We need to update the last total count value to our current total
3664 : // so that the next time we will calculate the correct total_count_change;
// NOTE(review): this assignment runs while unlock_guard is still in scope,
// and only when a listener exists - confirm both are intended.
3665 0 : last_deadline_missed_total_count_ = requested_deadline_missed_status_.total_count;
3666 0 : }
3667 :
3668 0 : notify_status_condition();
3669 0 : }
3670 0 : }
3671 :
3672 : // This next part is without status_lock_ held to avoid reactor deadlock.
// NOTE(review): only sample_lock_ is referenced in this function; the mention
// of status_lock_ above may be stale - confirm.
3673 0 : if (timer_called) {
// On the timer path the caller has already removed the queue entry, so only
// the stored deadline needs clearing before scheduling the next one.
3674 0 : instance->deadline_ = MonotonicTimePoint::zero_value;
3675 0 : schedule_deadline(instance, timer_called);
3676 : } else {
// On sample reception the queue entry still exists: remove it, then schedule anew.
3677 0 : cancel_deadline(instance);
3678 0 : schedule_deadline(instance, timer_called);
3679 : }
3680 : }
3681 : }
3682 :
// Empties deadline_queue_ and cancels deadline_task_ while holding sample_lock_.
//
// NOTE(review): the deadline_ member of the individual instances is not reset
// here. schedule_deadline() only acts on instances whose deadline_ is
// zero_value, so presumably callers do not expect to re-schedule these
// instances afterwards - confirm.
3683 0 : void DataReaderImpl::cancel_all_deadlines()
3684 : {
3685 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
3686 0 : deadline_queue_.clear();
3687 0 : deadline_task_->cancel();
3688 0 : }
3689 :
// Installs a new deadline period and, when the deadline queue is enabled,
// reschedules every instance that currently has a deadline.
//
// Nothing happens when the new period equals the current one.
//
// @param deadline_period new deadline period.
3690 0 : void DataReaderImpl::reset_deadline_period(const TimeDuration& deadline_period)
3691 : {
3692 0 : if (deadline_period_ != deadline_period) {
// The member is updated first, so reschedule_deadline() below already sees the new period.
3693 0 : deadline_period_ = deadline_period;
3694 :
3695 0 : if (deadline_queue_enabled_) {
3696 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
// A single timestamp is shared by all instances rescheduled in this pass.
3697 0 : const MonotonicTimePoint now = MonotonicTimePoint::now();
3698 0 : for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
3699 0 : iter != this->instances_.end();
3700 0 : ++iter) {
// Instances without a pending deadline are left alone.
3701 0 : if (iter->second->deadline_ != MonotonicTimePoint::zero_value) {
3702 0 : reschedule_deadline(iter->second, now);
3703 : }
3704 : }
3705 0 : }
3706 : }
3707 : }
3708 :
// Moves an instance's pending deadline after deadline_period_ has changed.
//
// Under sample_lock_, the instance's existing entry is removed from
// deadline_queue_, a new deadline is computed as
// now + (deadline_period_ - (old deadline - now)), and the instance is
// re-inserted. deadline_task_ is started when the queue was empty before the
// insertion, or restarted when the instance became the first entry.
// Nothing happens when the instance has no deadline (zero_value).
//
// @param instance instance to reschedule.
// @param now      reference time used in the new-deadline computation.
//
// NOTE(review): the task is (re)scheduled with the full deadline_period_, not
// with the time remaining until the front entry's deadline; deadline_task()
// uses (front deadline - now) instead - confirm which is intended here.
3709 0 : void DataReaderImpl::reschedule_deadline(SubscriptionInstance_rch instance,
3710 : const MonotonicTimePoint& now)
3711 : {
3712 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
3713 :
3714 : // So the datareader can call back into us.
3715 0 : if (instance->deadline_ != MonotonicTimePoint::zero_value) {
3716 :
3717 : // Remove.
3718 0 : for (DeadlineQueue::iterator pos = deadline_queue_.lower_bound(instance->deadline_), limit = deadline_queue_.upper_bound(instance->deadline_); pos != limit; ++pos) {
3719 0 : if (pos->second == instance) {
3720 0 : deadline_queue_.erase(pos);
3721 0 : break;
3722 : }
3723 : }
3724 :
// deadline_period_ already holds the new period when called from reset_deadline_period().
3725 0 : instance->deadline_ = now + (deadline_period_ - (instance->deadline_ - now));
3726 :
// Remember whether the queue was empty before re-inserting.
3727 0 : const bool schedule = deadline_queue_.empty();
3728 0 : deadline_queue_.insert(std::make_pair(instance->deadline_, instance));
3729 0 : if (schedule) {
3730 0 : deadline_task_->schedule(deadline_period_);
3731 0 : } else if (deadline_queue_.begin()->second == instance) {
3732 : // Moved to front.
3733 0 : deadline_task_->cancel();
3734 0 : deadline_task_->schedule(deadline_period_);
3735 : }
3736 : }
3737 0 : }
3738 :
// Timer body for deadline handling.
//
// Under sample_lock_, removes every deadline_queue_ entry whose deadline is at
// or before now and passes its instance to process_deadline() with
// timer_called = true. If entries remain afterwards, deadline_task_ is
// scheduled for the time left until the earliest one.
//
// @param now time against which queued deadlines are compared.
3739 0 : void DataReaderImpl::deadline_task(const MonotonicTimePoint& now)
3740 : {
3741 0 : ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
3742 :
3743 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
// The loop stops at the first entry whose deadline is still in the future.
3744 0 : for (DeadlineQueue::iterator pos = deadline_queue_.begin(), limit = deadline_queue_.end(); pos != limit && pos->first <= now;) {
// Hold a reference to the instance before its queue entry is erased.
3745 0 : SubscriptionInstance_rch instance = pos->second;
3746 0 : deadline_queue_.erase(pos++);
3747 : // The erased iterator is no longer valid; pos was advanced before the erase.
// process_deadline() re-inserts the instance via schedule_deadline().
3748 0 : process_deadline(instance, now, true);
3749 0 : }
3750 :
3751 0 : if (!deadline_queue_.empty()) {
// Re-arm for the time remaining until the earliest queued deadline.
3752 0 : deadline_task_->schedule(deadline_queue_.begin()->first - now);
3753 : }
3754 0 : }
3755 :
3756 : } // namespace DCPS
3757 : } // namespace OpenDDS
3758 :
3759 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|