Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 : #include "ReplayerImpl.h"
10 : #include "FeatureDisabledQosCheck.h"
11 : #include "DomainParticipantImpl.h"
12 : #include "PublisherImpl.h"
13 : #include "Service_Participant.h"
14 : #include "GuidConverter.h"
15 : #include "TopicImpl.h"
16 : #include "PublicationInstance.h"
17 : #include "SendStateDataSampleList.h"
18 : #include "DataSampleElement.h"
19 : #include "Serializer.h"
20 : #include "Transient_Kludge.h"
21 : #include "DataDurabilityCache.h"
22 : #include "MonitorFactory.h"
23 : #include "TypeSupportImpl.h"
24 : #include "DCPS_Utils.h"
25 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
26 : #include "CoherentChangeControl.h"
27 : #endif
28 : #include "AssociationData.h"
29 :
30 : #if !defined (DDS_HAS_MINIMUM_BIT)
31 : #include "BuiltInTopicUtils.h"
32 : #endif // !defined (DDS_HAS_MINIMUM_BIT)
33 :
34 : #include "Util.h"
35 :
36 : #include "transport/framework/EntryExit.h"
37 : #include "transport/framework/TransportExceptions.h"
38 : #include "transport/framework/TransportSendElement.h"
39 : #include "transport/framework/TransportCustomizedElement.h"
40 :
41 : #include "ace/Reactor.h"
42 : #include "ace/Auto_Ptr.h"
43 :
44 : #include <stdexcept>
45 :
46 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
47 :
48 : namespace OpenDDS {
49 : namespace DCPS {
50 :
51 :
52 0 : ReplayerImpl::ReplayerImpl()
53 0 : : data_dropped_count_(0),
54 0 : data_delivered_count_(0),
55 0 : n_chunks_(TheServiceParticipant->n_chunks()),
56 0 : association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()),
57 0 : qos_(TheServiceParticipant->initial_DataWriterQos()),
58 0 : participant_servant_(0),
59 0 : topic_id_(GUID_UNKNOWN),
60 0 : topic_servant_(0),
61 0 : listener_mask_(DEFAULT_STATUS_MASK),
62 0 : domain_id_(0),
63 0 : publisher_servant_(0),
64 0 : publication_id_(GUID_UNKNOWN),
65 0 : sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
66 : // data_container_(0),
67 : // liveliness_lost_(false),
68 : // last_deadline_missed_total_count_(0),
69 0 : is_bit_(false),
70 0 : empty_condition_(lock_),
71 0 : pending_write_count_(0)
72 : {
73 : // liveliness_lost_status_.total_count = 0;
74 : // liveliness_lost_status_.total_count_change = 0;
75 : //
76 : // offered_deadline_missed_status_.total_count = 0;
77 : // offered_deadline_missed_status_.total_count_change = 0;
78 : // offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL;
79 :
80 0 : offered_incompatible_qos_status_.total_count = 0;
81 0 : offered_incompatible_qos_status_.total_count_change = 0;
82 0 : offered_incompatible_qos_status_.last_policy_id = 0;
83 0 : offered_incompatible_qos_status_.policies.length(0);
84 :
85 0 : publication_match_status_.total_count = 0;
86 0 : publication_match_status_.total_count_change = 0;
87 0 : publication_match_status_.current_count = 0;
88 0 : publication_match_status_.current_count_change = 0;
89 0 : publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL;
90 :
91 0 : }
92 :
93 : // This method is called when there are no longer any reference to the
94 : // the servant.
95 0 : ReplayerImpl::~ReplayerImpl()
96 : {
97 : DBG_ENTRY_LVL("ReplayerImpl","~ReplayerImpl",6);
98 0 : }
99 :
100 : // this method is called when delete_datawriter is called.
101 : DDS::ReturnCode_t
102 0 : ReplayerImpl::cleanup()
103 : {
104 :
105 : // // Unregister all registered instances prior to deletion.
106 : // // this->unregister_instances(SystemTimePoint::now().to_dds_time());
107 : //
108 : // // CORBA::String_var topic_name = this->get_Atopic_name();
109 : {
110 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, DDS::RETCODE_ERROR);
111 :
112 : // Wait for pending samples to drain prior to removing associations
113 : // and unregistering the publication.
114 0 : ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
115 0 : while (this->pending_write_count_) {
116 0 : this->empty_condition_.wait(thread_status_manager);
117 : }
118 :
119 : // Call remove association before unregistering the datawriter
120 : // with the transport, otherwise some callbacks resulted from
121 : // remove_association may lost.
122 0 : this->remove_all_associations();
123 :
124 : // release our Topic_var
125 0 : topic_objref_ = DDS::Topic::_nil();
126 0 : topic_servant_ = 0;
127 :
128 0 : }
129 :
130 : // not just unregister but remove any pending writes/sends.
131 : // this->unregister_all();
132 :
133 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
134 0 : if (!disco->remove_publication(
135 : this->domain_id_,
136 0 : this->participant_servant_->get_id(),
137 0 : this->publication_id_)) {
138 0 : ACE_ERROR_RETURN((LM_ERROR,
139 : ACE_TEXT("(%P|%t) ERROR: ")
140 : ACE_TEXT("PublisherImpl::delete_datawriter, ")
141 : ACE_TEXT("publication not removed from discovery.\n")),
142 : DDS::RETCODE_ERROR);
143 : }
144 0 : return DDS::RETCODE_OK;
145 0 : }
146 :
147 : void
148 0 : ReplayerImpl::init(
149 : DDS::Topic_ptr topic,
150 : TopicImpl * topic_servant,
151 : const DDS::DataWriterQos & qos,
152 : ReplayerListener_rch a_listener,
153 : const DDS::StatusMask & mask,
154 : OpenDDS::DCPS::DomainParticipantImpl * participant_servant,
155 : const DDS::PublisherQos& publisher_qos)
156 : {
157 : DBG_ENTRY_LVL("ReplayerImpl","init",6);
158 0 : topic_objref_ = DDS::Topic::_duplicate(topic);
159 0 : topic_servant_ = topic_servant;
160 0 : topic_name_ = topic_servant_->get_name();
161 0 : topic_id_ = topic_servant_->get_id();
162 0 : type_name_ = topic_servant_->get_type_name();
163 :
164 : #if !defined (DDS_HAS_MINIMUM_BIT)
165 0 : is_bit_ = topicIsBIT(topic_name_.in(), type_name_.in());
166 : #endif // !defined (DDS_HAS_MINIMUM_BIT)
167 :
168 0 : qos_ = qos;
169 0 : passed_qos_ = qos;
170 :
171 : //Note: OK to _duplicate(nil).
172 0 : listener_ = a_listener;
173 0 : listener_mask_ = mask;
174 :
175 : // Only store the participant pointer, since it is our "grand"
176 : // parent, we will exist as long as it does.
177 0 : participant_servant_ = participant_servant;
178 0 : domain_id_ = participant_servant_->get_domain_id();
179 :
180 0 : publisher_qos_ = publisher_qos;
181 0 : }
182 :
183 :
184 0 : DDS::ReturnCode_t ReplayerImpl::set_qos (const DDS::PublisherQos & publisher_qos,
185 : const DDS::DataWriterQos & qos)
186 : {
187 :
188 : OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(publisher_qos, DDS::RETCODE_UNSUPPORTED);
189 :
190 0 : if (Qos_Helper::valid(publisher_qos) && Qos_Helper::consistent(publisher_qos)) {
191 0 : if (publisher_qos_ == publisher_qos)
192 0 : return DDS::RETCODE_OK;
193 :
194 : // for the not changeable qos, it can be changed before enable
195 0 : if (!Qos_Helper::changeable(publisher_qos_, publisher_qos) && enabled_) {
196 0 : return DDS::RETCODE_IMMUTABLE_POLICY;
197 :
198 : } else {
199 0 : publisher_qos_ = publisher_qos;
200 : }
201 : } else {
202 0 : return DDS::RETCODE_INCONSISTENT_POLICY;
203 : }
204 :
205 : OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
206 : OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
207 : OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
208 : OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
209 : OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
210 :
211 0 : if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
212 0 : if (qos_ == qos)
213 0 : return DDS::RETCODE_OK;
214 :
215 0 : if (!Qos_Helper::changeable(qos_, qos) && enabled_) {
216 0 : return DDS::RETCODE_IMMUTABLE_POLICY;
217 :
218 : } else {
219 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
220 : // DDS::PublisherQos publisherQos;
221 : // this->publisher_servant_->get_qos(publisherQos);
222 0 : DDS::PublisherQos publisherQos = this->publisher_qos_;
223 : const bool status
224 0 : = disco->update_publication_qos(this->participant_servant_->get_domain_id(),
225 0 : this->participant_servant_->get_id(),
226 0 : this->publication_id_,
227 : qos,
228 : publisherQos);
229 :
230 0 : if (!status) {
231 0 : ACE_ERROR_RETURN((LM_ERROR,
232 : ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
233 : ACE_TEXT("qos not updated.\n")),
234 : DDS::RETCODE_ERROR);
235 : }
236 0 : }
237 :
238 0 : if (!(qos_ == qos)) {
239 : // Reset the deadline timer if the period has changed.
240 : // if (qos_.deadline.period.sec != qos.deadline.period.sec
241 : // || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
242 : // if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC
243 : // && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
244 : // ACE_auto_ptr_reset(this->watchdog_,
245 : // new OfferedDeadlineWatchdog(
246 : // this->reactor_,
247 : // this->lock_,
248 : // qos.deadline,
249 : // this,
250 : // this,
251 : // this->offered_deadline_missed_status_,
252 : // this->last_deadline_missed_total_count_));
253 : //
254 : // } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC
255 : // && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
256 : // this->watchdog_->cancel_all();
257 : // this->watchdog_.reset();
258 : //
259 : // } else {
260 : // this->watchdog_->reset_interval(
261 : // duration_to_time_value(qos.deadline.period));
262 : // }
263 : // }
264 :
265 0 : qos_ = qos;
266 : }
267 :
268 0 : return DDS::RETCODE_OK;
269 :
270 : } else {
271 0 : return DDS::RETCODE_INCONSISTENT_POLICY;
272 : }
273 : }
274 :
275 0 : DDS::ReturnCode_t ReplayerImpl::get_qos (DDS::PublisherQos & publisher_qos,
276 : DDS::DataWriterQos & qos)
277 : {
278 0 : qos = passed_qos_;
279 0 : publisher_qos = publisher_qos_;
280 0 : return DDS::RETCODE_OK;
281 : }
282 :
283 :
284 0 : DDS::ReturnCode_t ReplayerImpl::set_listener (const ReplayerListener_rch & a_listener,
285 : DDS::StatusMask mask)
286 : {
287 0 : listener_ = a_listener;
288 0 : listener_mask_ = mask;
289 0 : return DDS::RETCODE_OK;
290 : }
291 :
292 0 : ReplayerListener_rch ReplayerImpl::get_listener ()
293 : {
294 0 : return listener_;
295 : }
296 :
297 : DDS::ReturnCode_t
298 0 : ReplayerImpl::enable()
299 : {
300 : //According spec:
301 : // - Calling enable on an already enabled Entity returns OK and has no
302 : // effect.
303 : // - Calling enable on an Entity whose factory is not enabled will fail
304 : // and return PRECONDITION_NOT_MET.
305 :
306 0 : if (this->is_enabled()) {
307 0 : return DDS::RETCODE_OK;
308 : }
309 :
310 : // if (!this->publisher_servant_->is_enabled()) {
311 : // return DDS::RETCODE_PRECONDITION_NOT_MET;
312 : // }
313 : //
314 0 : const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
315 :
316 0 : if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
317 0 : n_chunks_ = qos_.resource_limits.max_samples;
318 : }
319 : // +1 because we might allocate one before releasing another
320 : // TBD - see if this +1 can be removed.
321 0 : mb_allocator_.reset(new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_));
322 0 : db_allocator_.reset(new DataBlockAllocator(n_chunks_+1));
323 0 : header_allocator_.reset(new DataSampleHeaderAllocator(n_chunks_+1));
324 :
325 0 : sample_list_element_allocator_.reset(new DataSampleElementAllocator(2 * n_chunks_));
326 :
327 :
328 0 : if (DCPS_debug_level >= 2) {
329 0 : ACE_DEBUG((LM_DEBUG,
330 : "(%P|%t) ReplayerImpl::enable-mb"
331 : " Cached_Allocator_With_Overflow %x with %d chunks\n",
332 : mb_allocator_.get(),
333 : n_chunks_));
334 :
335 0 : ACE_DEBUG((LM_DEBUG,
336 : "(%P|%t) ReplayerImpl::enable-db"
337 : " Cached_Allocator_With_Overflow %x with %d chunks\n",
338 : db_allocator_.get(),
339 : n_chunks_));
340 :
341 0 : ACE_DEBUG((LM_DEBUG,
342 : "(%P|%t) ReplayerImpl::enable-header"
343 : " Cached_Allocator_With_Overflow %x with %d chunks\n",
344 : header_allocator_.get(),
345 : n_chunks_));
346 : }
347 :
348 0 : this->set_enabled();
349 :
350 : try {
351 0 : this->enable_transport(reliable,
352 0 : this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
353 :
354 0 : } catch (const Transport::Exception&) {
355 0 : ACE_ERROR((LM_ERROR,
356 : ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ")
357 : ACE_TEXT("Transport Exception.\n")));
358 0 : return DDS::RETCODE_ERROR;
359 :
360 0 : }
361 :
362 0 : const TransportLocatorSeq& trans_conf_info = connection_info();
363 :
364 :
365 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
366 :
367 0 : set_writer_effective_data_rep_qos(qos_.representation.value, cdr_encapsulation());
368 0 : if (!topic_servant_->check_data_representation(qos_.representation.value, true)) {
369 0 : return DDS::RETCODE_ERROR;
370 : }
371 :
372 0 : XTypes::TypeInformation type_info;
373 0 : type_info.minimal.typeid_with_size.typeobject_serialized_size = 0;
374 0 : type_info.minimal.dependent_typeid_count = 0;
375 0 : type_info.complete.typeid_with_size.typeobject_serialized_size = 0;
376 0 : type_info.complete.dependent_typeid_count = 0;
377 :
378 : this->publication_id_ =
379 0 : disco->add_publication(this->domain_id_,
380 0 : this->participant_servant_->get_id(),
381 0 : this->topic_servant_->get_id(),
382 0 : rchandle_from(this),
383 0 : this->qos_,
384 : trans_conf_info,
385 0 : this->publisher_qos_,
386 : type_info);
387 :
388 0 : if (this->publication_id_ == GUID_UNKNOWN) {
389 0 : ACE_ERROR((LM_ERROR,
390 : ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ")
391 : ACE_TEXT("add_publication returned invalid id.\n")));
392 0 : return DDS::RETCODE_ERROR;
393 : }
394 :
395 0 : return DDS::RETCODE_OK;
396 0 : }
397 :
398 :
399 :
400 : void
401 0 : ReplayerImpl::add_association(const GUID_t& yourId,
402 : const ReaderAssociation& reader,
403 : bool active)
404 : {
405 : DBG_ENTRY_LVL("ReplayerImpl", "add_association", 6);
406 :
407 0 : if (DCPS_debug_level >= 1) {
408 0 : ACE_DEBUG((LM_DEBUG,
409 : ACE_TEXT("(%P|%t) ReplayerImpl::add_association - ")
410 : ACE_TEXT("bit %d local %C remote %C\n"),
411 : is_bit_,
412 : LogGuid(yourId).c_str(),
413 : LogGuid(reader.readerId).c_str()));
414 : }
415 :
416 : // if (entity_deleted_) {
417 : // if (DCPS_debug_level >= 1)
418 : // ACE_DEBUG((LM_DEBUG,
419 : // ACE_TEXT("(%P|%t) ReplayerImpl::add_association")
420 : // ACE_TEXT(" This is a deleted datawriter, ignoring add.\n")));
421 : //
422 : // return;
423 : // }
424 :
425 0 : if (GUID_UNKNOWN == publication_id_) {
426 0 : publication_id_ = yourId;
427 : }
428 :
429 : {
430 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
431 0 : reader_info_.insert(std::make_pair(reader.readerId,
432 0 : ReaderInfo(TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
433 0 : reader.exprParams, participant_servant_,
434 0 : reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS)));
435 0 : }
436 :
437 0 : if (DCPS_debug_level > 4) {
438 0 : ACE_DEBUG((LM_DEBUG,
439 : ACE_TEXT("(%P|%t) ReplayerImpl::add_association(): ")
440 : ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
441 : LogGuid(publication_id_).c_str(),
442 : qos_.transport_priority.value));
443 : }
444 :
445 0 : AssociationData data;
446 0 : data.remote_id_ = reader.readerId;
447 0 : data.remote_data_ = reader.readerTransInfo;
448 0 : data.discovery_locator_ = reader.readerDiscInfo;
449 0 : data.remote_transport_context_ = reader.transportContext;
450 0 : data.remote_reliable_ =
451 0 : (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
452 0 : data.remote_durable_ =
453 0 : (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
454 :
455 0 : if (!this->associate(data, active)) {
456 : //FUTURE: inform inforepo and try again as passive peer
457 0 : if (DCPS_debug_level) {
458 0 : ACE_ERROR((LM_ERROR,
459 : ACE_TEXT("(%P|%t) ReplayerImpl::add_association: ")
460 : ACE_TEXT("ERROR: transport layer failed to associate.\n")));
461 : }
462 0 : return;
463 : }
464 :
465 0 : if (active) {
466 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
467 :
468 0 : association_complete_i(reader.readerId);
469 0 : }
470 0 : }
471 :
472 :
473 0 : ReplayerImpl::ReaderInfo::ReaderInfo(const char* filter,
474 : const DDS::StringSeq& params,
475 : DomainParticipantImpl* participant,
476 0 : bool durable)
477 0 : : expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
478 0 : , durable_(durable)
479 : {
480 : ACE_UNUSED_ARG(filter);
481 : ACE_UNUSED_ARG(params);
482 : ACE_UNUSED_ARG(participant);
483 0 : }
484 :
485 :
486 0 : ReplayerImpl::ReaderInfo::~ReaderInfo()
487 : {
488 0 : }
489 :
490 : void
491 0 : ReplayerImpl::association_complete_i(const GUID_t& remote_id)
492 : {
493 : DBG_ENTRY_LVL("ReplayerImpl", "association_complete_i", 6);
494 : // bool reader_durable = false;
495 : {
496 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
497 0 : if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) {
498 0 : ACE_ERROR((LM_ERROR,
499 : ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
500 : ACE_TEXT("insert %C from pending failed.\n"),
501 : LogGuid(remote_id).c_str()));
502 : }
503 : // RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
504 : // if (it != reader_info_.end()) {
505 : // reader_durable = it->second.durable_;
506 : // }
507 0 : }
508 :
509 0 : if (!is_bit_) {
510 :
511 0 : const DDS::InstanceHandle_t handle = participant_servant_->assign_handle(remote_id);
512 :
513 : {
514 : // protect publication_match_status_ and status changed flags.
515 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
516 :
517 : // update the publication_match_status_
518 0 : ++publication_match_status_.total_count;
519 0 : ++publication_match_status_.total_count_change;
520 0 : ++publication_match_status_.current_count;
521 0 : ++publication_match_status_.current_count_change;
522 :
523 0 : if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
524 0 : ACE_DEBUG((LM_WARNING,
525 : ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
526 : ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
527 : LogGuid(remote_id).c_str(),
528 : handle));
529 0 : return;
530 :
531 0 : } else if (DCPS_debug_level > 4) {
532 0 : ACE_DEBUG((LM_DEBUG,
533 : ACE_TEXT("(%P|%t) ReplayerImpl::association_complete_i: ")
534 : ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
535 : LogGuid(remote_id).c_str(),
536 : handle));
537 : }
538 :
539 0 : publication_match_status_.last_subscription_handle = handle;
540 :
541 0 : }
542 :
543 :
544 0 : if (listener_.in()) {
545 0 : listener_->on_replayer_matched(this,
546 0 : publication_match_status_);
547 :
548 : // TBD - why does the spec say to change this but not
549 : // change the ChangeFlagStatus after a listener call?
550 0 : publication_match_status_.total_count_change = 0;
551 0 : publication_match_status_.current_count_change = 0;
552 : }
553 :
554 : }
555 :
556 : }
557 :
558 : void
559 0 : ReplayerImpl::remove_associations(const ReaderIdSeq & readers,
560 : CORBA::Boolean notify_lost)
561 : {
562 0 : if (DCPS_debug_level >= 1) {
563 0 : ACE_DEBUG((LM_DEBUG,
564 : ACE_TEXT("(%P|%t) ReplayerImpl::remove_associations: ")
565 : ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
566 : is_bit_,
567 : LogGuid(publication_id_).c_str(),
568 : LogGuid(readers[0]).c_str(),
569 : readers.length()));
570 : }
571 :
572 0 : this->stop_associating(readers.get_buffer(), readers.length());
573 :
574 0 : ReaderIdSeq fully_associated_readers;
575 0 : CORBA::ULong fully_associated_len = 0;
576 0 : ReaderIdSeq rds;
577 0 : CORBA::ULong rds_len = 0;
578 0 : DDS::InstanceHandleSeq handles;
579 :
580 : {
581 : // Ensure the same acquisition order as in wait_for_acknowledgments().
582 : // ACE_GUARD(ACE_SYNCH_MUTEX, wfaGuard, this->wfaLock_);
583 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
584 :
585 : //Remove the readers from fully associated reader list.
586 : //If the supplied reader is not in the cached reader list then it is
587 : //already removed. We just need remove the readers in the list that have
588 : //not been removed.
589 :
590 0 : CORBA::ULong len = readers.length();
591 :
592 0 : for (CORBA::ULong i = 0; i < len; ++i) {
593 : //Remove the readers from fully associated reader list. If it's not
594 : //in there, the association_complete() is not called yet and remove it
595 : //from pending list.
596 :
597 0 : if (OpenDDS::DCPS::remove(readers_, readers[i]) == 0) {
598 0 : ++fully_associated_len;
599 0 : fully_associated_readers.length(fully_associated_len);
600 0 : fully_associated_readers [fully_associated_len - 1] = readers[i];
601 :
602 : // Remove this reader from the ACK sequence map if its there.
603 : // This is where we need to be holding the wfaLock_ obtained
604 : // above.
605 : RepoIdToSequenceMap::iterator where
606 0 : = this->idToSequence_.find(readers[i]);
607 :
608 0 : if (where != this->idToSequence_.end()) {
609 0 : this->idToSequence_.erase(where);
610 :
611 : // It is possible that this subscription was causing the wait
612 : // to continue, so give the opportunity to find out.
613 : // this->wfaCondition_.broadcast();
614 : }
615 :
616 0 : ++rds_len;
617 0 : rds.length(rds_len);
618 0 : rds [rds_len - 1] = readers[i];
619 : }
620 0 : reader_info_.erase(readers[i]);
621 : //else reader is already removed which indicates remove_association()
622 : //is called multiple times.
623 : }
624 :
625 0 : if (fully_associated_len > 0 && !is_bit_) {
626 : // The reader should be in the id_to_handle map at this time
627 0 : this->lookup_instance_handles(fully_associated_readers, handles);
628 :
629 0 : for (CORBA::ULong i = 0; i < fully_associated_len; ++i) {
630 0 : id_to_handle_map_.erase(fully_associated_readers[i]);
631 : }
632 : }
633 :
634 : // wfaGuard.release();
635 :
636 : // Mirror the PUBLICATION_MATCHED_STATUS processing from
637 : // association_complete() here.
638 0 : if (!this->is_bit_) {
639 :
640 : // Derive the change in the number of subscriptions reading this writer.
641 : int matchedSubscriptions =
642 0 : static_cast<int>(this->id_to_handle_map_.size());
643 0 : this->publication_match_status_.current_count_change =
644 0 : matchedSubscriptions - this->publication_match_status_.current_count;
645 :
646 : // Only process status if the number of subscriptions has changed.
647 0 : if (this->publication_match_status_.current_count_change != 0) {
648 0 : this->publication_match_status_.current_count = matchedSubscriptions;
649 :
650 : /// Section 7.1.4.1: total_count will not decrement.
651 :
652 : /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
653 : /// TODO: Should rds_len really be fully_associated_len here??
654 0 : this->publication_match_status_.last_subscription_handle =
655 0 : handles[rds_len - 1];
656 :
657 :
658 0 : if (listener_.in()) {
659 0 : listener_->on_replayer_matched(
660 : this,
661 0 : this->publication_match_status_);
662 :
663 : // Listener consumes the change.
664 0 : this->publication_match_status_.total_count_change = 0;
665 0 : this->publication_match_status_.current_count_change = 0;
666 : }
667 :
668 : }
669 : }
670 0 : }
671 :
672 0 : for (CORBA::ULong i = 0; i < rds.length(); ++i) {
673 0 : this->disassociate(rds[i]);
674 : }
675 :
676 : // If this remove_association is invoked when the InfoRepo
677 : // detects a lost reader then make a callback to notify
678 : // subscription lost.
679 0 : if (notify_lost && handles.length() > 0) {
680 0 : this->notify_publication_lost(handles);
681 : }
682 :
683 0 : for (unsigned int i = 0; i < handles.length(); ++i) {
684 0 : participant_servant_->return_handle(handles[i]);
685 : }
686 0 : }
687 :
688 0 : void ReplayerImpl::remove_all_associations()
689 : {
690 0 : this->stop_associating();
691 :
692 0 : OpenDDS::DCPS::ReaderIdSeq readers;
693 : CORBA::ULong size;
694 : {
695 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
696 :
697 0 : size = static_cast<CORBA::ULong>(readers_.size());
698 0 : readers.length(size);
699 :
700 0 : RepoIdSet::iterator itEnd = readers_.end();
701 0 : int i = 0;
702 :
703 0 : for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
704 0 : readers[i++] = *it;
705 : }
706 0 : }
707 :
708 : try {
709 0 : if (0 < size) {
710 0 : CORBA::Boolean dont_notify_lost = false;
711 0 : this->remove_associations(readers, dont_notify_lost);
712 : }
713 :
714 0 : } catch (const CORBA::Exception&) {
715 0 : }
716 :
717 0 : transport_stop();
718 0 : }
719 :
720 : void
721 0 : ReplayerImpl::register_for_reader(const GUID_t& participant,
722 : const GUID_t& writerid,
723 : const GUID_t& readerid,
724 : const TransportLocatorSeq& locators,
725 : DiscoveryListener* listener)
726 : {
727 0 : TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
728 0 : }
729 :
730 : void
731 0 : ReplayerImpl::unregister_for_reader(const GUID_t& participant,
732 : const GUID_t& writerid,
733 : const GUID_t& readerid)
734 : {
735 0 : TransportClient::unregister_for_reader(participant, writerid, readerid);
736 0 : }
737 :
738 : void
739 0 : ReplayerImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
740 : {
741 :
742 :
743 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
744 :
745 : // copy status and increment change
746 0 : offered_incompatible_qos_status_.total_count = status.total_count;
747 0 : offered_incompatible_qos_status_.total_count_change +=
748 0 : status.count_since_last_send;
749 0 : offered_incompatible_qos_status_.last_policy_id = status.last_policy_id;
750 0 : offered_incompatible_qos_status_.policies = status.policies;
751 :
752 0 : }
753 :
754 : void
755 0 : ReplayerImpl::update_subscription_params(const GUID_t& readerId,
756 : const DDS::StringSeq& params)
757 : {
758 : ACE_UNUSED_ARG(readerId);
759 : ACE_UNUSED_ARG(params);
760 0 : }
761 :
762 : bool
763 0 : ReplayerImpl::check_transport_qos(const TransportInst&)
764 : {
765 : // DataWriter does not impose any constraints on which transports
766 : // may be used based on QoS.
767 0 : return true;
768 : }
769 :
770 0 : GUID_t ReplayerImpl::get_guid() const
771 : {
772 0 : return this->publication_id_;
773 : }
774 :
775 : CORBA::Long
776 0 : ReplayerImpl::get_priority_value(const AssociationData&) const
777 : {
778 0 : return this->qos_.transport_priority.value;
779 : }
780 :
781 : void
782 0 : ReplayerImpl::data_delivered(const DataSampleElement* sample)
783 : {
784 : DBG_ENTRY_LVL("ReplayerImpl","data_delivered",6);
785 0 : if (!(sample->get_pub_id() == this->publication_id_)) {
786 0 : ACE_ERROR((LM_ERROR,
787 : ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::data_delivered: ")
788 : ACE_TEXT(" The publication id %C from delivered element ")
789 : ACE_TEXT("does not match the datawriter's id %C\n"),
790 : LogGuid(sample->get_pub_id()).c_str(),
791 : LogGuid(publication_id_).c_str()));
792 0 : return;
793 : }
794 0 : DataSampleElement* elem = const_cast<DataSampleElement*>(sample);
795 : // this->data_container_->data_delivered(sample);
796 0 : ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement);
797 0 : ++data_delivered_count_;
798 :
799 : {
800 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
801 0 : if (--pending_write_count_ == 0) {
802 0 : empty_condition_.notify_all();
803 : }
804 0 : }
805 : }
806 :
807 : void
808 0 : ReplayerImpl::control_delivered(const Message_Block_Ptr& sample)
809 : {
810 : ACE_UNUSED_ARG(sample);
811 0 : }
812 :
813 : void
814 0 : ReplayerImpl::data_dropped(const DataSampleElement* sample,
815 : bool dropped_by_transport)
816 : {
817 : DBG_ENTRY_LVL("ReplayerImpl","data_dropped",6);
818 : // this->data_container_->data_dropped(element, dropped_by_transport);
819 : ACE_UNUSED_ARG(dropped_by_transport);
820 0 : DataSampleElement* elem = const_cast<DataSampleElement*>(sample);
821 0 : ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement);
822 0 : ++data_dropped_count_;
823 : {
824 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
825 0 : if ((--pending_write_count_) == 0) {
826 0 : empty_condition_.notify_all();
827 : }
828 0 : }
829 : }
830 :
831 : void
832 0 : ReplayerImpl::control_dropped(const Message_Block_Ptr& sample,
833 : bool /* dropped_by_transport */)
834 : {
835 : ACE_UNUSED_ARG(sample);
836 0 : }
837 :
838 : void
839 0 : ReplayerImpl::notify_publication_disconnected(const ReaderIdSeq& subids)
840 : {
841 : ACE_UNUSED_ARG(subids);
842 0 : }
843 :
844 : void
845 0 : ReplayerImpl::notify_publication_reconnected(const ReaderIdSeq& subids)
846 : {
847 : ACE_UNUSED_ARG(subids);
848 0 : }
849 :
850 : void
851 0 : ReplayerImpl::notify_publication_lost(const ReaderIdSeq& subids)
852 : {
853 : ACE_UNUSED_ARG(subids);
854 0 : }
855 :
856 : void
857 0 : ReplayerImpl::notify_publication_lost(const DDS::InstanceHandleSeq& handles)
858 : {
859 : ACE_UNUSED_ARG(handles);
860 0 : }
861 :
862 :
863 : void
864 0 : ReplayerImpl::retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const
865 : {
866 0 : qos_data.pub_qos = this->publisher_qos_;
867 0 : qos_data.dw_qos = this->qos_;
868 0 : qos_data.topic_name = this->topic_name_.in();
869 0 : }
870 :
871 : DDS::ReturnCode_t
872 0 : ReplayerImpl::write (const RawDataSample* samples,
873 : int num_samples,
874 : DDS::InstanceHandle_t* reader_ih_ptr)
875 : {
876 : DBG_ENTRY_LVL("ReplayerImpl","write",6);
877 :
878 : OpenDDS::DCPS::GUID_t repo_id;
879 0 : if (reader_ih_ptr) {
880 0 : repo_id = this->participant_servant_->get_repoid(*reader_ih_ptr);
881 0 : if (repo_id == GUID_UNKNOWN) {
882 0 : ACE_ERROR_RETURN((LM_ERROR,
883 : ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::write: ")
884 : ACE_TEXT("Invalid reader instance handle (%d)\n"), *reader_ih_ptr),
885 : DDS::RETCODE_ERROR);
886 : }
887 : }
888 :
889 0 : SendStateDataSampleList list;
890 :
891 0 : for (int i = 0; i < num_samples; ++i) {
892 0 : DataSampleElement* element = 0;
893 :
894 0 : ACE_NEW_MALLOC_RETURN(
895 : element,
896 : static_cast<DataSampleElement*>(
897 : sample_list_element_allocator_->malloc(
898 : sizeof(DataSampleElement))),
899 : DataSampleElement(publication_id_,
900 : this,
901 : PublicationInstance_rch()),
902 : DDS::RETCODE_ERROR);
903 :
904 0 : element->get_header().byte_order_ = samples[i].sample_byte_order_;
905 0 : element->get_header().publication_id_ = this->publication_id_;
906 0 : list.enqueue_tail(element);
907 0 : Message_Block_Ptr temp;
908 0 : Message_Block_Ptr sample(samples[i].sample_->duplicate());
909 0 : DDS::ReturnCode_t ret = create_sample_data_message(move(sample),
910 : element->get_header(),
911 : temp,
912 0 : samples[i].source_timestamp_,
913 : false);
914 0 : element->set_sample(move(temp));
915 0 : if (reader_ih_ptr) {
916 0 : element->set_num_subs(1);
917 0 : element->set_sub_id(0, repo_id);
918 : }
919 :
920 0 : if (ret != DDS::RETCODE_OK) {
921 : // we need to free the list
922 0 : while (list.dequeue(element)) {
923 0 : ACE_DES_FREE(element, sample_list_element_allocator_->free, DataSampleElement);
924 : }
925 :
926 0 : return ret;
927 : }
928 0 : }
929 :
930 : {
931 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, DDS::RETCODE_ERROR);
932 0 : ++pending_write_count_;
933 0 : }
934 :
935 0 : this->send(list);
936 :
937 0 : for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
938 0 : end = reader_info_.end(); iter != end; ++iter) {
939 0 : iter->second.expected_sequence_ = sequence_number_;
940 : }
941 :
942 0 : return DDS::RETCODE_OK;
943 0 : }
944 :
945 : DDS::ReturnCode_t
946 0 : ReplayerImpl::write(const RawDataSample& sample)
947 : {
948 0 : return this->write(&sample, 1, 0);
949 : }
950 :
951 : DDS::ReturnCode_t
952 0 : ReplayerImpl::create_sample_data_message(Message_Block_Ptr data,
953 : DataSampleHeader& header_data,
954 : Message_Block_Ptr& message,
955 : const DDS::Time_t& source_timestamp,
956 : bool content_filter)
957 : {
958 0 : header_data.message_id_ = SAMPLE_DATA;
959 0 : header_data.coherent_change_ = content_filter;
960 :
961 0 : header_data.content_filter_ = false;
962 0 : header_data.cdr_encapsulation_ = this->cdr_encapsulation();
963 0 : header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
964 0 : header_data.sequence_repair_ = need_sequence_repair();
965 0 : if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
966 0 : this->sequence_number_ = SequenceNumber();
967 : } else {
968 0 : ++this->sequence_number_;
969 : }
970 0 : header_data.sequence_ = this->sequence_number_;
971 0 : header_data.source_timestamp_sec_ = source_timestamp.sec;
972 0 : header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
973 :
974 0 : if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
975 0 : || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
976 0 : header_data.lifespan_duration_ = true;
977 0 : header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec;
978 0 : header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec;
979 : }
980 :
981 : // header_data.publication_id_ = publication_id_;
982 : // header_data.publisher_id_ = this->publisher_servant_->publisher_id_;
983 : ACE_Message_Block* tmp;
984 0 : ACE_NEW_MALLOC_RETURN(tmp,
985 : static_cast<ACE_Message_Block*>(
986 : mb_allocator_->malloc(sizeof(ACE_Message_Block))),
987 : ACE_Message_Block(DataSampleHeader::get_max_serialized_size(),
988 : ACE_Message_Block::MB_DATA,
989 : data.release(), //cont
990 : 0, //data
991 : header_allocator_.get(), //alloc_strategy
992 : 0, //locking_strategy
993 : ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
994 : ACE_Time_Value::zero,
995 : ACE_Time_Value::max_time,
996 : db_allocator_.get(),
997 : mb_allocator_.get()),
998 : DDS::RETCODE_ERROR);
999 0 : message.reset(tmp);
1000 0 : *message << header_data;
1001 0 : return DDS::RETCODE_OK;
1002 : }
1003 :
1004 : void
1005 0 : ReplayerImpl::lookup_instance_handles(const ReaderIdSeq& ids,
1006 : DDS::InstanceHandleSeq & hdls)
1007 : {
1008 0 : CORBA::ULong const num_rds = ids.length();
1009 :
1010 0 : if (DCPS_debug_level > 9) {
1011 0 : OPENDDS_STRING separator;
1012 0 : OPENDDS_STRING buffer;
1013 :
1014 0 : for (CORBA::ULong i = 0; i < num_rds; ++i) {
1015 0 : buffer += separator + LogGuid(ids[i]).conv_;
1016 0 : separator = ", ";
1017 : }
1018 :
1019 0 : ACE_DEBUG((LM_DEBUG,
1020 : ACE_TEXT("(%P|%t) ReplayerImpl::lookup_instance_handles: ")
1021 : ACE_TEXT("searching for handles for reader Ids: %C.\n"),
1022 : buffer.c_str()));
1023 0 : }
1024 :
1025 0 : hdls.length(num_rds);
1026 :
1027 0 : for (CORBA::ULong i = 0; i < num_rds; ++i) {
1028 0 : hdls[i] = participant_servant_->lookup_handle(ids[i]);
1029 : }
1030 0 : }
1031 :
1032 : bool
1033 0 : ReplayerImpl::need_sequence_repair() const
1034 : {
1035 0 : for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
1036 0 : end = reader_info_.end(); it != end; ++it) {
1037 0 : if (it->second.expected_sequence_ != sequence_number_) {
1038 0 : return true;
1039 : }
1040 : }
1041 0 : return false;
1042 : }
1043 :
1044 : DDS::InstanceHandle_t
1045 0 : ReplayerImpl::get_instance_handle()
1046 : {
1047 0 : return get_entity_instance_handle(publication_id_, rchandle_from(participant_servant_));
1048 : }
1049 :
1050 : DDS::ReturnCode_t
1051 0 : ReplayerImpl::write_to_reader (DDS::InstanceHandle_t subscription,
1052 : const RawDataSample& sample )
1053 : {
1054 0 : return write(&sample, 1, &subscription);
1055 : }
1056 :
1057 : DDS::ReturnCode_t
1058 0 : ReplayerImpl::write_to_reader (DDS::InstanceHandle_t subscription,
1059 : const RawDataSampleList& samples )
1060 : {
1061 0 : if (!samples.empty())
1062 0 : return write(&samples[0], static_cast<int>(samples.size()), &subscription);
1063 0 : return DDS::RETCODE_ERROR;
1064 : }
1065 :
1066 : } // namespace DCPS
1067 : } // namespace
1068 :
1069 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|