Line data Source code
1 : /*
2 : * Distributed under the OpenDDS License.
3 : * See: http://www.opendds.org/license.html
4 : */
5 :
6 : #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
7 :
8 : #include "DataWriterImpl.h"
9 :
10 : #include "FeatureDisabledQosCheck.h"
11 : #include "DomainParticipantImpl.h"
12 : #include "PublisherImpl.h"
13 : #include "Service_Participant.h"
14 : #include "GuidConverter.h"
15 : #include "TopicImpl.h"
16 : #include "PublicationInstance.h"
17 : #include "Serializer.h"
18 : #include "Transient_Kludge.h"
19 : #include "DataDurabilityCache.h"
20 : #include "MonitorFactory.h"
21 : #include "SendStateDataSampleList.h"
22 : #include "DataSampleElement.h"
23 : #include "Util.h"
24 : #include "DCPS_Utils.h"
25 : #include "XTypes/TypeObject.h"
26 : #include "TypeSupportImpl.h"
27 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
28 : # include "CoherentChangeControl.h"
29 : #endif
30 : #include "AssociationData.h"
31 : #include "transport/framework/EntryExit.h"
32 : #include "transport/framework/TransportExceptions.h"
33 : #include "transport/framework/TransportRegistry.h"
34 : #ifndef DDS_HAS_MINIMUM_BIT
35 : # include "BuiltInTopicUtils.h"
36 : #endif
37 :
38 : #ifndef DDS_HAS_MINIMUM_BIT
39 : # include <dds/DdsDcpsCoreTypeSupportC.h>
40 : #endif // !defined (DDS_HAS_MINIMUM_BIT)
41 : #include <dds/DdsDcpsCoreC.h>
42 : #include <dds/DdsDcpsGuidTypeSupportImpl.h>
43 :
44 : #include <ace/Reactor.h>
45 : #include <ace/Auto_Ptr.h>
46 :
47 : #include <stdexcept>
48 :
49 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
50 :
51 : namespace OpenDDS {
52 : namespace DCPS {
53 :
54 : //TBD - add check for enabled in most methods.
55 : // currently this is not needed because auto_enable_created_entities
56 : // cannot be false.
57 :
58 0 : DataWriterImpl::DataWriterImpl()
59 0 : : data_dropped_count_(0)
60 0 : , data_delivered_count_(0)
61 0 : , controlTracker("DataWriterImpl")
62 0 : , n_chunks_(TheServiceParticipant->n_chunks())
63 0 : , association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier())
64 0 : , qos_(TheServiceParticipant->initial_DataWriterQos())
65 0 : , skip_serialize_(false)
66 0 : , db_lock_pool_(new DataBlockLockPool((unsigned long)TheServiceParticipant->n_chunks()))
67 0 : , topic_id_(GUID_UNKNOWN)
68 0 : , topic_servant_(0)
69 0 : , type_support_(0)
70 0 : , listener_mask_(DEFAULT_STATUS_MASK)
71 0 : , domain_id_(0)
72 0 : , publication_id_(GUID_UNKNOWN)
73 0 : , sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
74 0 : , coherent_(false)
75 0 : , coherent_samples_(0)
76 0 : , liveliness_lost_(false)
77 0 : , reactor_(0)
78 0 : , liveliness_check_interval_(TimeDuration::max_value)
79 0 : , last_deadline_missed_total_count_(0)
80 0 : , is_bit_(false)
81 0 : , min_suspended_transaction_id_(0)
82 0 : , max_suspended_transaction_id_(0)
83 0 : , liveliness_asserted_(false)
84 0 : , liveness_timer_(make_rch<LivenessTimer>(ref(*this)))
85 : {
86 0 : liveliness_lost_status_.total_count = 0;
87 0 : liveliness_lost_status_.total_count_change = 0;
88 :
89 0 : offered_deadline_missed_status_.total_count = 0;
90 0 : offered_deadline_missed_status_.total_count_change = 0;
91 0 : offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL;
92 :
93 0 : offered_incompatible_qos_status_.total_count = 0;
94 0 : offered_incompatible_qos_status_.total_count_change = 0;
95 0 : offered_incompatible_qos_status_.last_policy_id = 0;
96 0 : offered_incompatible_qos_status_.policies.length(0);
97 :
98 0 : publication_match_status_.total_count = 0;
99 0 : publication_match_status_.total_count_change = 0;
100 0 : publication_match_status_.current_count = 0;
101 0 : publication_match_status_.current_count_change = 0;
102 0 : publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL;
103 :
104 0 : monitor_.reset(TheServiceParticipant->monitor_factory_->create_data_writer_monitor(this));
105 0 : periodic_monitor_.reset(TheServiceParticipant->monitor_factory_->create_data_writer_periodic_monitor(this));
106 0 : }
107 :
108 : // This method is called when there are no longer any reference to the
109 : // the servant.
110 0 : DataWriterImpl::~DataWriterImpl()
111 : {
112 : DBG_ENTRY_LVL("DataWriterImpl", "~DataWriterImpl", 6);
113 : #ifndef OPENDDS_SAFETY_PROFILE
114 0 : RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
115 0 : if (participant) {
116 0 : XTypes::TypeLookupService_rch type_lookup_service = participant->get_type_lookup_service();
117 0 : if (type_lookup_service) {
118 0 : type_lookup_service->remove_guid_from_dynamic_map(publication_id_);
119 : }
120 0 : }
121 : #endif
122 0 : }
123 :
124 : // this method is called when delete_datawriter is called.
125 : void
126 0 : DataWriterImpl::cleanup()
127 : {
128 : // As first step set our listener to nill which will prevent us from calling
129 : // back onto the listener at the moment the related DDS entity has been
130 : // deleted
131 0 : set_listener(0, NO_STATUS_MASK);
132 0 : topic_servant_ = 0;
133 0 : type_support_ = 0;
134 0 : }
135 :
136 : void
137 0 : DataWriterImpl::init(
138 : TopicImpl* topic_servant,
139 : const DDS::DataWriterQos& qos,
140 : DDS::DataWriterListener_ptr a_listener,
141 : const DDS::StatusMask& mask,
142 : WeakRcHandle<DomainParticipantImpl> participant_servant,
143 : PublisherImpl* publisher_servant)
144 : {
145 : DBG_ENTRY_LVL("DataWriterImpl", "init", 6);
146 0 : topic_servant_ = topic_servant;
147 0 : type_support_ = dynamic_cast<TypeSupportImpl*>(topic_servant->get_type_support());
148 0 : topic_name_ = topic_servant_->get_name();
149 0 : topic_id_ = topic_servant_->get_id();
150 0 : type_name_ = topic_servant_->get_type_name();
151 :
152 : #if !defined (DDS_HAS_MINIMUM_BIT)
153 0 : is_bit_ = topicIsBIT(topic_name_.in(), type_name_.in());
154 : #endif // !defined (DDS_HAS_MINIMUM_BIT)
155 :
156 0 : qos_ = qos;
157 0 : passed_qos_ = qos;
158 :
159 0 : set_listener(a_listener, mask);
160 :
161 : // Only store the participant pointer, since it is our "grand"
162 : // parent, we will exist as long as it does.
163 0 : participant_servant_ = participant_servant;
164 :
165 0 : RcHandle<DomainParticipantImpl> participant = participant_servant.lock();
166 0 : domain_id_ = participant->get_domain_id();
167 :
168 : // Only store the publisher pointer, since it is our parent, we will
169 : // exist as long as it does.
170 0 : publisher_servant_ = *publisher_servant;
171 :
172 0 : this->reactor_ = TheServiceParticipant->timer();
173 0 : }
174 :
175 : DDS::InstanceHandle_t
176 0 : DataWriterImpl::get_instance_handle()
177 : {
178 0 : const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
179 0 : return get_entity_instance_handle(publication_id_, participant);
180 0 : }
181 :
182 : DDS::InstanceHandle_t
183 0 : DataWriterImpl::get_next_handle()
184 : {
185 0 : RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
186 0 : if (participant) {
187 0 : return participant->assign_handle();
188 : }
189 0 : return DDS::HANDLE_NIL;
190 0 : }
191 :
192 0 : void DataWriterImpl::return_handle(DDS::InstanceHandle_t handle)
193 : {
194 0 : const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
195 0 : if (participant) {
196 0 : participant->return_handle(handle);
197 : }
198 0 : }
199 :
200 : RcHandle<BitSubscriber>
201 0 : DataWriterImpl::get_builtin_subscriber_proxy() const
202 : {
203 0 : RcHandle<DomainParticipantImpl> participant_servant = participant_servant_.lock();
204 0 : if (participant_servant) {
205 0 : return participant_servant->get_builtin_subscriber_proxy();
206 : }
207 :
208 0 : return RcHandle<BitSubscriber>();
209 0 : }
210 :
211 : void
212 0 : DataWriterImpl::add_association(const GUID_t& yourId,
213 : const ReaderAssociation& reader,
214 : bool active)
215 : {
216 : DBG_ENTRY_LVL("DataWriterImpl", "add_association", 6);
217 :
218 0 : if (DCPS_debug_level) {
219 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association - ")
220 : ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
221 : LogGuid(yourId).c_str(),
222 : LogGuid(reader.readerId).c_str()));
223 : }
224 :
225 0 : if (get_deleted()) {
226 0 : if (DCPS_debug_level)
227 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association")
228 : ACE_TEXT(" This is a deleted datawriter, ignoring add.\n")));
229 :
230 0 : return;
231 : }
232 :
233 0 : check_and_set_repo_id(yourId);
234 :
235 : {
236 0 : ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
237 0 : reader_info_.insert(std::make_pair(reader.readerId,
238 0 : ReaderInfo(reader.filterClassName,
239 0 : TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
240 0 : reader.exprParams, participant_servant_,
241 0 : reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS)));
242 0 : }
243 :
244 0 : if (DCPS_debug_level > 4) {
245 0 : ACE_DEBUG((LM_DEBUG,
246 : ACE_TEXT("(%P|%t) DataWriterImpl::add_association(): ")
247 : ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
248 : LogGuid(get_guid()).c_str(),
249 : qos_.transport_priority.value));
250 : }
251 :
252 0 : AssociationData data;
253 0 : data.remote_id_ = reader.readerId;
254 0 : data.remote_data_ = reader.readerTransInfo;
255 0 : data.discovery_locator_ = reader.readerDiscInfo;
256 0 : data.participant_discovered_at_ = reader.participantDiscoveredAt;
257 0 : data.remote_transport_context_ = reader.transportContext;
258 0 : data.remote_reliable_ =
259 0 : (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
260 0 : data.remote_durable_ =
261 0 : (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
262 :
263 0 : if (associate(data, active)) {
264 0 : const Observer_rch observer = get_observer(Observer::e_ASSOCIATED);
265 0 : if (observer) {
266 0 : observer->on_associated(this, data.remote_id_);
267 : }
268 0 : } else {
269 : //FUTURE: inform inforepo and try again as passive peer
270 0 : if (DCPS_debug_level) {
271 0 : ACE_ERROR((LM_ERROR,
272 : ACE_TEXT("(%P|%t) DataWriterImpl::add_association: ")
273 : ACE_TEXT("ERROR: transport layer failed to associate.\n")));
274 : }
275 : }
276 0 : }
277 :
278 : void
279 0 : DataWriterImpl::transport_assoc_done(int flags, const GUID_t& remote_id)
280 : {
281 : DBG_ENTRY_LVL("DataWriterImpl", "transport_assoc_done", 6);
282 :
283 0 : if (!(flags & ASSOC_OK)) {
284 0 : if (DCPS_debug_level) {
285 0 : ACE_ERROR((LM_ERROR,
286 : ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
287 : ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
288 : LogGuid(remote_id).c_str()));
289 : }
290 :
291 0 : return;
292 : }
293 :
294 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_);
295 :
296 0 : if (DCPS_debug_level) {
297 0 : ACE_DEBUG((LM_INFO,
298 : ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
299 : ACE_TEXT("writer %C succeeded in associating with reader %C\n"),
300 : LogGuid(publication_id_).c_str(),
301 : LogGuid(remote_id).c_str()));
302 : }
303 :
304 0 : if (flags & ASSOC_ACTIVE) {
305 :
306 : // Have we already received an association_complete() callback?
307 0 : if (DCPS_debug_level) {
308 0 : ACE_DEBUG((LM_DEBUG,
309 : ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
310 : ACE_TEXT("writer %C reader %C calling association_complete_i\n"),
311 : LogGuid(publication_id_).c_str(),
312 : LogGuid(remote_id).c_str()));
313 : }
314 0 : association_complete_i(remote_id);
315 :
316 : } else {
317 : // In the current implementation, DataWriter is always active, so this
318 : // code will not be applicable.
319 0 : if (DCPS_debug_level) {
320 0 : ACE_ERROR((LM_ERROR,
321 : ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
322 : ACE_TEXT("ERROR: DataWriter (%C) should always be active in current implementation\n"),
323 : LogGuid(publication_id_).c_str()));
324 : }
325 : }
326 0 : }
327 :
328 0 : DataWriterImpl::ReaderInfo::ReaderInfo(const char* filterClassName,
329 : const char* filter,
330 : const DDS::StringSeq& params,
331 : WeakRcHandle<DomainParticipantImpl> participant,
332 0 : bool durable)
333 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
334 0 : : participant_(participant)
335 0 : , filter_class_name_(filterClassName)
336 0 : , filter_(filter)
337 0 : , expression_params_(params)
338 0 : , expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
339 0 : , durable_(durable)
340 : {
341 0 : RcHandle<DomainParticipantImpl> part = participant_.lock();
342 0 : if (part && *filter) {
343 0 : eval_ = part->get_filter_eval(filter);
344 : }
345 0 : }
346 : #else
347 : : expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
348 : , durable_(durable)
349 : {
350 : ACE_UNUSED_ARG(filterClassName);
351 : ACE_UNUSED_ARG(filter);
352 : ACE_UNUSED_ARG(params);
353 : ACE_UNUSED_ARG(participant);
354 : }
355 : #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
356 :
357 0 : DataWriterImpl::ReaderInfo::~ReaderInfo()
358 : {
359 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
360 0 : eval_ = RcHandle<FilterEvaluator>();
361 0 : RcHandle<DomainParticipantImpl> participant = participant_.lock();
362 0 : if (participant && !filter_.empty()) {
363 0 : participant->deref_filter_eval(filter_.c_str());
364 : }
365 :
366 : #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
367 0 : }
368 :
369 : void
370 0 : DataWriterImpl::association_complete_i(const GUID_t& remote_id)
371 : {
372 : DBG_ENTRY_LVL("DataWriterImpl", "association_complete_i", 6);
373 :
374 0 : bool reader_durable = false;
375 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
376 0 : OPENDDS_STRING filterClassName;
377 0 : RcHandle<FilterEvaluator> eval;
378 0 : DDS::StringSeq expression_params;
379 : #endif
380 : {
381 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
382 :
383 0 : if (DCPS_debug_level >= 1) {
384 0 : ACE_DEBUG((LM_DEBUG,
385 : ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i - ")
386 : ACE_TEXT("bit %d local %C remote %C\n"),
387 : is_bit_,
388 : LogGuid(this->publication_id_).c_str(),
389 : LogGuid(remote_id).c_str()));
390 : }
391 :
392 0 : if (insert(readers_, remote_id) == -1) {
393 0 : ACE_ERROR((LM_ERROR,
394 : ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::association_complete_i: ")
395 : ACE_TEXT("insert %C from pending failed.\n"),
396 : LogGuid(remote_id).c_str()));
397 : }
398 0 : }
399 : {
400 0 : ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
401 0 : RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
402 :
403 0 : if (it != reader_info_.end()) {
404 0 : reader_durable = it->second.durable_;
405 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
406 0 : filterClassName = it->second.filter_class_name_;
407 0 : eval = it->second.eval_;
408 0 : expression_params = it->second.expression_params_;
409 : #endif
410 : }
411 0 : }
412 :
413 0 : if (this->monitor_) {
414 0 : this->monitor_->report();
415 : }
416 :
417 0 : if (!is_bit_) {
418 :
419 0 : RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
420 :
421 0 : if (!participant)
422 0 : return;
423 :
424 0 : data_container_->add_reader_acks(remote_id, get_max_sn());
425 :
426 0 : const DDS::InstanceHandle_t handle = participant->assign_handle(remote_id);
427 :
428 : {
429 : // protect publication_match_status_ and status changed flags.
430 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
431 :
432 0 : if (DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
433 0 : ACE_DEBUG((LM_WARNING,
434 : ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::association_complete_i: ")
435 : ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
436 : LogGuid(remote_id).c_str(),
437 : handle));
438 0 : return;
439 :
440 0 : } else if (DCPS_debug_level > 4) {
441 0 : ACE_DEBUG((LM_DEBUG,
442 : ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i: ")
443 : ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
444 : LogGuid(remote_id).c_str(),
445 : handle));
446 : }
447 :
448 0 : ++publication_match_status_.total_count;
449 0 : ++publication_match_status_.total_count_change;
450 0 : ++publication_match_status_.current_count;
451 0 : ++publication_match_status_.current_count_change;
452 0 : publication_match_status_.last_subscription_handle = handle;
453 0 : set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, true);
454 0 : }
455 :
456 : DDS::DataWriterListener_var listener =
457 0 : listener_for(DDS::PUBLICATION_MATCHED_STATUS);
458 :
459 0 : if (!CORBA::is_nil(listener.in())) {
460 :
461 0 : listener->on_publication_matched(this, publication_match_status_);
462 :
463 : // TBD - why does the spec say to change this but not
464 : // change the ChangeFlagStatus after a listener call?
465 0 : publication_match_status_.total_count_change = 0;
466 0 : publication_match_status_.current_count_change = 0;
467 : }
468 :
469 0 : notify_status_condition();
470 0 : } else {
471 0 : data_container_->add_reader_acks(remote_id, get_max_sn());
472 : }
473 :
474 : // Support DURABILITY QoS
475 0 : if (reader_durable) {
476 : // Tell the WriteDataContainer to resend all sending/sent
477 : // samples.
478 0 : this->data_container_->reenqueue_all(remote_id, this->qos_.lifespan
479 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
480 0 : , filterClassName, eval.in(), expression_params
481 : #endif
482 : );
483 :
484 : // Acquire the data writer container lock to avoid deadlock. The
485 : // thread calling association_complete() has to acquire lock in the
486 : // same order as the write()/register() operation.
487 :
488 : // Since the thread calling association_complete() is the ORB
489 : // thread, it may have some performance penalty. If the
490 : // performance is an issue, we may need a new thread to handle the
491 : // data_available() calls.
492 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex,
493 : guard,
494 : this->get_lock());
495 :
496 0 : SendStateDataSampleList list = this->get_resend_data();
497 : {
498 0 : ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
499 : // Update the reader's expected sequence
500 : SequenceNumber& seq =
501 0 : reader_info_.find(remote_id)->second.expected_sequence_;
502 :
503 0 : for (SendStateDataSampleList::iterator list_el = list.begin();
504 0 : list_el != list.end(); ++list_el) {
505 0 : list_el->get_header().historic_sample_ = true;
506 :
507 0 : if (list_el->get_header().sequence_ > seq) {
508 0 : seq = list_el->get_header().sequence_;
509 : }
510 : }
511 0 : }
512 :
513 0 : RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
514 0 : if (!publisher || publisher->is_suspended()) {
515 0 : this->available_data_list_.enqueue_tail(list);
516 :
517 : } else {
518 0 : if (DCPS_debug_level >= 4) {
519 0 : ACE_DEBUG((LM_INFO, "(%P|%t) Sending historic samples\n"));
520 : }
521 :
522 0 : const Encoding encoding(Encoding::KIND_UNALIGNED_CDR);
523 0 : size_t size = 0;
524 0 : serialized_size(encoding, size, remote_id);
525 : Message_Block_Ptr data(
526 : new ACE_Message_Block(size, ACE_Message_Block::MB_DATA, 0, 0, 0,
527 0 : get_db_lock()));
528 0 : Serializer ser(data.get(), encoding);
529 0 : ser << remote_id;
530 :
531 0 : DataSampleHeader header;
532 : Message_Block_Ptr end_historic_samples(
533 : create_control_message(
534 0 : END_HISTORIC_SAMPLES, header, move(data),
535 0 : SystemTimePoint::now().to_dds_time()));
536 :
537 0 : this->controlTracker.message_sent();
538 0 : guard.release();
539 0 : ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> rev_lock(lock_);
540 0 : ACE_Guard<ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> > rev_guard(rev_lock);
541 0 : SendControlStatus ret = send_w_control(list, header, move(end_historic_samples), remote_id);
542 0 : if (ret == SEND_CONTROL_ERROR) {
543 0 : ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
544 : ACE_TEXT("DataWriterImpl::association_complete_i: ")
545 : ACE_TEXT("send_w_control failed.\n")));
546 0 : this->controlTracker.message_dropped();
547 : }
548 0 : }
549 0 : }
550 0 : }
551 :
552 : void
553 0 : DataWriterImpl::remove_associations(const ReaderIdSeq & readers,
554 : CORBA::Boolean notify_lost)
555 : {
556 0 : if (readers.length() == 0) {
557 0 : return;
558 : }
559 :
560 0 : const Observer_rch observer = get_observer(Observer::e_DISASSOCIATED);
561 0 : if (observer) {
562 0 : for (CORBA::ULong i = 0; i < readers.length(); ++i) {
563 0 : observer->on_disassociated(this, readers[i]);
564 : }
565 : }
566 :
567 0 : if (DCPS_debug_level >= 1) {
568 0 : ACE_DEBUG((LM_DEBUG,
569 : ACE_TEXT("(%P|%t) DataWriterImpl::remove_associations: ")
570 : ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
571 : is_bit_,
572 : LogGuid(publication_id_).c_str(),
573 : LogGuid(readers[0]).c_str(),
574 : readers.length()));
575 : }
576 :
577 : // stop pending associations for these reader ids
578 0 : this->stop_associating(readers.get_buffer(), readers.length());
579 :
580 0 : ReaderIdSeq fully_associated_readers;
581 0 : CORBA::ULong fully_associated_len = 0;
582 0 : ReaderIdSeq rds;
583 0 : CORBA::ULong rds_len = 0;
584 0 : DDS::InstanceHandleSeq handles;
585 :
586 0 : ACE_GUARD(ACE_Thread_Mutex, wait_guard, sync_unreg_rem_assocs_lock_);
587 : {
588 : // Ensure the same acquisition order as in wait_for_acknowledgments().
589 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
590 : //Remove the readers from fully associated reader list.
591 : //If the supplied reader is not in the cached reader list then it is
592 : //already removed. We just need remove the readers in the list that have
593 : //not been removed.
594 :
595 0 : CORBA::ULong len = readers.length();
596 :
597 0 : for (CORBA::ULong i = 0; i < len; ++i) {
598 : //Remove the readers from fully associated reader list. If it's not
599 : //in there, the association_complete() is not called yet and remove it
600 : //from pending list.
601 :
602 0 : if (remove(readers_, readers[i]) == 0) {
603 0 : ++ fully_associated_len;
604 0 : fully_associated_readers.length(fully_associated_len);
605 0 : fully_associated_readers [fully_associated_len - 1] = readers[i];
606 :
607 0 : ++ rds_len;
608 0 : rds.length(rds_len);
609 0 : rds [rds_len - 1] = readers[i];
610 : }
611 :
612 0 : data_container_->remove_reader_acks(readers[i]);
613 :
614 0 : ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
615 0 : reader_info_.erase(readers[i]);
616 : //else reader is already removed which indicates remove_association()
617 : //is called multiple times.
618 0 : }
619 :
620 0 : if (fully_associated_len > 0 && !is_bit_) {
621 : // The reader should be in the id_to_handle map at this time
622 0 : this->lookup_instance_handles(fully_associated_readers, handles);
623 :
624 0 : for (CORBA::ULong i = 0; i < fully_associated_len; ++i) {
625 0 : id_to_handle_map_.erase(fully_associated_readers[i]);
626 : }
627 : }
628 :
629 : // Mirror the PUBLICATION_MATCHED_STATUS processing from
630 : // association_complete() here.
631 0 : if (!this->is_bit_) {
632 :
633 : // Derive the change in the number of subscriptions reading this writer.
634 : int matchedSubscriptions =
635 0 : static_cast<int>(this->id_to_handle_map_.size());
636 0 : this->publication_match_status_.current_count_change =
637 0 : matchedSubscriptions - this->publication_match_status_.current_count;
638 :
639 : // Only process status if the number of subscriptions has changed.
640 0 : if (this->publication_match_status_.current_count_change != 0) {
641 0 : this->publication_match_status_.current_count = matchedSubscriptions;
642 :
643 : /// Section 7.1.4.1: total_count will not decrement.
644 :
645 : /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
646 0 : this->publication_match_status_.last_subscription_handle =
647 0 : handles[fully_associated_len - 1];
648 :
649 0 : set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, true);
650 :
651 : DDS::DataWriterListener_var listener =
652 0 : this->listener_for(DDS::PUBLICATION_MATCHED_STATUS);
653 :
654 0 : if (!CORBA::is_nil(listener.in())) {
655 0 : listener->on_publication_matched(this, this->publication_match_status_);
656 :
657 : // Listener consumes the change.
658 0 : this->publication_match_status_.total_count_change = 0;
659 0 : this->publication_match_status_.current_count_change = 0;
660 : }
661 :
662 0 : this->notify_status_condition();
663 0 : }
664 : }
665 0 : }
666 :
667 0 : for (CORBA::ULong i = 0; i < rds.length(); ++i) {
668 0 : this->disassociate(rds[i]);
669 : }
670 :
671 : // If this remove_association is invoked when the InfoRepo
672 : // detects a lost reader then make a callback to notify
673 : // subscription lost.
674 0 : if (notify_lost && handles.length() > 0) {
675 0 : this->notify_publication_lost(handles);
676 : }
677 :
678 0 : const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
679 0 : for (unsigned int i = 0; i < handles.length(); ++i) {
680 0 : participant->return_handle(handles[i]);
681 : }
682 0 : }
683 :
684 0 : void DataWriterImpl::replay_durable_data_for(const GUID_t& remote_id)
685 : {
686 : DBG_ENTRY_LVL("DataWriterImpl", "replay_durable_data_for", 6);
687 :
688 0 : bool reader_durable = false;
689 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
690 0 : OPENDDS_STRING filterClassName;
691 0 : RcHandle<FilterEvaluator> eval;
692 0 : DDS::StringSeq expression_params;
693 : #endif
694 :
695 : {
696 0 : ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
697 0 : RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
698 :
699 0 : if (it != reader_info_.end()) {
700 0 : reader_durable = it->second.durable_;
701 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
702 0 : filterClassName = it->second.filter_class_name_;
703 0 : eval = it->second.eval_;
704 0 : expression_params = it->second.expression_params_;
705 : #endif
706 : }
707 0 : }
708 :
709 : // Support DURABILITY QoS
710 0 : if (reader_durable) {
711 : // Tell the WriteDataContainer to resend all sending/sent
712 : // samples.
713 0 : this->data_container_->reenqueue_all(remote_id, this->qos_.lifespan
714 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
715 0 : , filterClassName, eval.in(), expression_params
716 : #endif
717 : );
718 :
719 : // Acquire the data writer container lock to avoid deadlock. The
720 : // thread calling association_complete() has to acquire lock in the
721 : // same order as the write()/register() operation.
722 :
723 : // Since the thread calling association_complete() is the ORB
724 : // thread, it may have some performance penalty. If the
725 : // performance is an issue, we may need a new thread to handle the
726 : // data_available() calls.
727 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex,
728 : guard,
729 : this->get_lock());
730 :
731 0 : SendStateDataSampleList list = this->get_resend_data();
732 : {
733 0 : ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
734 : // Update the reader's expected sequence
735 : SequenceNumber& seq =
736 0 : reader_info_.find(remote_id)->second.expected_sequence_;
737 :
738 0 : for (SendStateDataSampleList::iterator list_el = list.begin();
739 0 : list_el != list.end(); ++list_el) {
740 0 : list_el->get_header().historic_sample_ = true;
741 :
742 0 : if (list_el->get_header().sequence_ > seq) {
743 0 : seq = list_el->get_header().sequence_;
744 : }
745 : }
746 0 : }
747 :
748 0 : RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
749 0 : if (!publisher || publisher->is_suspended()) {
750 0 : this->available_data_list_.enqueue_tail(list);
751 :
752 : } else {
753 0 : if (DCPS_debug_level >= 4) {
754 0 : ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) DataWriterImpl::replay_durable_data_for: Sending historic samples\n")));
755 : }
756 :
757 0 : const Encoding encoding(Encoding::KIND_UNALIGNED_CDR);
758 0 : size_t size = 0;
759 0 : serialized_size(encoding, size, remote_id);
760 : Message_Block_Ptr data(
761 : new ACE_Message_Block(size, ACE_Message_Block::MB_DATA, 0, 0, 0,
762 0 : get_db_lock()));
763 0 : Serializer ser(data.get(), encoding);
764 0 : ser << remote_id;
765 :
766 0 : DataSampleHeader header;
767 0 : Message_Block_Ptr end_historic_samples(create_control_message(END_HISTORIC_SAMPLES, header, move(data),
768 0 : SystemTimePoint::now().to_dds_time()));
769 :
770 0 : this->controlTracker.message_sent();
771 0 : guard.release();
772 0 : const SendControlStatus ret = send_w_control(list, header, move(end_historic_samples), remote_id);
773 0 : if (ret == SEND_CONTROL_ERROR) {
774 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
775 : ACE_TEXT("DataWriterImpl::replay_durable_data_for: ")
776 : ACE_TEXT("send_w_control failed.\n")));
777 0 : this->controlTracker.message_dropped();
778 : }
779 0 : }
780 0 : }
781 0 : }
782 :
783 0 : void DataWriterImpl::remove_all_associations()
784 : {
785 : DBG_ENTRY_LVL("DataWriterImpl", "remove_all_associations", 6);
786 : // stop pending associations
787 0 : this->stop_associating();
788 :
789 0 : ReaderIdSeq readers;
790 : CORBA::ULong size;
791 : {
792 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
793 :
794 0 : size = static_cast<CORBA::ULong>(readers_.size());
795 0 : readers.length(size);
796 :
797 0 : RepoIdSet::iterator itEnd = readers_.end();
798 0 : int i = 0;
799 :
800 0 : for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
801 0 : readers[i ++] = *it;
802 : }
803 0 : }
804 :
805 : try {
806 0 : if (0 < size) {
807 0 : CORBA::Boolean dont_notify_lost = false;
808 :
809 0 : this->remove_associations(readers, dont_notify_lost);
810 : }
811 :
812 0 : } catch (const CORBA::Exception&) {
813 0 : ACE_DEBUG((LM_WARNING,
814 : ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
815 : ACE_TEXT("caught exception from remove_associations.\n")));
816 0 : }
817 :
818 0 : transport_stop();
819 0 : }
820 :
821 : void
822 0 : DataWriterImpl::register_for_reader(const GUID_t& participant,
823 : const GUID_t& writerid,
824 : const GUID_t& readerid,
825 : const TransportLocatorSeq& locators,
826 : DiscoveryListener* listener)
827 : {
828 0 : TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
829 0 : }
830 :
831 : void
832 0 : DataWriterImpl::unregister_for_reader(const GUID_t& participant,
833 : const GUID_t& writerid,
834 : const GUID_t& readerid)
835 : {
836 0 : TransportClient::unregister_for_reader(participant, writerid, readerid);
837 0 : }
838 :
839 : void
840 0 : DataWriterImpl::update_locators(const GUID_t& readerId,
841 : const TransportLocatorSeq& locators)
842 : {
843 : {
844 0 : ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, reader_info_lock_);
845 0 : RepoIdToReaderInfoMap::const_iterator iter = reader_info_.find(readerId);
846 0 : if (iter == reader_info_.end()) {
847 0 : return;
848 : }
849 0 : }
850 0 : TransportClient::update_locators(readerId, locators);
851 : }
852 :
853 : void
854 0 : DataWriterImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
855 : {
856 : DDS::DataWriterListener_var listener =
857 0 : listener_for(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS);
858 :
859 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
860 :
861 : #if 0
862 :
863 : if (this->offered_incompatible_qos_status_.total_count == status.total_count) {
864 : // This test should make the method idempotent.
865 : return;
866 : }
867 :
868 : #endif
869 :
870 0 : set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, true);
871 :
872 : // copy status and increment change
873 0 : offered_incompatible_qos_status_.total_count = status.total_count;
874 0 : offered_incompatible_qos_status_.total_count_change +=
875 0 : status.count_since_last_send;
876 0 : offered_incompatible_qos_status_.last_policy_id = status.last_policy_id;
877 0 : offered_incompatible_qos_status_.policies = status.policies;
878 :
879 0 : if (!CORBA::is_nil(listener.in())) {
880 0 : listener->on_offered_incompatible_qos(this, offered_incompatible_qos_status_);
881 :
882 : // TBD - Why does the spec say to change this but not change the
883 : // ChangeFlagStatus after a listener call?
884 0 : offered_incompatible_qos_status_.total_count_change = 0;
885 : }
886 :
887 0 : notify_status_condition();
888 0 : }
889 :
890 : void
891 0 : DataWriterImpl::update_subscription_params(const GUID_t& readerId,
892 : const DDS::StringSeq& params)
893 : {
894 : #ifdef OPENDDS_NO_CONTENT_FILTERED_TOPIC
895 : ACE_UNUSED_ARG(readerId);
896 : ACE_UNUSED_ARG(params);
897 : #else
898 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
899 0 : ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
900 0 : RepoIdToReaderInfoMap::iterator iter = reader_info_.find(readerId);
901 :
902 0 : if (iter != reader_info_.end()) {
903 0 : iter->second.expression_params_ = params;
904 :
905 0 : } else if (DCPS_debug_level > 4 &&
906 0 : TheServiceParticipant->publisher_content_filter()) {
907 0 : ACE_DEBUG((LM_WARNING,
908 : ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::update_subscription_params()")
909 : ACE_TEXT(" - writer: %C has no info about reader: %C\n"),
910 : LogGuid(this->publication_id_).c_str(), LogGuid(readerId).c_str()));
911 : }
912 :
913 : #endif
914 0 : }
915 :
916 0 : DDS::ReturnCode_t DataWriterImpl::set_qos(const DDS::DataWriterQos& qos)
917 : {
918 : OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
919 : OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
920 : OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
921 : OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
922 : OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
923 :
924 0 : DDS::DataWriterQos new_qos = qos;
925 0 : new_qos.representation.value = qos_.representation.value;
926 0 : if (Qos_Helper::valid(new_qos) && Qos_Helper::consistent(new_qos)) {
927 0 : if (qos_ == new_qos)
928 0 : return DDS::RETCODE_OK;
929 :
930 0 : if (enabled_) {
931 0 : if (!Qos_Helper::changeable(qos_, new_qos)) {
932 0 : return DDS::RETCODE_IMMUTABLE_POLICY;
933 : }
934 :
935 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
936 0 : DDS::PublisherQos publisherQos;
937 0 : RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
938 :
939 0 : bool status = false;
940 0 : if (publisher) {
941 0 : publisher->get_qos(publisherQos);
942 : status
943 0 : = disco->update_publication_qos(domain_id_,
944 0 : dp_id_,
945 0 : this->publication_id_,
946 : new_qos,
947 : publisherQos);
948 : }
949 0 : if (!status) {
950 0 : ACE_ERROR_RETURN((LM_ERROR,
951 : ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
952 : ACE_TEXT("qos not updated.\n")),
953 : DDS::RETCODE_ERROR);
954 : }
955 :
956 0 : if (!(qos_ == new_qos)) {
957 0 : data_container_->set_deadline_period(TimeDuration(qos.deadline.period));
958 0 : qos_ = new_qos;
959 : }
960 0 : }
961 :
962 0 : qos_ = new_qos;
963 0 : passed_qos_ = qos;
964 :
965 0 : const Observer_rch observer = get_observer(Observer::e_QOS_CHANGED);
966 0 : if (observer) {
967 0 : observer->on_qos_changed(this);
968 : }
969 :
970 0 : return DDS::RETCODE_OK;
971 :
972 0 : } else {
973 0 : return DDS::RETCODE_INCONSISTENT_POLICY;
974 : }
975 0 : }
976 :
977 : DDS::ReturnCode_t
978 0 : DataWriterImpl::get_qos(DDS::DataWriterQos & qos)
979 : {
980 0 : qos = passed_qos_;
981 0 : return DDS::RETCODE_OK;
982 : }
983 :
984 : DDS::ReturnCode_t
985 0 : DataWriterImpl::set_listener(DDS::DataWriterListener_ptr a_listener,
986 : DDS::StatusMask mask)
987 : {
988 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
989 0 : listener_mask_ = mask;
990 : //note: OK to duplicate a nil object ref
991 0 : listener_ = DDS::DataWriterListener::_duplicate(a_listener);
992 0 : return DDS::RETCODE_OK;
993 0 : }
994 :
995 : DDS::DataWriterListener_ptr
996 0 : DataWriterImpl::get_listener()
997 : {
998 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
999 0 : return DDS::DataWriterListener::_duplicate(listener_.in());
1000 0 : }
1001 :
1002 : DataWriterListener_ptr
1003 0 : DataWriterImpl::get_ext_listener()
1004 : {
1005 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
1006 0 : return DataWriterListener::_narrow(listener_.in());
1007 0 : }
1008 :
1009 : DDS::Topic_ptr
1010 0 : DataWriterImpl::get_topic()
1011 : {
1012 0 : return DDS::Topic::_duplicate(topic_servant_.get());
1013 : }
1014 :
1015 : bool
1016 0 : DataWriterImpl::should_ack() const
1017 : {
1018 : // N.B. It may be worthwhile to investigate a more efficient
1019 : // heuristic for determining if a writer should send SAMPLE_ACK
1020 : // control samples. Perhaps based on a sequence number delta?
1021 0 : return this->readers_.size() != 0;
1022 : }
1023 :
1024 : DataWriterImpl::AckToken
1025 0 : DataWriterImpl::create_ack_token(DDS::Duration_t max_wait) const
1026 : {
1027 0 : const SequenceNumber sn = get_max_sn();
1028 0 : if (DCPS_debug_level > 0) {
1029 0 : ACE_DEBUG((LM_DEBUG,
1030 : ACE_TEXT("(%P|%t) DataWriterImpl::create_ack_token() - ")
1031 : ACE_TEXT("for sequence %q\n"),
1032 : sn.getValue()));
1033 : }
1034 0 : return AckToken(max_wait, sn);
1035 : }
1036 :
1037 :
1038 :
1039 : DDS::ReturnCode_t
1040 0 : DataWriterImpl::send_request_ack()
1041 : {
1042 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
1043 : guard,
1044 : get_lock(),
1045 : DDS::RETCODE_ERROR);
1046 :
1047 :
1048 0 : DataSampleElement* element = 0;
1049 0 : DDS::ReturnCode_t ret = this->data_container_->obtain_buffer_for_control(element);
1050 :
1051 0 : if (ret != DDS::RETCODE_OK) {
1052 0 : ACE_ERROR_RETURN((LM_ERROR,
1053 : ACE_TEXT("(%P|%t) ERROR: ")
1054 : ACE_TEXT("DataWriterImpl::send_request_ack: ")
1055 : ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
1056 : ret),
1057 : ret);
1058 : }
1059 :
1060 0 : Message_Block_Ptr blk;
1061 : // Add header with the registration sample data.
1062 : Message_Block_Ptr sample(
1063 : create_control_message(
1064 : REQUEST_ACK,
1065 : element->get_header(),
1066 0 : move(blk),
1067 0 : SystemTimePoint::now().to_dds_time()));
1068 :
1069 0 : element->set_sample(move(sample));
1070 :
1071 0 : ret = this->data_container_->enqueue_control(element);
1072 :
1073 0 : if (ret != DDS::RETCODE_OK) {
1074 0 : data_container_->release_buffer(element);
1075 0 : ACE_ERROR_RETURN((LM_ERROR,
1076 : ACE_TEXT("(%P|%t) ERROR: ")
1077 : ACE_TEXT("DataWriterImpl::send_request_ack: ")
1078 : ACE_TEXT("enqueue_control failed.\n")),
1079 : ret);
1080 : }
1081 :
1082 :
1083 0 : send_all_to_flush_control(guard);
1084 :
1085 0 : return DDS::RETCODE_OK;
1086 0 : }
1087 :
1088 : DDS::ReturnCode_t
1089 0 : DataWriterImpl::wait_for_acknowledgments(const DDS::Duration_t& max_wait)
1090 : {
1091 0 : if (this->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS)
1092 0 : return DDS::RETCODE_OK;
1093 :
1094 0 : DDS::ReturnCode_t ret = send_request_ack();
1095 :
1096 0 : if (ret != DDS::RETCODE_OK)
1097 0 : return ret;
1098 :
1099 0 : DataWriterImpl::AckToken token = create_ack_token(max_wait);
1100 0 : if (DCPS_debug_level) {
1101 0 : ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::wait_for_acknowledgments")
1102 : ACE_TEXT(" waiting for acknowledgment of sequence %q at %T\n"),
1103 : token.sequence_.getValue()));
1104 : }
1105 0 : return wait_for_specific_ack(token);
1106 0 : }
1107 :
1108 : DDS::ReturnCode_t
1109 0 : DataWriterImpl::wait_for_specific_ack(const AckToken& token)
1110 : {
1111 0 : return this->data_container_->wait_ack_of_seq(token.deadline(), token.deadline_is_infinite(), token.sequence_);
1112 : }
1113 :
1114 : DDS::Publisher_ptr
1115 0 : DataWriterImpl::get_publisher()
1116 : {
1117 0 : return publisher_servant_.lock()._retn();
1118 : }
1119 :
1120 : DDS::ReturnCode_t
1121 0 : DataWriterImpl::get_liveliness_lost_status(
1122 : DDS::LivelinessLostStatus & status)
1123 : {
1124 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
1125 : guard,
1126 : this->lock_,
1127 : DDS::RETCODE_ERROR);
1128 0 : set_status_changed_flag(DDS::LIVELINESS_LOST_STATUS, false);
1129 0 : status = liveliness_lost_status_;
1130 0 : liveliness_lost_status_.total_count_change = 0;
1131 0 : return DDS::RETCODE_OK;
1132 0 : }
1133 :
1134 : DDS::ReturnCode_t
1135 0 : DataWriterImpl::get_offered_deadline_missed_status(
1136 : DDS::OfferedDeadlineMissedStatus & status)
1137 : {
1138 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
1139 : guard,
1140 : this->lock_,
1141 : DDS::RETCODE_ERROR);
1142 :
1143 0 : set_status_changed_flag(DDS::OFFERED_DEADLINE_MISSED_STATUS, false);
1144 :
1145 0 : this->offered_deadline_missed_status_.total_count_change =
1146 0 : this->offered_deadline_missed_status_.total_count
1147 0 : - this->last_deadline_missed_total_count_;
1148 :
1149 : // Update for next status check.
1150 0 : this->last_deadline_missed_total_count_ =
1151 0 : this->offered_deadline_missed_status_.total_count;
1152 :
1153 0 : status = offered_deadline_missed_status_;
1154 :
1155 0 : this->offered_deadline_missed_status_.total_count_change = 0;
1156 :
1157 0 : return DDS::RETCODE_OK;
1158 0 : }
1159 :
1160 : DDS::ReturnCode_t
1161 0 : DataWriterImpl::get_offered_incompatible_qos_status(
1162 : DDS::OfferedIncompatibleQosStatus & status)
1163 : {
1164 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
1165 : guard,
1166 : this->lock_,
1167 : DDS::RETCODE_ERROR);
1168 0 : set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, false);
1169 0 : status = offered_incompatible_qos_status_;
1170 0 : offered_incompatible_qos_status_.total_count_change = 0;
1171 0 : return DDS::RETCODE_OK;
1172 0 : }
1173 :
1174 : DDS::ReturnCode_t
1175 0 : DataWriterImpl::get_publication_matched_status(
1176 : DDS::PublicationMatchedStatus & status)
1177 : {
1178 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
1179 : guard,
1180 : this->lock_,
1181 : DDS::RETCODE_ERROR);
1182 0 : set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, false);
1183 0 : status = publication_match_status_;
1184 0 : publication_match_status_.total_count_change = 0;
1185 0 : publication_match_status_.current_count_change = 0;
1186 0 : return DDS::RETCODE_OK;
1187 0 : }
1188 :
1189 : DDS::ReturnCode_t
1190 0 : DataWriterImpl::assert_liveliness()
1191 : {
1192 0 : switch (this->qos_.liveliness.kind) {
1193 0 : case DDS::AUTOMATIC_LIVELINESS_QOS:
1194 : // Do nothing.
1195 0 : break;
1196 0 : case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
1197 : {
1198 0 : RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
1199 0 : if (participant) {
1200 0 : return participant->assert_liveliness();
1201 : }
1202 0 : }
1203 0 : break;
1204 0 : case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
1205 0 : if (!send_liveliness(MonotonicTimePoint::now())) {
1206 0 : return DDS::RETCODE_ERROR;
1207 : }
1208 0 : break;
1209 : }
1210 :
1211 0 : return DDS::RETCODE_OK;
1212 : }
1213 :
1214 : DDS::ReturnCode_t
1215 0 : DataWriterImpl::assert_liveliness_by_participant()
1216 : {
1217 : // This operation is called by participant.
1218 0 : if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) {
1219 : // Set a flag indicating that we should send a liveliness message on the timer if necessary.
1220 0 : liveliness_asserted_ = true;
1221 : }
1222 :
1223 0 : return DDS::RETCODE_OK;
1224 : }
1225 :
1226 : TimeDuration
1227 0 : DataWriterImpl::liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
1228 : {
1229 0 : if (this->qos_.liveliness.kind == kind) {
1230 0 : return liveliness_check_interval_;
1231 : } else {
1232 0 : return TimeDuration::max_value;
1233 : }
1234 : }
1235 :
1236 : bool
1237 0 : DataWriterImpl::participant_liveliness_activity_after(const MonotonicTimePoint& tv)
1238 : {
1239 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_);
1240 0 : if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) {
1241 0 : return last_liveliness_activity_time_ > tv;
1242 : } else {
1243 0 : return false;
1244 : }
1245 0 : }
1246 :
1247 : DDS::ReturnCode_t
1248 0 : DataWriterImpl::get_matched_subscriptions(
1249 : DDS::InstanceHandleSeq & subscription_handles)
1250 : {
1251 0 : if (!enabled_) {
1252 0 : ACE_ERROR_RETURN((LM_ERROR,
1253 : ACE_TEXT("(%P|%t) ERROR: ")
1254 : ACE_TEXT("DataWriterImpl::get_matched_subscriptions: ")
1255 : ACE_TEXT(" Entity is not enabled.\n")),
1256 : DDS::RETCODE_NOT_ENABLED);
1257 : }
1258 :
1259 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
1260 : guard,
1261 : this->lock_,
1262 : DDS::RETCODE_ERROR);
1263 :
1264 : // Copy out the handles for the current set of subscriptions.
1265 0 : int index = 0;
1266 0 : subscription_handles.length(
1267 0 : static_cast<CORBA::ULong>(this->id_to_handle_map_.size()));
1268 :
1269 0 : for (RepoIdToHandleMap::iterator
1270 0 : current = this->id_to_handle_map_.begin();
1271 0 : current != this->id_to_handle_map_.end();
1272 0 : ++current, ++index) {
1273 0 : subscription_handles[index] = current->second;
1274 : }
1275 :
1276 0 : return DDS::RETCODE_OK;
1277 0 : }
1278 :
1279 : #if !defined (DDS_HAS_MINIMUM_BIT)
1280 : DDS::ReturnCode_t
1281 0 : DataWriterImpl::get_matched_subscription_data(
1282 : DDS::SubscriptionBuiltinTopicData & subscription_data,
1283 : DDS::InstanceHandle_t subscription_handle)
1284 : {
1285 0 : if (!enabled_) {
1286 0 : ACE_ERROR_RETURN((LM_ERROR,
1287 : ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::")
1288 : ACE_TEXT("get_matched_subscription_data: ")
1289 : ACE_TEXT("Entity is not enabled.\n")),
1290 : DDS::RETCODE_NOT_ENABLED);
1291 : }
1292 0 : RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
1293 :
1294 0 : DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
1295 0 : DDS::SubscriptionBuiltinTopicDataSeq data;
1296 :
1297 0 : if (participant) {
1298 0 : ret = instance_handle_to_bit_data<DDS::SubscriptionBuiltinTopicDataDataReader_var>(
1299 : participant.in(),
1300 : BUILT_IN_SUBSCRIPTION_TOPIC,
1301 : subscription_handle,
1302 : data);
1303 : }
1304 :
1305 0 : if (ret == DDS::RETCODE_OK) {
1306 0 : subscription_data = data[0];
1307 : }
1308 :
1309 0 : return ret;
1310 0 : }
1311 : #endif // !defined (DDS_HAS_MINIMUM_BIT)
1312 :
1313 : DDS::ReturnCode_t
1314 0 : DataWriterImpl::enable()
1315 : {
1316 : //According spec:
1317 : // - Calling enable on an already enabled Entity returns OK and has no
1318 : // effect.
1319 : // - Calling enable on an Entity whose factory is not enabled will fail
1320 : // and return PRECONDITION_NOT_MET.
1321 :
1322 0 : if (this->is_enabled()) {
1323 0 : return DDS::RETCODE_OK;
1324 : }
1325 :
1326 0 : RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
1327 0 : if (!publisher || !publisher->is_enabled()) {
1328 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
1329 : }
1330 :
1331 0 : if (!topic_servant_->is_enabled()) {
1332 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
1333 : }
1334 :
1335 0 : RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
1336 0 : if (participant) {
1337 0 : dp_id_ = participant->get_id();
1338 : }
1339 :
1340 : // Note: do configuration based on QoS in enable() because
1341 : // before enable is called the QoS can be changed -- even
1342 : // for Changeable=NO
1343 :
1344 : // Configure WriteDataContainer constructor parameters from qos.
1345 :
1346 0 : const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
1347 :
1348 0 : CORBA::Long const max_samples_per_instance =
1349 0 : (qos_.resource_limits.max_samples_per_instance == DDS::LENGTH_UNLIMITED)
1350 0 : ? 0x7fffffff : qos_.resource_limits.max_samples_per_instance;
1351 :
1352 0 : CORBA::Long max_instances = 0, max_total_samples = 0;
1353 :
1354 0 : if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
1355 0 : n_chunks_ = qos_.resource_limits.max_samples;
1356 :
1357 0 : if (qos_.resource_limits.max_instances == DDS::LENGTH_UNLIMITED ||
1358 0 : (qos_.resource_limits.max_samples < qos_.resource_limits.max_instances)
1359 0 : || (qos_.resource_limits.max_samples <
1360 0 : (qos_.resource_limits.max_instances * max_samples_per_instance))) {
1361 0 : max_total_samples = reliable ? qos_.resource_limits.max_samples : 0;
1362 : }
1363 : }
1364 :
1365 0 : if (reliable && qos_.resource_limits.max_instances != DDS::LENGTH_UNLIMITED)
1366 0 : max_instances = qos_.resource_limits.max_instances;
1367 :
1368 0 : const CORBA::Long history_depth =
1369 0 : (qos_.history.kind == DDS::KEEP_ALL_HISTORY_QOS ||
1370 0 : qos_.history.depth == DDS::LENGTH_UNLIMITED) ? 0x7fffffff : qos_.history.depth;
1371 :
1372 0 : const CORBA::Long max_durable_per_instance =
1373 0 : qos_.durability.kind == DDS::VOLATILE_DURABILITY_QOS ? 0 : history_depth;
1374 :
1375 : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
1376 : // Get data durability cache if DataWriter QoS requires durable
1377 : // samples. Publisher servant retains ownership of the cache.
1378 : DataDurabilityCache* const durability_cache =
1379 0 : TheServiceParticipant->get_data_durability_cache(qos_.durability);
1380 : #endif
1381 :
1382 : //Note: the QoS used to set n_chunks_ is Changeable=No so
1383 : // it is OK that we cannot change the size of our allocators.
1384 0 : data_container_ = RcHandle<WriteDataContainer>(
1385 : new WriteDataContainer(
1386 : this,
1387 : max_samples_per_instance,
1388 : history_depth,
1389 : max_durable_per_instance,
1390 0 : qos_.reliability.max_blocking_time,
1391 : n_chunks_,
1392 : domain_id_,
1393 : topic_name_,
1394 0 : get_type_name(),
1395 : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
1396 : durability_cache,
1397 0 : qos_.durability_service,
1398 : #endif
1399 : max_instances,
1400 : max_total_samples,
1401 0 : lock_,
1402 0 : offered_deadline_missed_status_,
1403 0 : last_deadline_missed_total_count_),
1404 0 : keep_count());
1405 :
1406 : // +1 because we might allocate one before releasing another
1407 : // TBD - see if this +1 can be removed.
1408 0 : mb_allocator_.reset(new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_));
1409 0 : db_allocator_.reset(new DataBlockAllocator(n_chunks_+1));
1410 0 : header_allocator_.reset(new DataSampleHeaderAllocator(n_chunks_+1));
1411 :
1412 0 : if (DCPS_debug_level >= 2) {
1413 0 : ACE_DEBUG((LM_DEBUG,
1414 : "(%P|%t) DataWriterImpl::enable-mb"
1415 : " Cached_Allocator_With_Overflow %x with %B chunks\n",
1416 : mb_allocator_.get(),
1417 : n_chunks_));
1418 :
1419 0 : ACE_DEBUG((LM_DEBUG,
1420 : "(%P|%t) DataWriterImpl::enable-db"
1421 : " Cached_Allocator_With_Overflow %x with %B chunks\n",
1422 : db_allocator_.get(),
1423 : n_chunks_));
1424 :
1425 0 : ACE_DEBUG((LM_DEBUG,
1426 : "(%P|%t) DataWriterImpl::enable-header"
1427 : " Cached_Allocator_With_Overflow %x with %B chunks\n",
1428 : header_allocator_.get(),
1429 : n_chunks_));
1430 : }
1431 :
1432 0 : if (qos_.liveliness.lease_duration.sec != DDS::DURATION_INFINITE_SEC &&
1433 0 : qos_.liveliness.lease_duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
1434 : // Must be at least 1 micro second.
1435 : liveliness_check_interval_ = std::max(
1436 0 : TimeDuration(qos_.liveliness.lease_duration) * (TheServiceParticipant->liveliness_factor() / 100.0),
1437 0 : TimeDuration(0, 1));
1438 :
1439 0 : if (reactor_->schedule_timer(liveness_timer_.in(),
1440 : 0,
1441 : liveliness_check_interval_.value(),
1442 0 : liveliness_check_interval_.value()) == -1) {
1443 0 : ACE_ERROR((LM_ERROR,
1444 : ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: %p.\n"),
1445 : ACE_TEXT("schedule_timer")));
1446 :
1447 : }
1448 : }
1449 :
1450 0 : if (!participant) {
1451 0 : return DDS::RETCODE_ERROR;
1452 : }
1453 :
1454 0 : participant->add_adjust_liveliness_timers(this);
1455 :
1456 0 : data_container_->set_deadline_period(TimeDuration(qos_.deadline.period));
1457 :
1458 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
1459 0 : disco->pre_writer(this);
1460 :
1461 0 : this->set_enabled();
1462 :
1463 : try {
1464 0 : this->enable_transport(reliable,
1465 0 : this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
1466 :
1467 0 : } catch (const Transport::Exception&) {
1468 0 : ACE_ERROR((LM_ERROR,
1469 : ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable, ")
1470 : ACE_TEXT("Transport Exception.\n")));
1471 0 : data_container_->shutdown_ = true;
1472 0 : return DDS::RETCODE_ERROR;
1473 0 : }
1474 :
1475 : // Must be done after transport enabled.
1476 0 : set_writer_effective_data_rep_qos(qos_.representation.value, cdr_encapsulation());
1477 0 : if (!topic_servant_->check_data_representation(qos_.representation.value, true)) {
1478 0 : data_container_->shutdown_ = true;
1479 0 : return DDS::RETCODE_ERROR;
1480 : }
1481 :
1482 : // Done after enable_transport so we know its swap_bytes.
1483 0 : const DDS::ReturnCode_t setup_serialization_result = setup_serialization();
1484 0 : if (setup_serialization_result != DDS::RETCODE_OK) {
1485 0 : data_container_->shutdown_ = true;
1486 0 : return setup_serialization_result;
1487 : }
1488 :
1489 0 : const TransportLocatorSeq& trans_conf_info = connection_info();
1490 0 : DDS::PublisherQos pub_qos;
1491 0 : publisher->get_qos(pub_qos);
1492 :
1493 0 : XTypes::TypeInformation type_info;
1494 0 : type_support_->to_type_info(type_info);
1495 :
1496 0 : XTypes::TypeLookupService_rch type_lookup_service = participant->get_type_lookup_service();
1497 0 : type_support_->add_types(type_lookup_service);
1498 :
1499 : const GUID_t publication_id =
1500 0 : disco->add_publication(this->domain_id_,
1501 0 : this->dp_id_,
1502 0 : this->topic_servant_->get_id(),
1503 0 : rchandle_from(this),
1504 0 : this->qos_,
1505 : trans_conf_info,
1506 : pub_qos,
1507 : type_info);
1508 :
1509 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_);
1510 0 : publication_id_ = publication_id;
1511 :
1512 0 : if (publication_id_ == GUID_UNKNOWN) {
1513 0 : if (DCPS_debug_level >= 1) {
1514 0 : ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: DataWriterImpl::enable: "
1515 : "add_publication failed\n"));
1516 : }
1517 0 : data_container_->shutdown_ = true;
1518 0 : return DDS::RETCODE_ERROR;
1519 : }
1520 :
1521 : #if defined(OPENDDS_SECURITY)
1522 0 : security_config_ = participant->get_security_config();
1523 0 : participant_permissions_handle_ = participant->permissions_handle();
1524 0 : dynamic_type_ = type_support_->get_type();
1525 : #endif
1526 :
1527 0 : if (DCPS_debug_level >= 2) {
1528 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DataWriterImpl::enable: "
1529 : "got GUID %C, publishing to topic name \"%C\" type \"%C\"\n",
1530 : LogGuid(publication_id_).c_str(),
1531 : topic_servant_->topic_name(), topic_servant_->type_name()));
1532 : }
1533 :
1534 0 : this->data_container_->publication_id_ = this->publication_id_;
1535 :
1536 0 : guard.release();
1537 :
1538 : const DDS::ReturnCode_t writer_enabled_result =
1539 0 : publisher->writer_enabled(topic_name_.in(), this);
1540 :
1541 0 : if (this->monitor_) {
1542 0 : this->monitor_->report();
1543 : }
1544 :
1545 : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
1546 :
1547 : // Move cached data from the durability cache to the unsent data
1548 : // queue.
1549 0 : if (durability_cache != 0) {
1550 :
1551 0 : if (!durability_cache->get_data(this->domain_id_,
1552 : this->topic_name_,
1553 : get_type_name(),
1554 : this,
1555 0 : this->mb_allocator_.get(),
1556 0 : this->db_allocator_.get(),
1557 0 : this->qos_.lifespan)) {
1558 0 : ACE_ERROR((LM_ERROR,
1559 : ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: ")
1560 : ACE_TEXT("unable to retrieve durable data\n")));
1561 : }
1562 : }
1563 :
1564 : #endif
1565 :
1566 0 : if (writer_enabled_result == DDS::RETCODE_OK) {
1567 0 : const Observer_rch observer = get_observer(Observer::e_ENABLED);
1568 0 : if (observer) {
1569 0 : observer->on_enabled(this);
1570 : }
1571 0 : }
1572 :
1573 0 : return writer_enabled_result;
1574 0 : }
1575 :
1576 : void
1577 0 : DataWriterImpl::send_all_to_flush_control(ACE_Guard<ACE_Recursive_Thread_Mutex>& guard)
1578 : {
1579 : DBG_ENTRY_LVL("DataWriterImpl","send_all_to_flush_control",6);
1580 :
1581 0 : SendStateDataSampleList list;
1582 :
1583 0 : ACE_UINT64 transaction_id = this->get_unsent_data(list);
1584 :
1585 0 : controlTracker.message_sent();
1586 :
1587 : //need to release guard to call down to transport
1588 0 : guard.release();
1589 :
1590 0 : this->send(list, transaction_id);
1591 0 : }
1592 :
1593 : DDS::ReturnCode_t
1594 0 : DataWriterImpl::register_instance_i(DDS::InstanceHandle_t& handle,
1595 : Message_Block_Ptr data,
1596 : const DDS::Time_t& source_timestamp)
1597 : {
1598 : DBG_ENTRY_LVL("DataWriterImpl","register_instance_i",6);
1599 :
1600 0 : if (!enabled_) {
1601 0 : ACE_ERROR_RETURN((LM_ERROR,
1602 : ACE_TEXT("(%P|%t) ERROR: ")
1603 : ACE_TEXT("DataWriterImpl::register_instance_i: ")
1604 : ACE_TEXT("Entity is not enabled.\n")),
1605 : DDS::RETCODE_NOT_ENABLED);
1606 : }
1607 :
1608 0 : DDS::ReturnCode_t ret = data_container_->register_instance(handle, data);
1609 0 : if (ret != DDS::RETCODE_OK) {
1610 0 : ACE_ERROR_RETURN((LM_ERROR,
1611 : ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_i: ")
1612 : ACE_TEXT("register instance with container failed, returned <%C>.\n"),
1613 : retcode_to_string(ret)),
1614 : ret);
1615 : }
1616 :
1617 0 : if (this->monitor_) {
1618 0 : this->monitor_->report();
1619 : }
1620 :
1621 0 : DataSampleElement* element = 0;
1622 0 : ret = this->data_container_->obtain_buffer_for_control(element);
1623 0 : if (ret != DDS::RETCODE_OK) {
1624 0 : ACE_ERROR_RETURN((LM_ERROR,
1625 : ACE_TEXT("(%P|%t) ERROR: ")
1626 : ACE_TEXT("DataWriterImpl::register_instance_i: ")
1627 : ACE_TEXT("obtain_buffer_for_control failed, returned <%C>.\n"),
1628 : retcode_to_string(ret)),
1629 : ret);
1630 : }
1631 :
1632 : // Add header with the registration sample data.
1633 : Message_Block_Ptr sample(
1634 : create_control_message(
1635 : INSTANCE_REGISTRATION,
1636 : element->get_header(),
1637 0 : move(data),
1638 0 : source_timestamp));
1639 :
1640 0 : element->set_sample(move(sample));
1641 :
1642 0 : ret = this->data_container_->enqueue_control(element);
1643 :
1644 0 : if (ret != DDS::RETCODE_OK) {
1645 0 : data_container_->release_buffer(element);
1646 0 : ACE_ERROR_RETURN((LM_ERROR,
1647 : ACE_TEXT("(%P|%t) ERROR: ")
1648 : ACE_TEXT("DataWriterImpl::register_instance_i: ")
1649 : ACE_TEXT("enqueue_control failed, returned <%C>\n"),
1650 : retcode_to_string(ret)),
1651 : ret);
1652 : }
1653 :
1654 0 : return ret;
1655 0 : }
1656 :
1657 : DDS::ReturnCode_t
1658 0 : DataWriterImpl::register_instance_from_durable_data(
1659 : DDS::InstanceHandle_t& handle,
1660 : Message_Block_Ptr data,
1661 : const DDS::Time_t& source_timestamp)
1662 : {
1663 : DBG_ENTRY_LVL("DataWriterImpl","register_instance_from_durable_data",6);
1664 :
1665 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
1666 : guard,
1667 : get_lock(),
1668 : DDS::RETCODE_ERROR);
1669 :
1670 0 : const DDS::ReturnCode_t ret = register_instance_i(handle, move(data), source_timestamp);
1671 0 : if (ret != DDS::RETCODE_OK) {
1672 0 : ACE_ERROR_RETURN((LM_ERROR,
1673 : ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_from_durable_data: ")
1674 : ACE_TEXT("register instance with container failed, returned <%C>.\n"),
1675 : retcode_to_string(ret)),
1676 : ret);
1677 : }
1678 :
1679 0 : send_all_to_flush_control(guard);
1680 :
1681 0 : return ret;
1682 0 : }
1683 :
1684 : DDS::ReturnCode_t
1685 0 : DataWriterImpl::unregister_instance_i(DDS::InstanceHandle_t handle,
1686 : const DDS::Time_t& source_timestamp)
1687 : {
1688 : DBG_ENTRY_LVL("DataWriterImpl","unregister_instance_i",6);
1689 :
1690 0 : if (!enabled_) {
1691 0 : ACE_ERROR_RETURN((LM_ERROR,
1692 : ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::unregister_instance_i: ")
1693 : ACE_TEXT("Entity is not enabled.\n")),
1694 : DDS::RETCODE_NOT_ENABLED);
1695 : }
1696 :
1697 : // According to spec 1.2, autodispose_unregistered_instances true causes
1698 : // dispose on the instance prior to calling unregister operation.
1699 0 : if (this->qos_.writer_data_lifecycle.autodispose_unregistered_instances) {
1700 0 : return this->dispose_and_unregister(handle, source_timestamp);
1701 : }
1702 :
1703 0 : DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
1704 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
1705 0 : Message_Block_Ptr unregistered_sample_data;
1706 0 : ret = this->data_container_->unregister(handle, unregistered_sample_data);
1707 :
1708 0 : if (ret != DDS::RETCODE_OK) {
1709 0 : ACE_ERROR_RETURN((LM_ERROR,
1710 : ACE_TEXT("(%P|%t) ERROR: ")
1711 : ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
1712 : ACE_TEXT("unregister with container failed.\n")),
1713 : ret);
1714 : }
1715 :
1716 0 : DataSampleElement* element = 0;
1717 0 : ret = this->data_container_->obtain_buffer_for_control(element);
1718 :
1719 0 : if (ret != DDS::RETCODE_OK) {
1720 0 : ACE_ERROR_RETURN((LM_ERROR,
1721 : ACE_TEXT("(%P|%t) ERROR: ")
1722 : ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
1723 : ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
1724 : ret),
1725 : ret);
1726 : }
1727 :
1728 : Message_Block_Ptr sample(create_control_message(UNREGISTER_INSTANCE,
1729 : element->get_header(),
1730 0 : move(unregistered_sample_data),
1731 0 : source_timestamp));
1732 0 : element->set_sample(move(sample));
1733 :
1734 0 : ret = this->data_container_->enqueue_control(element);
1735 :
1736 0 : if (ret != DDS::RETCODE_OK) {
1737 0 : data_container_->release_buffer(element);
1738 0 : ACE_ERROR_RETURN((LM_ERROR,
1739 : ACE_TEXT("(%P|%t) ERROR: ")
1740 : ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
1741 : ACE_TEXT("enqueue_control failed.\n")),
1742 : ret);
1743 : }
1744 :
1745 0 : send_all_to_flush_control(guard);
1746 0 : return DDS::RETCODE_OK;
1747 0 : }
1748 :
1749 : DDS::ReturnCode_t
1750 0 : DataWriterImpl::dispose_and_unregister(DDS::InstanceHandle_t handle,
1751 : const DDS::Time_t& source_timestamp)
1752 : {
1753 : DBG_ENTRY_LVL("DataWriterImpl", "dispose_and_unregister", 6);
1754 :
1755 0 : DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
1756 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
1757 :
1758 0 : Message_Block_Ptr data_sample;
1759 0 : ret = this->data_container_->dispose(handle, data_sample);
1760 :
1761 0 : if (ret != DDS::RETCODE_OK) {
1762 0 : ACE_ERROR_RETURN((LM_ERROR,
1763 : ACE_TEXT("(%P|%t) ERROR: ")
1764 : ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
1765 : ACE_TEXT("dispose on container failed.\n")),
1766 : ret);
1767 : }
1768 :
1769 0 : ret = this->data_container_->unregister(handle, data_sample, false);
1770 :
1771 0 : if (ret != DDS::RETCODE_OK) {
1772 0 : ACE_ERROR_RETURN((LM_ERROR,
1773 : ACE_TEXT("(%P|%t) ERROR: ")
1774 : ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
1775 : ACE_TEXT("unregister with container failed.\n")),
1776 : ret);
1777 : }
1778 :
1779 0 : DataSampleElement* element = 0;
1780 0 : ret = this->data_container_->obtain_buffer_for_control(element);
1781 :
1782 0 : if (ret != DDS::RETCODE_OK) {
1783 0 : ACE_ERROR_RETURN((LM_ERROR,
1784 : ACE_TEXT("(%P|%t) ERROR: ")
1785 : ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
1786 : ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
1787 : ret),
1788 : ret);
1789 : }
1790 :
1791 : Message_Block_Ptr sample(create_control_message(DISPOSE_UNREGISTER_INSTANCE,
1792 : element->get_header(),
1793 0 : move(data_sample),
1794 0 : source_timestamp));
1795 0 : element->set_sample(move(sample));
1796 :
1797 0 : ret = this->data_container_->enqueue_control(element);
1798 :
1799 0 : if (ret != DDS::RETCODE_OK) {
1800 0 : data_container_->release_buffer(element);
1801 0 : ACE_ERROR_RETURN((LM_ERROR,
1802 : ACE_TEXT("(%P|%t) ERROR: ")
1803 : ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
1804 : ACE_TEXT("enqueue_control failed.\n")),
1805 : ret);
1806 : }
1807 :
1808 0 : send_all_to_flush_control(guard);
1809 0 : return DDS::RETCODE_OK;
1810 0 : }
1811 :
1812 : void
1813 0 : DataWriterImpl::unregister_instances(const DDS::Time_t& source_timestamp)
1814 : {
1815 0 : ACE_GUARD(ACE_Thread_Mutex, guard, sync_unreg_rem_assocs_lock_);
1816 :
1817 0 : while (!this->data_container_->instances_.empty()) {
1818 0 : this->unregister_instance_i(this->data_container_->instances_.begin()->first, source_timestamp);
1819 : }
1820 0 : }
1821 :
1822 : DDS::ReturnCode_t
1823 0 : DataWriterImpl::write(Message_Block_Ptr data,
1824 : DDS::InstanceHandle_t handle,
1825 : const DDS::Time_t& source_timestamp,
1826 : GUIDSeq* filter_out,
1827 : const void* real_data)
1828 : {
1829 : DBG_ENTRY_LVL("DataWriterImpl","write",6);
1830 :
1831 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_);
1832 :
1833 : // take ownership of sequence allocated in FooDWImpl::write_w_timestamp()
1834 0 : GUIDSeq_var filter_out_var(filter_out);
1835 :
1836 0 : if (!enabled_) {
1837 0 : ACE_ERROR_RETURN((LM_ERROR,
1838 : ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::write: ")
1839 : ACE_TEXT("Entity is not enabled.\n")),
1840 : DDS::RETCODE_NOT_ENABLED);
1841 : }
1842 :
1843 0 : ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
1844 : dc_guard,
1845 : get_lock(),
1846 : DDS::RETCODE_ERROR);
1847 :
1848 0 : DataSampleElement* element = 0;
1849 0 : DDS::ReturnCode_t ret = this->data_container_->obtain_buffer(element, handle);
1850 :
1851 0 : if (ret == DDS::RETCODE_TIMEOUT) {
1852 0 : return ret; // silent for timeout
1853 :
1854 0 : } else if (ret != DDS::RETCODE_OK) {
1855 0 : ACE_ERROR_RETURN((LM_ERROR,
1856 : ACE_TEXT("(%P|%t) ERROR: ")
1857 : ACE_TEXT("DataWriterImpl::write: ")
1858 : ACE_TEXT("obtain_buffer returned %d.\n"),
1859 : ret),
1860 : ret);
1861 : }
1862 :
1863 0 : Message_Block_Ptr temp;
1864 0 : ret = create_sample_data_message(move(data),
1865 : handle,
1866 : element->get_header(),
1867 : temp,
1868 : source_timestamp,
1869 : (filter_out != 0));
1870 0 : element->set_sample(move(temp));
1871 :
1872 0 : if (ret != DDS::RETCODE_OK) {
1873 0 : data_container_->release_buffer(element);
1874 0 : return ret;
1875 : }
1876 :
1877 0 : element->set_filter_out(filter_out_var._retn()); // ownership passed to element
1878 :
1879 0 : ret = this->data_container_->enqueue(element, handle);
1880 :
1881 0 : if (ret != DDS::RETCODE_OK) {
1882 0 : data_container_->release_buffer(element);
1883 0 : ACE_ERROR_RETURN((LM_ERROR,
1884 : ACE_TEXT("(%P|%t) ERROR: ")
1885 : ACE_TEXT("DataWriterImpl::write: ")
1886 : ACE_TEXT("enqueue failed.\n")),
1887 : ret);
1888 : }
1889 0 : last_liveliness_activity_time_.set_to_now();
1890 :
1891 0 : track_sequence_number(filter_out);
1892 :
1893 0 : if (this->coherent_) {
1894 0 : ++this->coherent_samples_;
1895 : }
1896 0 : SendStateDataSampleList list;
1897 :
1898 0 : ACE_UINT64 transaction_id = this->get_unsent_data(list);
1899 :
1900 0 : RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
1901 0 : if (!publisher || publisher->is_suspended()) {
1902 0 : if (min_suspended_transaction_id_ == 0) {
1903 : //provides transaction id for lower bound of suspended transactions
1904 : //or transaction id for single suspended write transaction
1905 0 : min_suspended_transaction_id_ = transaction_id;
1906 : } else {
1907 : //when multiple write transactions have suspended, provides the upper bound
1908 : //for suspended transactions.
1909 0 : max_suspended_transaction_id_ = transaction_id;
1910 : }
1911 0 : this->available_data_list_.enqueue_tail(list);
1912 :
1913 : } else {
1914 0 : dc_guard.release();
1915 0 : guard.release();
1916 0 : this->send(list, transaction_id);
1917 : }
1918 :
1919 0 : const ValueDispatcher* vd = get_value_dispatcher();
1920 0 : const Observer_rch observer = get_observer(Observer::e_SAMPLE_SENT);
1921 0 : if (observer && real_data && vd) {
1922 0 : Observer::Sample s(handle, element->get_header().instance_state(), source_timestamp, element->get_header().sequence_, real_data, *vd);
1923 0 : observer->on_sample_sent(this, s);
1924 : }
1925 :
1926 0 : return DDS::RETCODE_OK;
1927 0 : }
1928 :
1929 : void
1930 0 : DataWriterImpl::track_sequence_number(GUIDSeq* filter_out)
1931 : {
1932 0 : const SequenceNumber sn = get_max_sn();
1933 0 : ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
1934 :
1935 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
1936 : // Track individual expected sequence numbers in ReaderInfo
1937 0 : RepoIdSet excluded;
1938 :
1939 0 : if (filter_out && !reader_info_.empty()) {
1940 0 : const GUID_t* buf = filter_out->get_buffer();
1941 0 : excluded.insert(buf, buf + filter_out->length());
1942 : }
1943 :
1944 0 : for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
1945 0 : end = reader_info_.end(); iter != end; ++iter) {
1946 : // If not excluding this reader, update expected sequence
1947 0 : if (excluded.count(iter->first) == 0) {
1948 0 : iter->second.expected_sequence_ = sn;
1949 : }
1950 : }
1951 :
1952 : #else
1953 : ACE_UNUSED_ARG(filter_out);
1954 : for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
1955 : end = reader_info_.end(); iter != end; ++iter) {
1956 : iter->second.expected_sequence_ = sn;
1957 : }
1958 :
1959 : #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
1960 :
1961 0 : }
1962 :
1963 : void
1964 0 : DataWriterImpl::send_suspended_data()
1965 : {
1966 : //this serves to get TransportClient's max_transaction_id_seen_
1967 : //to the correct value for this list of transactions
1968 0 : if (max_suspended_transaction_id_ != 0) {
1969 0 : this->send(this->available_data_list_, max_suspended_transaction_id_);
1970 0 : max_suspended_transaction_id_ = 0;
1971 : }
1972 :
1973 : //this serves to actually have the send proceed in
1974 : //sending the samples to the datalinks by passing it
1975 : //the min_suspended_transaction_id_ which should be the
1976 : //TransportClient's expected_transaction_id_
1977 0 : this->send(this->available_data_list_, min_suspended_transaction_id_);
1978 0 : min_suspended_transaction_id_ = 0;
1979 0 : this->available_data_list_.reset();
1980 0 : }
1981 :
1982 : DDS::ReturnCode_t
1983 0 : DataWriterImpl::dispose(DDS::InstanceHandle_t handle,
1984 : const DDS::Time_t & source_timestamp)
1985 : {
1986 : DBG_ENTRY_LVL("DataWriterImpl","dispose",6);
1987 :
1988 0 : if (!enabled_) {
1989 0 : ACE_ERROR_RETURN((LM_ERROR,
1990 : ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::dispose: ")
1991 : ACE_TEXT("Entity is not enabled.\n")),
1992 : DDS::RETCODE_NOT_ENABLED);
1993 : }
1994 :
1995 0 : DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
1996 :
1997 0 : ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
1998 :
1999 0 : Message_Block_Ptr registered_sample_data;
2000 0 : ret = this->data_container_->dispose(handle, registered_sample_data);
2001 :
2002 0 : if (ret != DDS::RETCODE_OK) {
2003 0 : ACE_ERROR_RETURN((LM_ERROR,
2004 : ACE_TEXT("(%P|%t) ERROR: ")
2005 : ACE_TEXT("DataWriterImpl::dispose: ")
2006 : ACE_TEXT("dispose failed.\n")),
2007 : ret);
2008 : }
2009 :
2010 0 : DataSampleElement* element = 0;
2011 0 : ret = this->data_container_->obtain_buffer_for_control(element);
2012 :
2013 0 : if (ret != DDS::RETCODE_OK) {
2014 0 : ACE_ERROR_RETURN((LM_ERROR,
2015 : ACE_TEXT("(%P|%t) ERROR: ")
2016 : ACE_TEXT("DataWriterImpl::dispose: ")
2017 : ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
2018 : ret),
2019 : ret);
2020 : }
2021 :
2022 : Message_Block_Ptr sample(create_control_message(DISPOSE_INSTANCE,
2023 : element->get_header(),
2024 0 : move(registered_sample_data),
2025 0 : source_timestamp));
2026 0 : element->set_sample(move(sample));
2027 :
2028 0 : ret = this->data_container_->enqueue_control(element);
2029 :
2030 0 : if (ret != DDS::RETCODE_OK) {
2031 0 : data_container_->release_buffer(element);
2032 0 : ACE_ERROR_RETURN((LM_ERROR,
2033 : ACE_TEXT("(%P|%t) ERROR: ")
2034 : ACE_TEXT("DataWriterImpl::dispose: ")
2035 : ACE_TEXT("enqueue_control failed.\n")),
2036 : ret);
2037 : }
2038 :
2039 0 : send_all_to_flush_control(guard);
2040 :
2041 0 : return DDS::RETCODE_OK;
2042 0 : }
2043 :
2044 : DDS::ReturnCode_t
2045 0 : DataWriterImpl::num_samples(DDS::InstanceHandle_t handle,
2046 : size_t& size)
2047 : {
2048 0 : return data_container_->num_samples(handle, size);
2049 : }
2050 :
2051 : void
2052 0 : DataWriterImpl::unregister_all()
2053 : {
2054 0 : data_container_->unregister_all();
2055 0 : }
2056 :
2057 : GUID_t
2058 0 : DataWriterImpl::get_dp_id()
2059 : {
2060 0 : return dp_id_;
2061 : }
2062 :
2063 : char const *
2064 0 : DataWriterImpl::get_type_name() const
2065 : {
2066 0 : return type_name_.in();
2067 : }
2068 :
2069 : ACE_Message_Block*
2070 0 : DataWriterImpl::create_control_message(MessageId message_id,
2071 : DataSampleHeader& header_data,
2072 : Message_Block_Ptr data,
2073 : const DDS::Time_t& source_timestamp)
2074 : {
2075 0 : header_data.message_id_ = message_id;
2076 0 : header_data.byte_order_ =
2077 0 : this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER;
2078 0 : header_data.coherent_change_ = false;
2079 :
2080 0 : if (data) {
2081 0 : header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
2082 : }
2083 :
2084 0 : header_data.sequence_ = SequenceNumber::SEQUENCENUMBER_UNKNOWN();
2085 0 : header_data.sequence_repair_ = false; // set below
2086 0 : header_data.source_timestamp_sec_ = source_timestamp.sec;
2087 0 : header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
2088 0 : header_data.publication_id_ = publication_id_;
2089 :
2090 0 : RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
2091 0 : if (!publisher) {
2092 0 : return 0;
2093 : }
2094 :
2095 0 : header_data.publisher_id_ = publisher->publisher_id_;
2096 :
2097 0 : ACE_Guard<ACE_Thread_Mutex> guard(sn_lock_);
2098 0 : SequenceNumber sequence = sequence_number_;
2099 0 : if (message_id == INSTANCE_REGISTRATION
2100 0 : || message_id == DISPOSE_INSTANCE
2101 0 : || message_id == UNREGISTER_INSTANCE
2102 0 : || message_id == DISPOSE_UNREGISTER_INSTANCE
2103 0 : || message_id == REQUEST_ACK) {
2104 :
2105 0 : header_data.sequence_repair_ = need_sequence_repair();
2106 0 : header_data.sequence_ = get_next_sn_i();
2107 0 : header_data.key_fields_only_ = true;
2108 0 : sequence = sequence_number_;
2109 : }
2110 0 : guard.release();
2111 :
2112 0 : ACE_Message_Block* message = 0;
2113 0 : ACE_NEW_MALLOC_RETURN(message,
2114 : static_cast<ACE_Message_Block*>(
2115 : mb_allocator_->malloc(sizeof(ACE_Message_Block))),
2116 : ACE_Message_Block(
2117 : DataSampleHeader::get_max_serialized_size(),
2118 : ACE_Message_Block::MB_DATA,
2119 : header_data.message_length_ ? data.release() : 0, //cont
2120 : 0, //data
2121 : 0, //allocator_strategy
2122 : get_db_lock(), //locking_strategy
2123 : ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
2124 : ACE_Time_Value::zero,
2125 : ACE_Time_Value::max_time,
2126 : db_allocator_.get(),
2127 : mb_allocator_.get()),
2128 : 0);
2129 :
2130 0 : *message << header_data;
2131 :
2132 : // If we incremented sequence number for this control message
2133 0 : if (header_data.sequence_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
2134 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, 0);
2135 : // Update the expected sequence number for all readers
2136 0 : RepoIdToReaderInfoMap::iterator reader;
2137 :
2138 0 : for (reader = reader_info_.begin(); reader != reader_info_.end(); ++reader) {
2139 0 : reader->second.expected_sequence_ = sequence;
2140 : }
2141 0 : }
2142 0 : if (DCPS_debug_level >= 4) {
2143 0 : ACE_DEBUG((LM_DEBUG,
2144 : ACE_TEXT("(%P|%t) DataWriterImpl::create_control_message: ")
2145 : ACE_TEXT("from publication %C sending control sample: %C .\n"),
2146 : LogGuid(publication_id_).c_str(),
2147 : to_string(header_data).c_str()));
2148 : }
2149 0 : return message;
2150 0 : }
2151 :
2152 : DDS::ReturnCode_t
2153 0 : DataWriterImpl::create_sample_data_message(Message_Block_Ptr data,
2154 : DDS::InstanceHandle_t instance_handle,
2155 : DataSampleHeader& header_data,
2156 : Message_Block_Ptr& message,
2157 : const DDS::Time_t& source_timestamp,
2158 : bool content_filter)
2159 : {
2160 : PublicationInstance_rch instance =
2161 0 : data_container_->get_handle_instance(instance_handle);
2162 :
2163 0 : if (0 == instance) {
2164 0 : ACE_ERROR_RETURN((LM_ERROR,
2165 : ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message ")
2166 : ACE_TEXT("failed to find instance for handle %d\n"),
2167 : instance_handle),
2168 : DDS::RETCODE_ERROR);
2169 : }
2170 :
2171 0 : header_data.message_id_ = SAMPLE_DATA;
2172 0 : header_data.byte_order_ =
2173 0 : this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER;
2174 0 : header_data.coherent_change_ = this->coherent_;
2175 :
2176 0 : RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
2177 :
2178 0 : if (!publisher) {
2179 0 : return DDS::RETCODE_ERROR;
2180 : }
2181 :
2182 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
2183 0 : header_data.group_coherent_ =
2184 0 : publisher->qos_.presentation.access_scope
2185 0 : == DDS::GROUP_PRESENTATION_QOS;
2186 : #endif
2187 0 : header_data.content_filter_ = content_filter;
2188 0 : header_data.cdr_encapsulation_ = this->cdr_encapsulation();
2189 0 : header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
2190 : {
2191 0 : ACE_Guard<ACE_Thread_Mutex> guard(sn_lock_);
2192 0 : header_data.sequence_repair_ = need_sequence_repair();
2193 0 : header_data.sequence_ = get_next_sn_i();
2194 0 : }
2195 0 : header_data.source_timestamp_sec_ = source_timestamp.sec;
2196 0 : header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
2197 :
2198 0 : if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
2199 0 : || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
2200 0 : header_data.lifespan_duration_ = true;
2201 0 : header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec;
2202 0 : header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec;
2203 : }
2204 :
2205 0 : header_data.publication_id_ = publication_id_;
2206 0 : header_data.publisher_id_ = publisher->publisher_id_;
2207 :
2208 : ACE_Message_Block* tmp_message;
2209 0 : ACE_NEW_MALLOC_RETURN(tmp_message,
2210 : static_cast<ACE_Message_Block*>(
2211 : mb_allocator_->malloc(sizeof(ACE_Message_Block))),
2212 : ACE_Message_Block(DataSampleHeader::get_max_serialized_size(),
2213 : ACE_Message_Block::MB_DATA,
2214 : data.release(), //cont
2215 : 0, //data
2216 : header_allocator_.get(), //alloc_strategy
2217 : get_db_lock(), //locking_strategy
2218 : ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
2219 : ACE_Time_Value::zero,
2220 : ACE_Time_Value::max_time,
2221 : db_allocator_.get(),
2222 : mb_allocator_.get()),
2223 : DDS::RETCODE_ERROR);
2224 0 : message.reset(tmp_message);
2225 0 : *message << header_data;
2226 0 : if (DCPS_debug_level >= 4) {
2227 0 : ACE_DEBUG((LM_DEBUG,
2228 : ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message: ")
2229 : ACE_TEXT("from publication %C sending data sample: %C .\n"),
2230 : LogGuid(publication_id_).c_str(),
2231 : to_string(header_data).c_str()));
2232 : }
2233 0 : return DDS::RETCODE_OK;
2234 0 : }
2235 :
2236 : void
2237 0 : DataWriterImpl::data_delivered(const DataSampleElement* sample)
2238 : {
2239 : DBG_ENTRY_LVL("DataWriterImpl","data_delivered",6);
2240 :
2241 0 : if (!(sample->get_pub_id() == this->publication_id_)) {
2242 0 : ACE_ERROR((LM_ERROR,
2243 : ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::data_delivered: ")
2244 : ACE_TEXT("The publication id %C from delivered element ")
2245 : ACE_TEXT("does not match the datawriter's id %C\n"),
2246 : LogGuid(sample->get_pub_id()).c_str(),
2247 : LogGuid(publication_id_).c_str()));
2248 0 : return;
2249 : }
2250 : //provided for statistics tracking in tests
2251 0 : ++data_delivered_count_;
2252 :
2253 0 : this->data_container_->data_delivered(sample);
2254 : }
2255 :
2256 : void
2257 0 : DataWriterImpl::control_delivered(const Message_Block_Ptr&)
2258 : {
2259 : DBG_ENTRY_LVL("DataWriterImpl","control_delivered",6);
2260 0 : controlTracker.message_delivered();
2261 0 : }
2262 :
2263 : RcHandle<EntityImpl>
2264 0 : DataWriterImpl::parent() const
2265 : {
2266 0 : return this->publisher_servant_.lock();
2267 : }
2268 :
2269 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
2270 : bool
2271 0 : DataWriterImpl::filter_out(const DataSampleElement& elt,
2272 : const OPENDDS_STRING& filterClassName,
2273 : const FilterEvaluator& evaluator,
2274 : const DDS::StringSeq& expression_params) const
2275 : {
2276 0 : if (!type_support_) {
2277 0 : if (log_level >= LogLevel::Error) {
2278 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::filter_out: Could not cast type support, not filtering\n"));
2279 : }
2280 0 : return false;
2281 : }
2282 :
2283 0 : if (filterClassName == "DDSSQL" ||
2284 0 : filterClassName == "OPENDDSSQL") {
2285 0 : if (!elt.get_header().valid_data() && evaluator.has_non_key_fields(*type_support_)) {
2286 0 : return true;
2287 : }
2288 : try {
2289 0 : return !evaluator.eval(elt.get_sample()->cont(), encoding_mode_.encoding(),
2290 0 : *type_support_, expression_params);
2291 0 : } catch (const std::runtime_error&) {
2292 : // if the eval fails, the throws will do the logging
2293 : // return false here so that the sample is not filtered
2294 0 : return false;
2295 0 : }
2296 : } else {
2297 0 : return false;
2298 : }
2299 : }
2300 : #endif
2301 :
2302 : bool
2303 0 : DataWriterImpl::check_transport_qos(const TransportInst&)
2304 : {
2305 : // DataWriter does not impose any constraints on which transports
2306 : // may be used based on QoS.
2307 0 : return true;
2308 : }
2309 :
2310 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
2311 :
2312 : bool
2313 0 : DataWriterImpl::coherent_changes_pending()
2314 : {
2315 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
2316 : guard,
2317 : get_lock(),
2318 : false);
2319 :
2320 0 : return this->coherent_;
2321 0 : }
2322 :
2323 : void
2324 0 : DataWriterImpl::begin_coherent_changes()
2325 : {
2326 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex,
2327 : guard,
2328 : get_lock());
2329 :
2330 0 : this->coherent_ = true;
2331 0 : }
2332 :
2333 : void
2334 0 : DataWriterImpl::end_coherent_changes(const GroupCoherentSamples& group_samples)
2335 : {
2336 : // PublisherImpl::pi_lock_ should be held.
2337 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex,
2338 : guard,
2339 : get_lock());
2340 :
2341 0 : CoherentChangeControl end_msg;
2342 0 : end_msg.coherent_samples_.num_samples_ = this->coherent_samples_;
2343 0 : end_msg.coherent_samples_.last_sample_ = get_max_sn();
2344 :
2345 0 : RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
2346 :
2347 0 : if (publisher) {
2348 : end_msg.group_coherent_
2349 0 : = publisher->qos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS;
2350 : }
2351 :
2352 0 : if (publisher && end_msg.group_coherent_) {
2353 0 : end_msg.publisher_id_ = publisher->publisher_id_;
2354 0 : end_msg.group_coherent_samples_ = group_samples;
2355 : }
2356 :
2357 : Message_Block_Ptr data(
2358 : new ACE_Message_Block(
2359 0 : end_msg.get_max_serialized_size(),
2360 : ACE_Message_Block::MB_DATA,
2361 : 0, // cont
2362 : 0, // data
2363 : 0, // alloc_strategy
2364 0 : get_db_lock()));
2365 :
2366 : Serializer serializer(data.get(), Encoding::KIND_UNALIGNED_CDR,
2367 0 : this->swap_bytes());
2368 :
2369 0 : serializer << end_msg;
2370 :
2371 0 : DataSampleHeader header;
2372 : Message_Block_Ptr control(
2373 : create_control_message(
2374 0 : END_COHERENT_CHANGES, header, move(data),
2375 0 : SystemTimePoint::now().to_dds_time()));
2376 :
2377 0 : this->coherent_ = false;
2378 0 : this->coherent_samples_ = 0;
2379 :
2380 0 : guard.release();
2381 0 : if (this->send_control(header, move(control)) == SEND_CONTROL_ERROR) {
2382 0 : ACE_ERROR((LM_ERROR,
2383 : ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::end_coherent_changes:")
2384 : ACE_TEXT(" unable to send END_COHERENT_CHANGES control message!\n")));
2385 : }
2386 0 : }
2387 :
2388 : #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
2389 :
2390 : void
2391 0 : DataWriterImpl::data_dropped(const DataSampleElement* element,
2392 : bool dropped_by_transport)
2393 : {
2394 : DBG_ENTRY_LVL("DataWriterImpl","data_dropped",6);
2395 :
2396 : //provided for statistics tracking in tests
2397 0 : ++data_dropped_count_;
2398 :
2399 0 : this->data_container_->data_dropped(element, dropped_by_transport);
2400 0 : }
2401 :
2402 : void
2403 0 : DataWriterImpl::control_dropped(const Message_Block_Ptr&,
2404 : bool /* dropped_by_transport */)
2405 : {
2406 : DBG_ENTRY_LVL("DataWriterImpl","control_dropped",6);
2407 0 : controlTracker.message_dropped();
2408 0 : }
2409 :
2410 : DDS::DataWriterListener_ptr
2411 0 : DataWriterImpl::listener_for(DDS::StatusKind kind)
2412 : {
2413 : // per 2.1.4.3.1 Listener Access to Plain Communication Status
2414 : // use this entities factory if listener is mask not enabled
2415 : // for this kind.
2416 0 : RcHandle<PublisherImpl> publisher = publisher_servant_.lock();
2417 0 : if (!publisher)
2418 0 : return 0;
2419 :
2420 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
2421 0 : if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
2422 0 : g.release();
2423 0 : return publisher->listener_for(kind);
2424 :
2425 : } else {
2426 0 : return DDS::DataWriterListener::_duplicate(listener_.in());
2427 : }
2428 0 : }
2429 :
2430 : int
2431 0 : DataWriterImpl::handle_timeout(const ACE_Time_Value& tv,
2432 : const void* /* arg */)
2433 : {
2434 0 : ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
2435 :
2436 0 : const MonotonicTimePoint now(tv);
2437 0 : bool liveliness_lost = false;
2438 :
2439 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_);
2440 :
2441 0 : TimeDuration elapsed = now - last_liveliness_activity_time_;
2442 :
2443 : // Do we need to send a liveliness message?
2444 0 : if (elapsed >= liveliness_check_interval_) {
2445 0 : switch (this->qos_.liveliness.kind) {
2446 0 : case DDS::AUTOMATIC_LIVELINESS_QOS:
2447 0 : if (!send_liveliness(now)) {
2448 0 : liveliness_lost = true;
2449 : }
2450 0 : break;
2451 :
2452 0 : case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
2453 0 : if (liveliness_asserted_) {
2454 0 : if (!send_liveliness(now)) {
2455 0 : liveliness_lost = true;
2456 : }
2457 : }
2458 0 : break;
2459 :
2460 0 : case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
2461 : // Do nothing.
2462 0 : break;
2463 : }
2464 : }
2465 : else {
2466 : // Reschedule.
2467 0 : if (reactor_->cancel_timer(liveness_timer_.in()) == -1) {
2468 0 : ACE_ERROR((LM_ERROR,
2469 : ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
2470 : ACE_TEXT("cancel_timer")));
2471 : }
2472 0 : if (reactor_->schedule_timer(liveness_timer_.in(), 0,
2473 0 : (liveliness_check_interval_ - elapsed).value(),
2474 0 : liveliness_check_interval_.value()) == -1)
2475 : {
2476 0 : ACE_ERROR((LM_ERROR,
2477 : ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
2478 : ACE_TEXT("schedule_timer")));
2479 : }
2480 0 : return 0;
2481 : }
2482 :
2483 0 : liveliness_asserted_ = false;
2484 0 : elapsed = now - last_liveliness_activity_time_;
2485 :
2486 : // Have we lost liveliness?
2487 0 : if (elapsed >= TimeDuration(qos_.liveliness.lease_duration)) {
2488 0 : liveliness_lost = true;
2489 : }
2490 :
2491 0 : if (!this->liveliness_lost_ && liveliness_lost) {
2492 0 : ++ this->liveliness_lost_status_.total_count;
2493 0 : ++ this->liveliness_lost_status_.total_count_change;
2494 :
2495 : DDS::DataWriterListener_var listener =
2496 0 : listener_for(DDS::LIVELINESS_LOST_STATUS);
2497 :
2498 0 : if (!CORBA::is_nil(listener.in())) {
2499 : {
2500 0 : ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> rev_lock(lock_);
2501 0 : ACE_Guard<ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> > rev_guard(rev_lock);
2502 0 : listener->on_liveliness_lost(this, this->liveliness_lost_status_);
2503 0 : }
2504 0 : this->liveliness_lost_status_.total_count_change = 0;
2505 : }
2506 0 : }
2507 :
2508 0 : this->liveliness_lost_ = liveliness_lost;
2509 0 : return 0;
2510 0 : }
2511 :
2512 : bool
2513 0 : DataWriterImpl::send_liveliness(const MonotonicTimePoint& now)
2514 : {
2515 0 : if (this->qos_.liveliness.kind == DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS ||
2516 0 : !TheServiceParticipant->get_discovery(domain_id_)->supports_liveliness()) {
2517 0 : DataSampleHeader header;
2518 0 : Message_Block_Ptr empty;
2519 : Message_Block_Ptr liveliness_msg(
2520 : create_control_message(
2521 0 : DATAWRITER_LIVELINESS, header, move(empty),
2522 0 : SystemTimePoint::now().to_dds_time()));
2523 :
2524 0 : if (this->send_control(header, move(liveliness_msg)) == SEND_CONTROL_ERROR) {
2525 0 : ACE_ERROR_RETURN((LM_ERROR,
2526 : ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::send_liveliness: ")
2527 : ACE_TEXT("send_control failed.\n")),
2528 : false);
2529 : }
2530 0 : }
2531 0 : last_liveliness_activity_time_ = now;
2532 0 : return true;
2533 : }
2534 :
2535 : void
2536 0 : DataWriterImpl::prepare_to_delete()
2537 : {
2538 0 : const Observer_rch observer = get_observer(Observer::e_DELETED);
2539 0 : if (observer) {
2540 0 : observer->on_deleted(this);
2541 : }
2542 :
2543 0 : this->set_deleted(true);
2544 0 : this->stop_associating();
2545 0 : this->terminate_send_if_suspended();
2546 :
2547 : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
2548 : // Trigger data to be persisted, i.e. made durable, if so
2549 : // configured. This needs be called before unregister_instances
2550 : // because unregister_instances may cause instance dispose.
2551 0 : if (!persist_data() && DCPS_debug_level >= 2) {
2552 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::prepare_to_delete: ")
2553 : ACE_TEXT("failed to make data durable.\n")));
2554 : }
2555 : #endif
2556 :
2557 : // Unregister all registered instances prior to deletion.
2558 0 : unregister_instances(SystemTimePoint::now().to_dds_time());
2559 0 : }
2560 :
2561 : PublicationInstance_rch
2562 0 : DataWriterImpl::get_handle_instance(DDS::InstanceHandle_t handle)
2563 : {
2564 :
2565 0 : if (0 != data_container_) {
2566 0 : return data_container_->get_handle_instance(handle);
2567 : }
2568 :
2569 0 : return PublicationInstance_rch();
2570 : }
2571 :
2572 : void
2573 0 : DataWriterImpl::notify_publication_disconnected(const ReaderIdSeq& subids)
2574 : {
2575 : DBG_ENTRY_LVL("DataWriterImpl","notify_publication_disconnected",6);
2576 :
2577 0 : if (!is_bit_) {
2578 : // Narrow to DDS::DCPS::DataWriterListener. If a DDS::DataWriterListener
2579 : // is given to this DataWriter then narrow() fails.
2580 0 : DataWriterListener_var the_listener = get_ext_listener();
2581 :
2582 0 : if (!CORBA::is_nil(the_listener.in())) {
2583 0 : PublicationDisconnectedStatus status;
2584 : // Since this callback may come after remove_association which
2585 : // removes the reader from id_to_handle map, we can ignore this
2586 : // error.
2587 0 : this->lookup_instance_handles(subids,
2588 : status.subscription_handles);
2589 0 : the_listener->on_publication_disconnected(this, status);
2590 0 : }
2591 0 : }
2592 0 : }
2593 :
2594 : void
2595 0 : DataWriterImpl::notify_publication_reconnected(const ReaderIdSeq& subids)
2596 : {
2597 : DBG_ENTRY_LVL("DataWriterImpl","notify_publication_reconnected",6);
2598 :
2599 0 : if (!is_bit_) {
2600 : // Narrow to DDS::DCPS::DataWriterListener. If a
2601 : // DDS::DataWriterListener is given to this DataWriter then
2602 : // narrow() fails.
2603 0 : DataWriterListener_var the_listener = get_ext_listener();
2604 :
2605 0 : if (!CORBA::is_nil(the_listener.in())) {
2606 0 : PublicationDisconnectedStatus status;
2607 :
2608 : // If it's reconnected then the reader should be in id_to_handle
2609 0 : this->lookup_instance_handles(subids, status.subscription_handles);
2610 :
2611 0 : the_listener->on_publication_reconnected(this, status);
2612 0 : }
2613 0 : }
2614 0 : }
2615 :
2616 : void
2617 0 : DataWriterImpl::notify_publication_lost(const ReaderIdSeq& subids)
2618 : {
2619 : DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
2620 :
2621 0 : if (!is_bit_) {
2622 : // Narrow to DDS::DCPS::DataWriterListener. If a
2623 : // DDS::DataWriterListener is given to this DataWriter then
2624 : // narrow() fails.
2625 0 : DataWriterListener_var the_listener = get_ext_listener();
2626 :
2627 0 : if (!CORBA::is_nil(the_listener.in())) {
2628 0 : PublicationLostStatus status;
2629 :
2630 : // Since this callback may come after remove_association which removes
2631 : // the reader from id_to_handle map, we can ignore this error.
2632 0 : this->lookup_instance_handles(subids,
2633 : status.subscription_handles);
2634 0 : the_listener->on_publication_lost(this, status);
2635 0 : }
2636 0 : }
2637 0 : }
2638 :
2639 : void
2640 0 : DataWriterImpl::notify_publication_lost(const DDS::InstanceHandleSeq& handles)
2641 : {
2642 : DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
2643 :
2644 0 : if (!is_bit_) {
2645 : // Narrow to DDS::DCPS::DataWriterListener. If a
2646 : // DDS::DataWriterListener is given to this DataWriter then
2647 : // narrow() fails.
2648 0 : DataWriterListener_var the_listener = get_ext_listener();
2649 :
2650 0 : if (!CORBA::is_nil(the_listener.in())) {
2651 0 : PublicationLostStatus status;
2652 :
2653 0 : CORBA::ULong len = handles.length();
2654 0 : status.subscription_handles.length(len);
2655 :
2656 0 : for (CORBA::ULong i = 0; i < len; ++ i) {
2657 0 : status.subscription_handles[i] = handles[i];
2658 : }
2659 :
2660 0 : the_listener->on_publication_lost(this, status);
2661 0 : }
2662 0 : }
2663 0 : }
2664 :
2665 :
2666 : void
2667 0 : DataWriterImpl::lookup_instance_handles(const ReaderIdSeq& ids,
2668 : DDS::InstanceHandleSeq & hdls)
2669 : {
2670 0 : CORBA::ULong const num_rds = ids.length();
2671 0 : RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
2672 :
2673 0 : if (!participant)
2674 0 : return;
2675 :
2676 0 : if (DCPS_debug_level > 9) {
2677 0 : OPENDDS_STRING separator;
2678 0 : OPENDDS_STRING buffer;
2679 :
2680 0 : for (CORBA::ULong i = 0; i < num_rds; ++i) {
2681 0 : buffer += separator + LogGuid(ids[i]).conv_;
2682 0 : separator = ", ";
2683 : }
2684 :
2685 0 : ACE_DEBUG((LM_DEBUG,
2686 : ACE_TEXT("(%P|%t) DataWriterImpl::lookup_instance_handles: ")
2687 : ACE_TEXT("searching for handles for reader Ids: %C.\n"),
2688 : buffer.c_str()));
2689 0 : }
2690 :
2691 0 : hdls.length(num_rds);
2692 :
2693 0 : for (CORBA::ULong i = 0; i < num_rds; ++i) {
2694 0 : hdls[i] = participant->lookup_handle(ids[i]);
2695 : }
2696 0 : }
2697 :
2698 : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
2699 : bool
2700 0 : DataWriterImpl::persist_data()
2701 : {
2702 0 : return this->data_container_->persist_data();
2703 : }
2704 : #endif
2705 :
2706 0 : void DataWriterImpl::wait_pending()
2707 : {
2708 0 : if (!TransportRegistry::instance()->released()) {
2709 0 : data_container_->wait_pending(wait_pending_deadline_);
2710 0 : controlTracker.wait_messages_pending("DataWriterImpl::wait_pending", wait_pending_deadline_);
2711 : }
2712 0 : }
2713 :
2714 : void
2715 0 : DataWriterImpl::get_instance_handles(InstanceHandleVec& instance_handles)
2716 : {
2717 0 : this->data_container_->get_instance_handles(instance_handles);
2718 0 : }
2719 :
2720 : void
2721 0 : DataWriterImpl::get_readers(RepoIdSet& readers)
2722 : {
2723 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
2724 0 : readers = this->readers_;
2725 0 : }
2726 :
2727 : void
2728 0 : DataWriterImpl::retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const
2729 : {
2730 0 : RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
2731 0 : if (publisher) {
2732 0 : publisher->get_qos(qos_data.pub_qos);
2733 : }
2734 0 : qos_data.dw_qos = this->qos_;
2735 0 : qos_data.topic_name = this->topic_name_.in();
2736 0 : }
2737 :
2738 : #if defined(OPENDDS_SECURITY)
2739 0 : DDS::Security::ParticipantCryptoHandle DataWriterImpl::get_crypto_handle() const
2740 : {
2741 0 : RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
2742 0 : return participant ? participant->crypto_handle() : DDS::HANDLE_NIL;
2743 0 : }
2744 : #endif
2745 :
2746 : bool
2747 0 : DataWriterImpl::need_sequence_repair()
2748 : {
2749 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, false);
2750 0 : return need_sequence_repair_i();
2751 0 : }
2752 :
2753 : bool
2754 0 : DataWriterImpl::need_sequence_repair_i() const
2755 : {
2756 0 : for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
2757 0 : end = reader_info_.end(); it != end; ++it) {
2758 0 : if (it->second.expected_sequence_ != sequence_number_) {
2759 0 : return true;
2760 : }
2761 : }
2762 :
2763 0 : return false;
2764 : }
2765 :
2766 : SendControlStatus
2767 0 : DataWriterImpl::send_control(const DataSampleHeader& header,
2768 : Message_Block_Ptr msg)
2769 : {
2770 0 : controlTracker.message_sent();
2771 :
2772 0 : SendControlStatus status = TransportClient::send_control(header, move(msg));
2773 :
2774 0 : if (status != SEND_CONTROL_OK) {
2775 0 : controlTracker.message_dropped();
2776 : }
2777 :
2778 0 : return status;
2779 : }
2780 :
2781 : WeakRcHandle<ICE::Endpoint>
2782 0 : DataWriterImpl::get_ice_endpoint()
2783 : {
2784 0 : return TransportClient::get_ice_endpoint();
2785 : }
2786 :
2787 0 : void DataWriterImpl::set_wait_pending_deadline(const MonotonicTimePoint& deadline)
2788 : {
2789 0 : wait_pending_deadline_ = deadline;
2790 0 : }
2791 :
2792 0 : int LivenessTimer::handle_timeout(const ACE_Time_Value& tv, const void* arg)
2793 : {
2794 0 : ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
2795 :
2796 0 : DataWriterImpl_rch writer = this->writer_.lock();
2797 0 : if (writer) {
2798 0 : writer->handle_timeout(tv, arg);
2799 : } else {
2800 0 : this->reactor()->cancel_timer(this);
2801 : }
2802 0 : return 0;
2803 0 : }
2804 :
2805 0 : void DataWriterImpl::transport_discovery_change()
2806 : {
2807 0 : populate_connection_info();
2808 0 : const TransportLocatorSeq& trans_conf_info = connection_info();
2809 :
2810 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_);
2811 0 : const GUID_t dp_id_copy = dp_id_;
2812 0 : const GUID_t publication_id_copy = publication_id_;
2813 0 : const int domain_id = domain_id_;
2814 0 : guard.release();
2815 :
2816 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id);
2817 0 : disco->update_publication_locators(domain_id,
2818 : dp_id_copy,
2819 : publication_id_copy,
2820 : trans_conf_info);
2821 0 : }
2822 :
2823 0 : DDS::ReturnCode_t DataWriterImpl::setup_serialization()
2824 : {
2825 0 : if (qos_.representation.value.length() > 0 &&
2826 0 : qos_.representation.value[0] != UNALIGNED_CDR_DATA_REPRESENTATION) {
2827 : // If the QoS explicitly sets XCDR, XCDR2, or XML, force encapsulation
2828 0 : cdr_encapsulation(true);
2829 : }
2830 :
2831 0 : if (cdr_encapsulation()) {
2832 : Encoding::Kind encoding_kind;
2833 : // There should only be one data representation in a DataWriter, so
2834 : // simply use qos_.representation.value[0].
2835 0 : if (repr_to_encoding_kind(qos_.representation.value[0], encoding_kind)) {
2836 0 : encoding_mode_ = EncodingMode(type_support_, encoding_kind, swap_bytes());
2837 0 : if (encoding_kind == Encoding::KIND_XCDR1 &&
2838 0 : type_support_->max_extensibility() == MUTABLE) {
2839 0 : if (log_level >= LogLevel::Notice) {
2840 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DataWriterImpl::setup_serialization: "
2841 : "Encountered unsupported combination of XCDR1 encoding and mutable extensibility "
2842 : "for writer of type %C\n",
2843 : type_support_->name()));
2844 : }
2845 0 : return DDS::RETCODE_ERROR;
2846 0 : } else if (encoding_kind == Encoding::KIND_UNALIGNED_CDR) {
2847 0 : if (log_level >= LogLevel::Notice) {
2848 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DataWriterImpl::setup_serialization: "
2849 : "Unaligned CDR is not supported by transport types that require encapsulation\n"));
2850 : }
2851 0 : return DDS::RETCODE_ERROR;
2852 : }
2853 0 : } else if (log_level >= LogLevel::Warning) {
2854 0 : ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: DataWriterImpl::setup_serialization: "
2855 : "Encountered unsupported or unknown data representation: %C ",
2856 : "for writer of type %C\n",
2857 : repr_to_string(qos_.representation.value[0]).c_str(),
2858 : type_support_->name()));
2859 : }
2860 : } else {
2861 : // Pick unaligned CDR as it is the implicit representation for non-encapsulated
2862 0 : encoding_mode_ = EncodingMode(type_support_, Encoding::KIND_UNALIGNED_CDR, swap_bytes());
2863 : }
2864 0 : if (!encoding_mode_.valid()) {
2865 0 : if (log_level >= LogLevel::Notice) {
2866 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DataWriterImpl::setup_serialization: "
2867 : "Could not find a valid data representation\n"));
2868 : }
2869 0 : return DDS::RETCODE_ERROR;
2870 : }
2871 :
2872 0 : if (DCPS_debug_level >= 2) {
2873 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) WriterImpl::setup_serialization: "
2874 : "Setup successfully with %C data representation.\n",
2875 : Encoding::kind_to_string(encoding_mode_.encoding().kind()).c_str()));
2876 : }
2877 :
2878 : // Set up allocator with reserved space for data if it is bounded
2879 0 : const SerializedSizeBound buffer_size_bound = encoding_mode_.buffer_size_bound();
2880 0 : if (buffer_size_bound) {
2881 0 : const size_t chunk_size = buffer_size_bound.get();
2882 0 : data_allocator_.reset(new DataAllocator(n_chunks_, chunk_size));
2883 0 : if (DCPS_debug_level >= 2) {
2884 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DataWriterImpl::setup_serialization: "
2885 : "using data allocator at %x with %B %B byte chunks\n",
2886 : data_allocator_.get(),
2887 : n_chunks_,
2888 : chunk_size));
2889 : }
2890 0 : } else if (DCPS_debug_level >= 2) {
2891 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DataWriterImpl::setup_serialization: "
2892 : "sample size is unbounded, not using data allocator, "
2893 : "always allocating from heap\n"));
2894 : }
2895 0 : return DDS::RETCODE_OK;
2896 : }
2897 :
2898 0 : DDS::ReturnCode_t DataWriterImpl::get_key_value(Sample_rch& sample, DDS::InstanceHandle_t handle)
2899 : {
2900 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), DDS::RETCODE_ERROR);
2901 0 : const InstanceHandlesToValues::iterator it = instance_handles_to_values_.find(handle);
2902 0 : if (it == instance_handles_to_values_.end()) {
2903 0 : return DDS::RETCODE_BAD_PARAMETER;
2904 : }
2905 0 : sample = it->second->copy(Sample::Mutable);
2906 0 : return DDS::RETCODE_OK;
2907 0 : }
2908 :
2909 0 : DDS::InstanceHandle_t DataWriterImpl::lookup_instance(const Sample& sample)
2910 : {
2911 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), DDS::RETCODE_ERROR);
2912 0 : const InstanceValuesToHandles::iterator it = find_instance(sample);
2913 0 : return it == instance_values_to_handles_.end() ? DDS::HANDLE_NIL : it->second;
2914 0 : }
2915 :
2916 0 : DDS::InstanceHandle_t DataWriterImpl::register_instance_w_timestamp(
2917 : const Sample& sample, const DDS::Time_t& timestamp)
2918 : {
2919 0 : DDS::InstanceHandle_t registered_handle = DDS::HANDLE_NIL;
2920 0 : const DDS::ReturnCode_t ret = get_or_create_instance_handle(registered_handle, sample, timestamp);
2921 0 : if (ret != DDS::RETCODE_OK && log_level >= LogLevel::Notice) {
2922 0 : ACE_ERROR((LM_NOTICE, ACE_TEXT("(%P|%t) NOTICE: DataWriterImpl::register_instance_w_timestamp: ")
2923 : ACE_TEXT("register failed: %C\n"),
2924 : retcode_to_string(ret)));
2925 : }
2926 0 : return registered_handle;
2927 : }
2928 :
2929 0 : DDS::ReturnCode_t DataWriterImpl::unregister_instance_w_timestamp(
2930 : const Sample& sample,
2931 : DDS::InstanceHandle_t instance_handle,
2932 : const DDS::Time_t& timestamp)
2933 : {
2934 0 : const DDS::ReturnCode_t rc = instance_must_exist(
2935 : "unregister_instance_w_timestamp", sample, instance_handle, /* remove = */ true);
2936 0 : if (rc != DDS::RETCODE_OK) {
2937 0 : return rc;
2938 : }
2939 0 : return unregister_instance_i(instance_handle, timestamp);
2940 : }
2941 :
2942 0 : DDS::ReturnCode_t DataWriterImpl::dispose_w_timestamp(
2943 : const Sample& sample,
2944 : DDS::InstanceHandle_t instance_handle,
2945 : const DDS::Time_t& source_timestamp)
2946 : {
2947 : #if defined(OPENDDS_SECURITY) && OPENDDS_HAS_DYNAMIC_DATA_ADAPTER
2948 0 : DDS::DynamicData_var dynamic_data = sample.get_dynamic_data(dynamic_type_);
2949 0 : DDS::Security::SecurityException ex;
2950 0 : if (dynamic_data && security_config_ &&
2951 0 : participant_permissions_handle_ != DDS::HANDLE_NIL &&
2952 0 : !security_config_->get_access_control()->check_local_datawriter_dispose_instance(participant_permissions_handle_, this, dynamic_data, ex)) {
2953 0 : if (log_level >= LogLevel::Notice) {
2954 0 : ACE_ERROR((LM_NOTICE,
2955 : "(%P|%t) NOTICE: DataWriterImpl::dispose_w_timestamp: unable to dispose instance SecurityException[%d.%d]: %C\n",
2956 : ex.code, ex.minor_code, ex.message.in()));
2957 : }
2958 0 : return DDS::Security::RETCODE_NOT_ALLOWED_BY_SECURITY;
2959 : }
2960 : #endif
2961 :
2962 0 : const DDS::ReturnCode_t rc = instance_must_exist(
2963 : "dispose_w_timestamp", sample, instance_handle);
2964 0 : if (rc != DDS::RETCODE_OK) {
2965 0 : return rc;
2966 : }
2967 0 : return dispose(instance_handle, source_timestamp);
2968 0 : }
2969 :
2970 0 : ACE_Message_Block* DataWriterImpl::serialize_sample(const Sample& sample)
2971 : {
2972 0 : const bool encapsulated = cdr_encapsulation();
2973 0 : const Encoding& encoding = encoding_mode_.encoding();
2974 0 : Message_Block_Ptr mb;
2975 : ACE_Message_Block* tmp_mb;
2976 :
2977 : // Don't use the cached allocator for the registered sample message
2978 : // block.
2979 0 : if (sample.key_only() && !skip_serialize_) {
2980 0 : ACE_NEW_RETURN(tmp_mb,
2981 : ACE_Message_Block(
2982 : encoding_mode_.buffer_size(sample),
2983 : ACE_Message_Block::MB_DATA,
2984 : 0, // cont
2985 : 0, // data
2986 : 0, // alloc_strategy
2987 : get_db_lock()),
2988 : 0);
2989 : } else {
2990 0 : ACE_NEW_MALLOC_RETURN(tmp_mb,
2991 : static_cast<ACE_Message_Block*>(
2992 : mb_allocator_->malloc(sizeof(ACE_Message_Block))),
2993 : ACE_Message_Block(
2994 : encoding_mode_.buffer_size(sample),
2995 : ACE_Message_Block::MB_DATA,
2996 : 0, // cont
2997 : 0, // data
2998 : data_allocator_.get(), // allocator_strategy
2999 : get_db_lock(), // data block locking_strategy
3000 : ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
3001 : ACE_Time_Value::zero,
3002 : ACE_Time_Value::max_time,
3003 : db_allocator_.get(),
3004 : mb_allocator_.get()),
3005 : 0);
3006 : }
3007 0 : mb.reset(tmp_mb);
3008 :
3009 0 : if (skip_serialize_) {
3010 0 : if (!sample.to_message_block(*mb)) {
3011 0 : if (log_level >= LogLevel::Error) {
3012 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::serialize_sample: "
3013 : "to_message_block failed\n"));
3014 : }
3015 0 : return 0;
3016 : }
3017 : } else {
3018 0 : Serializer serializer(mb.get(), encoding);
3019 0 : if (encapsulated) {
3020 0 : EncapsulationHeader encap;
3021 0 : if (!encap.from_encoding(encoding, type_support_->base_extensibility())) {
3022 : // from_encoding logged the error
3023 0 : return 0;
3024 : }
3025 0 : if (!(serializer << encap)) {
3026 0 : if (log_level >= LogLevel::Error) {
3027 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::serialize_sample: "
3028 : "failed to serialize data encapsulation header\n"));
3029 : }
3030 0 : return 0;
3031 : }
3032 : }
3033 0 : if (!sample.serialize(serializer)) {
3034 0 : if (log_level >= LogLevel::Error) {
3035 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::serialize_sample: "
3036 : "failed to serialize sample data\n"));
3037 : }
3038 0 : return 0;
3039 : }
3040 0 : if (encapsulated && !EncapsulationHeader::set_encapsulation_options(mb)) {
3041 0 : if (log_level >= LogLevel::Error) {
3042 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::serialize_sample: "
3043 : "set_encapsulation_options failed\n"));
3044 : }
3045 0 : return 0;
3046 : }
3047 0 : }
3048 :
3049 0 : return mb.release();
3050 0 : }
3051 :
3052 0 : bool DataWriterImpl::insert_instance(DDS::InstanceHandle_t handle, Sample_rch& sample)
3053 : {
3054 0 : OPENDDS_ASSERT(sample->key_only());
3055 0 : if (!instance_handles_to_values_.insert(
3056 0 : InstanceHandlesToValues::value_type(handle, sample)).second) {
3057 0 : return false;
3058 : }
3059 0 : if (!instance_values_to_handles_.insert(
3060 0 : InstanceValuesToHandles::value_type(sample, handle)).second) {
3061 0 : instance_handles_to_values_.erase(handle);
3062 0 : return false;
3063 : }
3064 0 : return true;
3065 : }
3066 :
3067 : DataWriterImpl::InstanceValuesToHandles::iterator
3068 0 : DataWriterImpl::find_instance(const Sample& sample)
3069 : {
3070 0 : Sample_rch dummy_rch(const_cast<Sample*>(&sample), keep_count());
3071 0 : InstanceValuesToHandles::iterator pos = instance_values_to_handles_.find(dummy_rch);
3072 0 : dummy_rch._retn();
3073 0 : return pos;
3074 0 : }
3075 :
3076 0 : DDS::ReturnCode_t DataWriterImpl::get_or_create_instance_handle(
3077 : DDS::InstanceHandle_t& handle,
3078 : const Sample& sample,
3079 : const DDS::Time_t& source_timestamp)
3080 : {
3081 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), DDS::RETCODE_ERROR);
3082 :
3083 0 : handle = lookup_instance(sample);
3084 0 : if (handle == DDS::HANDLE_NIL || !get_handle_instance(handle)) {
3085 0 : Sample_rch copy = sample.copy(Sample::ReadOnly, Sample::KeyOnly);
3086 : #if defined(OPENDDS_SECURITY) && OPENDDS_HAS_DYNAMIC_DATA_ADAPTER
3087 0 : DDS::DynamicData_var dynamic_data = copy->get_dynamic_data(dynamic_type_);
3088 0 : DDS::Security::SecurityException ex;
3089 0 : if (dynamic_data && security_config_ &&
3090 0 : participant_permissions_handle_ != DDS::HANDLE_NIL &&
3091 0 : !security_config_->get_access_control()->check_local_datawriter_register_instance(participant_permissions_handle_, this, dynamic_data, ex)) {
3092 0 : if (log_level >= LogLevel::Notice) {
3093 0 : ACE_ERROR((LM_NOTICE,
3094 : "(%P|%t) NOTICE: DataWriterImpl::get_or_create_instance_handle: unable to register instance SecurityException[%d.%d]: %C\n",
3095 : ex.code, ex.minor_code, ex.message.in()));
3096 : }
3097 0 : return DDS::Security::RETCODE_NOT_ALLOWED_BY_SECURITY;
3098 : }
3099 : #endif
3100 :
3101 : // don't use fast allocator for registration.
3102 0 : const TypeSupportImpl* const ts = get_type_support();
3103 0 : Message_Block_Ptr serialized(serialize_sample(*copy));
3104 0 : if (!serialized) {
3105 0 : if (log_level >= LogLevel::Notice) {
3106 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: %CDataWriterImpl::get_or_create_instance_handle: "
3107 : "failed to serialize sample\n", ts->name()));
3108 : }
3109 0 : return DDS::RETCODE_ERROR;
3110 : }
3111 :
3112 : // tell DataWriterLocal and Publisher about the instance.
3113 0 : const DDS::ReturnCode_t ret = register_instance_i(handle, move(serialized), source_timestamp);
3114 : // note: the WriteDataContainer/PublicationInstance maintains ownership
3115 : // of the marshalled sample.
3116 0 : if (ret != DDS::RETCODE_OK) {
3117 0 : handle = DDS::HANDLE_NIL;
3118 0 : return ret;
3119 : }
3120 :
3121 0 : if (!insert_instance(handle, copy)) {
3122 0 : handle = DDS::HANDLE_NIL;
3123 0 : if (log_level >= LogLevel::Notice) {
3124 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: %CDataWriterImpl::get_or_create_instance_handle: "
3125 : "insert instance failed\n", ts->name()));
3126 : }
3127 0 : return DDS::RETCODE_ERROR;
3128 : }
3129 :
3130 0 : send_all_to_flush_control(guard);
3131 0 : }
3132 :
3133 0 : return DDS::RETCODE_OK;
3134 0 : }
3135 :
3136 0 : DDS::ReturnCode_t DataWriterImpl::instance_must_exist(
3137 : const char* const method_name,
3138 : const Sample& sample,
3139 : DDS::InstanceHandle_t& instance_handle,
3140 : bool remove)
3141 : {
3142 0 : OPENDDS_ASSERT(sample.key_only());
3143 :
3144 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), DDS::RETCODE_ERROR);
3145 :
3146 0 : const InstanceValuesToHandles::iterator pos = find_instance(sample);
3147 0 : if (pos == instance_values_to_handles_.end()) {
3148 0 : if (log_level >= LogLevel::Notice) {
3149 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DataWriterImpl::%C: "
3150 : "The instance sample is not registered\n",
3151 : method_name));
3152 : }
3153 0 : return DDS::RETCODE_ERROR;
3154 : }
3155 :
3156 0 : if (instance_handle != DDS::HANDLE_NIL && instance_handle != pos->second) {
3157 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
3158 : }
3159 :
3160 0 : instance_handle = pos->second;
3161 :
3162 0 : if (remove) {
3163 0 : instance_values_to_handles_.erase(pos);
3164 0 : instance_handles_to_values_.erase(instance_handle);
3165 : }
3166 :
3167 0 : return DDS::RETCODE_OK;
3168 0 : }
3169 :
3170 0 : DDS::ReturnCode_t DataWriterImpl::write_w_timestamp(
3171 : const Sample& sample,
3172 : DDS::InstanceHandle_t handle,
3173 : const DDS::Time_t& source_timestamp)
3174 : {
3175 : // This operation assumes the provided handle is valid. The handle provided
3176 : // will not be verified.
3177 :
3178 0 : if (handle == DDS::HANDLE_NIL) {
3179 0 : DDS::InstanceHandle_t registered_handle = DDS::HANDLE_NIL;
3180 : const DDS::ReturnCode_t ret =
3181 0 : get_or_create_instance_handle(registered_handle, sample, source_timestamp);
3182 0 : if (ret != DDS::RETCODE_OK) {
3183 0 : if (log_level >= LogLevel::Notice) {
3184 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: %CDataWriterImpl::write_w_timestamp: "
3185 : "register failed: %C\n",
3186 : get_type_support()->name(),
3187 : retcode_to_string(ret)));
3188 : }
3189 0 : return ret;
3190 : }
3191 :
3192 0 : handle = registered_handle;
3193 : }
3194 :
3195 : // list of reader GUID_ts that should not get data
3196 0 : GUIDSeq_var filter_out;
3197 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
3198 0 : if (TheServiceParticipant->publisher_content_filter()) {
3199 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, reader_info_lock_, DDS::RETCODE_ERROR);
3200 0 : for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
3201 0 : end = reader_info_.end(); iter != end; ++iter) {
3202 0 : const ReaderInfo& ri = iter->second;
3203 0 : if (!ri.eval_.is_nil()) {
3204 0 : if (!filter_out.ptr()) {
3205 0 : filter_out = new OpenDDS::DCPS::GUIDSeq;
3206 : }
3207 0 : if (!sample.eval(*ri.eval_, ri.expression_params_)) {
3208 0 : push_back(filter_out.inout(), iter->first);
3209 : }
3210 : }
3211 : }
3212 0 : }
3213 : #endif
3214 :
3215 0 : return write_sample(sample, handle, source_timestamp, filter_out._retn());
3216 0 : }
3217 :
3218 0 : DDS::ReturnCode_t DataWriterImpl::write_sample(
3219 : const Sample& sample,
3220 : DDS::InstanceHandle_t handle,
3221 : const DDS::Time_t& source_timestamp,
3222 : GUIDSeq* filter_out)
3223 : {
3224 0 : Message_Block_Ptr serialized(serialize_sample(sample));
3225 0 : if (!serialized) {
3226 0 : if (log_level >= LogLevel::Notice) {
3227 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DataWriterImpl::write_sample: "
3228 : "failed to serialize sample\n"));
3229 : }
3230 0 : return DDS::RETCODE_ERROR;
3231 : }
3232 :
3233 0 : return write(move(serialized), handle, source_timestamp, filter_out, sample.native_data());
3234 0 : }
3235 :
3236 : } // namespace DCPS
3237 : } // namespace OpenDDS
3238 :
3239 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|