Line data Source code
1 : /*
2 : * Distributed under the OpenDDS License.
3 : * See: http://www.opendds.org/license.html
4 : */
5 :
6 : #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
7 :
8 : #include "PublisherImpl.h"
9 :
10 : #include "FeatureDisabledQosCheck.h"
11 : #include "DataWriterImpl.h"
12 : #include "DomainParticipantImpl.h"
13 : #include "DataWriterImpl.h"
14 : #include "Service_Participant.h"
15 : #include "Qos_Helper.h"
16 : #include "GuidConverter.h"
17 : #include "Marked_Default_Qos.h"
18 : #include "TopicImpl.h"
19 : #include "MonitorFactory.h"
20 : #include "transport/framework/ReceivedDataSample.h"
21 : #include "transport/framework/DataLinkSet.h"
22 : #include "transport/framework/TransportImpl.h"
23 :
24 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
25 :
26 : namespace OpenDDS {
27 : namespace DCPS {
28 :
29 0 : PublisherImpl::PublisherImpl(DDS::InstanceHandle_t handle,
30 : GUID_t id,
31 : const DDS::PublisherQos& qos,
32 : DDS::PublisherListener_ptr a_listener,
33 : const DDS::StatusMask& mask,
34 0 : DomainParticipantImpl* participant)
35 0 : : handle_(handle),
36 0 : qos_(qos),
37 0 : default_datawriter_qos_(TheServiceParticipant->initial_DataWriterQos()),
38 0 : listener_mask_(mask),
39 0 : listener_(DDS::PublisherListener::_duplicate(a_listener)),
40 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
41 0 : change_depth_(0),
42 : #endif
43 0 : domain_id_(participant->get_domain_id()),
44 0 : participant_(*participant),
45 0 : suspend_depth_count_(0),
46 0 : sequence_number_(),
47 0 : reverse_pi_lock_(pi_lock_),
48 0 : publisher_id_(id)
49 : {
50 0 : monitor_.reset(TheServiceParticipant->monitor_factory_->create_publisher_monitor(this));
51 0 : }
52 :
53 0 : PublisherImpl::~PublisherImpl()
54 : {
55 0 : const RcHandle<DomainParticipantImpl> participant = participant_.lock();
56 0 : if (participant) {
57 0 : participant->return_handle(handle_);
58 : }
59 :
60 : // The datawriters should be deleted already before calling delete
61 : // publisher.
62 0 : String leftover_entities;
63 0 : if (!is_clean(&leftover_entities)) {
64 0 : if (log_level >= LogLevel::Warning) {
65 0 : ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: PublisherImpl::~PublisherImpl: "
66 : "%C still exist\n", leftover_entities.c_str()));
67 : }
68 : }
69 0 : }
70 :
71 : DDS::InstanceHandle_t
72 0 : PublisherImpl::get_instance_handle()
73 : {
74 0 : return handle_;
75 : }
76 :
77 : bool
78 0 : PublisherImpl::contains_writer(DDS::InstanceHandle_t a_handle)
79 : {
80 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
81 : guard,
82 : this->pi_lock_,
83 : DDS::RETCODE_ERROR);
84 :
85 0 : for (DataWriterMap::iterator it(datawriter_map_.begin());
86 0 : it != datawriter_map_.end(); ++it) {
87 0 : if (a_handle == it->second->get_instance_handle()) {
88 0 : return true;
89 : }
90 : }
91 :
92 0 : return false;
93 0 : }
94 :
95 : DDS::DataWriter_ptr
96 0 : PublisherImpl::create_datawriter(
97 : DDS::Topic_ptr a_topic,
98 : const DDS::DataWriterQos & qos,
99 : DDS::DataWriterListener_ptr a_listener,
100 : DDS::StatusMask mask)
101 : {
102 0 : DDS::DataWriterQos dw_qos;
103 :
104 0 : if (!validate_datawriter_qos(qos, default_datawriter_qos_, a_topic, dw_qos)) {
105 0 : return DDS::DataWriter::_nil();
106 : }
107 :
108 0 : TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic);
109 :
110 0 : if (!topic_servant) {
111 0 : if (DCPS_debug_level > 0) {
112 0 : CORBA::String_var name = a_topic->get_name();
113 0 : ACE_ERROR((LM_ERROR,
114 : ACE_TEXT("(%P|%t) ERROR: ")
115 : ACE_TEXT("PublisherImpl::create_datawriter, ")
116 : ACE_TEXT("topic_servant(topic_name=%C) is nil.\n"),
117 : name.in()));
118 0 : }
119 0 : return 0;
120 : }
121 :
122 : OpenDDS::DCPS::TypeSupport_ptr typesupport =
123 0 : topic_servant->get_type_support();
124 :
125 0 : if (typesupport == 0) {
126 0 : if (DCPS_debug_level > 0) {
127 0 : CORBA::String_var name = topic_servant->get_name();
128 0 : ACE_ERROR((LM_ERROR,
129 : ACE_TEXT("(%P|%t) ERROR: ")
130 : ACE_TEXT("PublisherImpl::create_datawriter, ")
131 : ACE_TEXT("typesupport(topic_name=%C) is nil.\n"),
132 : name.in()));
133 0 : }
134 0 : return DDS::DataWriter::_nil();
135 : }
136 :
137 0 : DDS::DataWriter_var dw_obj = typesupport->create_datawriter();
138 :
139 : DataWriterImpl* dw_servant =
140 0 : dynamic_cast <DataWriterImpl*>(dw_obj.in());
141 :
142 0 : if (dw_servant == 0) {
143 0 : if (DCPS_debug_level > 0) {
144 0 : ACE_ERROR((LM_ERROR,
145 : ACE_TEXT("(%P|%t) ERROR: ")
146 : ACE_TEXT("PublisherImpl::create_datawriter, ")
147 : ACE_TEXT("servant is nil.\n")));
148 : }
149 0 : return DDS::DataWriter::_nil();
150 : }
151 :
152 0 : dw_servant->init(
153 : topic_servant,
154 : dw_qos,
155 : a_listener,
156 : mask,
157 0 : participant_,
158 : this);
159 :
160 0 : if (enabled_ && qos_.entity_factory.autoenable_created_entities) {
161 0 : const DDS::ReturnCode_t ret = dw_servant->enable();
162 :
163 0 : if (ret != DDS::RETCODE_OK) {
164 0 : if (DCPS_debug_level > 0) {
165 0 : ACE_ERROR((LM_WARNING,
166 : ACE_TEXT("(%P|%t) WARNING: ")
167 : ACE_TEXT("PublisherImpl::create_datawriter, ")
168 : ACE_TEXT("enable failed.\n")));
169 : }
170 0 : return DDS::DataWriter::_nil();
171 : }
172 : } else {
173 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, pi_lock_, 0);
174 0 : writers_not_enabled_.insert(rchandle_from(dw_servant));
175 0 : }
176 :
177 0 : return DDS::DataWriter::_duplicate(dw_obj.in());
178 0 : }
179 :
180 : DDS::ReturnCode_t
181 0 : PublisherImpl::delete_datawriter(DDS::DataWriter_ptr a_datawriter)
182 : {
183 0 : DataWriterImpl* dw_servant = dynamic_cast<DataWriterImpl*>(a_datawriter);
184 0 : if (!dw_servant) {
185 0 : if (DCPS_debug_level > 0) {
186 0 : ACE_ERROR((LM_ERROR,
187 : "(%P|%t) PublisherImpl::delete_datawriter - dynamic cast to DataWriterImpl failed\n"));
188 : }
189 0 : return DDS::RETCODE_ERROR;
190 : }
191 :
192 : {
193 0 : DDS::Publisher_var dw_publisher(dw_servant->get_publisher());
194 :
195 0 : if (dw_publisher.in() != this) {
196 0 : if (DCPS_debug_level > 0) {
197 0 : ACE_ERROR((LM_ERROR,
198 : ACE_TEXT("(%P|%t) PublisherImpl::delete_datawriter: ")
199 : ACE_TEXT("the data writer %C doesn't ")
200 : ACE_TEXT("belong to this subscriber\n"),
201 : LogGuid(dw_servant->get_guid()).c_str()));
202 : }
203 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
204 : }
205 0 : }
206 :
207 0 : if (!dw_servant->get_deleted()) {
208 0 : dw_servant->prepare_to_delete();
209 0 : dw_servant->set_wait_pending_deadline(TheServiceParticipant->new_pending_timeout_deadline());
210 : }
211 :
212 : // Wait for any data and control messages to be transported during
213 : // unregistering of instances.
214 0 : dw_servant->wait_pending();
215 :
216 0 : GUID_t publication_id = GUID_UNKNOWN;
217 : {
218 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
219 : guard,
220 : this->pi_lock_,
221 : DDS::RETCODE_ERROR);
222 :
223 0 : publication_id = dw_servant->get_guid();
224 :
225 0 : PublicationMap::iterator it = publication_map_.find(publication_id);
226 :
227 0 : if (it == publication_map_.end()) {
228 0 : if (DCPS_debug_level > 0) {
229 0 : ACE_ERROR((LM_ERROR,
230 : ACE_TEXT("(%P|%t) ERROR: ")
231 : ACE_TEXT("PublisherImpl::delete_datawriter, ")
232 : ACE_TEXT("datawriter %C not found.\n"),
233 : LogGuid(publication_id).c_str()));
234 : }
235 0 : return DDS::RETCODE_ERROR;
236 : }
237 :
238 : // We can not erase the datawriter from datawriter map by the topic name
239 : // because the map might have multiple datawriters with the same topic
240 : // name.
241 : // Find the iterator to the datawriter in the datawriter map and erase
242 : // by the iterator.
243 0 : DataWriterMap::iterator writ;
244 0 : DataWriterMap::iterator the_writ = datawriter_map_.end();
245 :
246 0 : for (writ = datawriter_map_.begin();
247 0 : writ != datawriter_map_.end();
248 0 : ++writ) {
249 0 : if (writ->second == it->second) {
250 0 : the_writ = writ;
251 0 : break;
252 : }
253 : }
254 :
255 0 : if (the_writ != datawriter_map_.end()) {
256 0 : datawriter_map_.erase(the_writ);
257 : }
258 :
259 0 : publication_map_.erase(it);
260 :
261 : // not just unregister but remove any pending writes/sends.
262 0 : dw_servant->unregister_all();
263 :
264 : // Release pi_lock_ before making call to transport layer to avoid
265 : // some deadlock situations that threads acquire locks(PublisherImpl
266 : // pi_lock_, TransportClient reservation_lock and TransportImpl
267 : // lock_) in reverse order.
268 0 : ACE_GUARD_RETURN(reverse_lock_type, reverse_monitor, this->reverse_pi_lock_,
269 : DDS::RETCODE_ERROR);
270 : // Wait for pending samples to drain prior to removing associations
271 : // and unregistering the publication.
272 0 : dw_servant->wait_pending();
273 :
274 : // Call remove association before unregistering the datawriter
275 : // with the transport, otherwise some callbacks resulted from
276 : // remove_association may lost.
277 0 : dw_servant->remove_all_associations();
278 0 : dw_servant->cleanup();
279 0 : }
280 :
281 0 : if (this->monitor_) {
282 0 : this->monitor_->report();
283 : }
284 :
285 0 : RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
286 :
287 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
288 0 : if (!disco->remove_publication(
289 : this->domain_id_,
290 0 : participant->get_id(),
291 : publication_id)) {
292 0 : if (DCPS_debug_level > 0) {
293 0 : ACE_ERROR((LM_ERROR,
294 : ACE_TEXT("(%P|%t) ERROR: ")
295 : ACE_TEXT("PublisherImpl::delete_datawriter, ")
296 : ACE_TEXT("publication not removed from discovery.\n")));
297 : }
298 0 : return DDS::RETCODE_ERROR;
299 : }
300 :
301 0 : participant->remove_adjust_liveliness_timers();
302 :
303 0 : return DDS::RETCODE_OK;
304 0 : }
305 :
306 : DDS::DataWriter_ptr
307 0 : PublisherImpl::lookup_datawriter(const char* topic_name)
308 : {
309 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
310 : guard,
311 : this->pi_lock_,
312 : DDS::DataWriter::_nil());
313 :
314 : // If multiple entries whose key is "topic_name" then which one is
315 : // returned ? Spec does not limit which one should give.
316 0 : DataWriterMap::iterator it = datawriter_map_.find(topic_name);
317 :
318 0 : if (it == datawriter_map_.end()) {
319 0 : if (DCPS_debug_level >= 2) {
320 0 : ACE_DEBUG((LM_DEBUG,
321 : ACE_TEXT("(%P|%t) ")
322 : ACE_TEXT("PublisherImpl::lookup_datawriter, ")
323 : ACE_TEXT("The datawriter(topic_name=%C) is not found\n"),
324 : topic_name));
325 : }
326 :
327 0 : return DDS::DataWriter::_nil();
328 :
329 : } else {
330 0 : return DDS::DataWriter::_duplicate(it->second.in());
331 : }
332 0 : }
333 :
334 0 : bool PublisherImpl::prepare_to_delete_datawriters()
335 : {
336 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, pi_lock_, false);
337 0 : bool result = true;
338 0 : const DataWriterMap::iterator end = datawriter_map_.end();
339 0 : for (DataWriterMap::iterator i = datawriter_map_.begin(); i != end; ++i) {
340 0 : DataWriterImpl* const writer = dynamic_cast<DataWriterImpl*>(i->second.in());
341 0 : if (writer) {
342 0 : if (!writer->get_deleted()) {
343 0 : writer->prepare_to_delete();
344 : }
345 : } else {
346 0 : result = false;
347 : }
348 : }
349 :
350 0 : return result;
351 0 : }
352 :
353 0 : bool PublisherImpl::set_wait_pending_deadline(const MonotonicTimePoint& deadline)
354 : {
355 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, pi_lock_, false);
356 0 : bool result = true;
357 0 : const DataWriterMap::iterator end = datawriter_map_.end();
358 0 : for (DataWriterMap::iterator i = datawriter_map_.begin(); i != end; ++i) {
359 0 : DataWriterImpl* const writer = dynamic_cast<DataWriterImpl*>(i->second.in());
360 0 : if (writer) {
361 0 : writer->set_wait_pending_deadline(deadline);
362 : } else {
363 0 : result = false;
364 : }
365 : }
366 0 : return result;
367 0 : }
368 :
369 0 : DDS::ReturnCode_t PublisherImpl::delete_contained_entities()
370 : {
371 : // If the call isn't part of another delete, prepare the datawriters to be
372 : // deleted and set the pending deadline on all the writers.
373 0 : if (!get_deleted()) {
374 : // mark that the entity is being deleted
375 0 : set_deleted(true);
376 :
377 0 : if (!prepare_to_delete_datawriters()) {
378 0 : return DDS::RETCODE_ERROR;
379 : }
380 0 : if (!set_wait_pending_deadline(TheServiceParticipant->new_pending_timeout_deadline())) {
381 0 : return DDS::RETCODE_ERROR;
382 : }
383 : }
384 :
385 : while (true) {
386 0 : GUID_t pub_id = GUID_UNKNOWN;
387 0 : DataWriterImpl_rch a_datawriter;
388 :
389 : {
390 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
391 : guard,
392 : this->pi_lock_,
393 : DDS::RETCODE_ERROR);
394 :
395 0 : if (datawriter_map_.empty()) {
396 0 : break;
397 : } else {
398 0 : a_datawriter = datawriter_map_.begin()->second;
399 0 : pub_id = a_datawriter->get_guid();
400 : }
401 0 : }
402 :
403 0 : const DDS::ReturnCode_t ret = delete_datawriter(a_datawriter.in());
404 :
405 0 : if (ret != DDS::RETCODE_OK) {
406 0 : if (DCPS_debug_level > 0) {
407 0 : ACE_ERROR((LM_ERROR,
408 : ACE_TEXT("(%P|%t) ERROR: ")
409 : ACE_TEXT("PublisherImpl::")
410 : ACE_TEXT("delete_contained_entities: ")
411 : ACE_TEXT("failed to delete ")
412 : ACE_TEXT("datawriter %C.\n"),
413 : LogGuid(pub_id).c_str()));
414 : }
415 0 : return ret;
416 : }
417 0 : }
418 :
419 : // the publisher can now start creating new publications
420 0 : set_deleted(false);
421 :
422 0 : return DDS::RETCODE_OK;
423 : }
424 :
425 : DDS::ReturnCode_t
426 0 : PublisherImpl::set_qos(const DDS::PublisherQos & qos)
427 : {
428 :
429 : OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
430 :
431 0 : if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
432 0 : if (qos_ == qos)
433 0 : return DDS::RETCODE_OK;
434 :
435 : // for the not changeable qos, it can be changed before enable
436 0 : if (!Qos_Helper::changeable(qos_, qos) && enabled_) {
437 0 : return DDS::RETCODE_IMMUTABLE_POLICY;
438 :
439 : } else {
440 0 : qos_ = qos;
441 :
442 0 : DwIdToQosMap idToQosMap;
443 : {
444 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
445 : guard,
446 : this->pi_lock_,
447 : DDS::RETCODE_ERROR);
448 :
449 0 : for (PublicationMap::iterator iter = publication_map_.begin();
450 0 : iter != publication_map_.end();
451 0 : ++iter) {
452 0 : DDS::DataWriterQos qos = iter->second->qos_;
453 0 : GUID_t id = iter->second->get_guid();
454 : std::pair<DwIdToQosMap::iterator, bool> pair =
455 0 : idToQosMap.insert(DwIdToQosMap::value_type(id, qos));
456 :
457 0 : if (!pair.second) {
458 0 : if (DCPS_debug_level > 0) {
459 0 : ACE_ERROR((LM_ERROR,
460 : ACE_TEXT("(%P|%t) ")
461 : ACE_TEXT("PublisherImpl::set_qos: ")
462 : ACE_TEXT("insert id %C to DwIdToQosMap ")
463 : ACE_TEXT("failed.\n"),
464 : LogGuid(id).c_str()));
465 : }
466 0 : return DDS::RETCODE_ERROR;
467 : }
468 0 : }
469 0 : }
470 :
471 0 : DwIdToQosMap::iterator iter = idToQosMap.begin();
472 :
473 0 : while (iter != idToQosMap.end()) {
474 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
475 0 : bool status = false;
476 :
477 0 : RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
478 0 : if (participant)
479 0 : status = disco->update_publication_qos(
480 0 : participant->get_domain_id(),
481 0 : participant->get_id(),
482 0 : iter->first,
483 0 : iter->second,
484 0 : this->qos_);
485 :
486 0 : if (!status) {
487 0 : if (DCPS_debug_level > 0) {
488 0 : ACE_ERROR((LM_ERROR,
489 : ACE_TEXT("(%P|%t) PublisherImpl::set_qos, ")
490 : ACE_TEXT("failed.\n")));
491 : }
492 0 : return DDS::RETCODE_ERROR;
493 : }
494 :
495 0 : ++iter;
496 0 : }
497 0 : }
498 :
499 0 : return DDS::RETCODE_OK;
500 :
501 : } else {
502 0 : return DDS::RETCODE_INCONSISTENT_POLICY;
503 : }
504 : }
505 :
506 : DDS::ReturnCode_t
507 0 : PublisherImpl::get_qos(DDS::PublisherQos & qos)
508 : {
509 0 : qos = qos_;
510 0 : return DDS::RETCODE_OK;
511 : }
512 :
513 : DDS::ReturnCode_t
514 0 : PublisherImpl::set_listener(DDS::PublisherListener_ptr a_listener,
515 : DDS::StatusMask mask)
516 : {
517 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
518 0 : listener_mask_ = mask;
519 : //note: OK to duplicate a nil object ref
520 0 : listener_ = DDS::PublisherListener::_duplicate(a_listener);
521 0 : return DDS::RETCODE_OK;
522 0 : }
523 :
524 : DDS::PublisherListener_ptr
525 0 : PublisherImpl::get_listener()
526 : {
527 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
528 0 : return DDS::PublisherListener::_duplicate(listener_.in());
529 0 : }
530 :
531 : DDS::ReturnCode_t
532 0 : PublisherImpl::suspend_publications()
533 : {
534 0 : if (!enabled_) {
535 0 : if (DCPS_debug_level > 0) {
536 0 : ACE_ERROR((LM_ERROR,
537 : ACE_TEXT("(%P|%t) ERROR: ")
538 : ACE_TEXT("PublisherImpl::suspend_publications, ")
539 : ACE_TEXT(" Entity is not enabled.\n")));
540 : }
541 0 : return DDS::RETCODE_NOT_ENABLED;
542 : }
543 :
544 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
545 : suspend_guard,
546 : this->pi_suspended_lock_,
547 : DDS::RETCODE_ERROR);
548 0 : ++suspend_depth_count_;
549 0 : return DDS::RETCODE_OK;
550 0 : }
551 :
552 : bool
553 0 : PublisherImpl::is_suspended() const
554 : {
555 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
556 : suspend_guard,
557 : this->pi_suspended_lock_,
558 : false);
559 0 : return suspend_depth_count_;
560 0 : }
561 :
562 : DDS::ReturnCode_t
563 0 : PublisherImpl::resume_publications()
564 : {
565 0 : if (!enabled_) {
566 0 : if (DCPS_debug_level > 0) {
567 0 : ACE_ERROR((LM_ERROR,
568 : ACE_TEXT("(%P|%t) ERROR: ")
569 : ACE_TEXT("PublisherImpl::resume_publications, ")
570 : ACE_TEXT(" Entity is not enabled.\n")));
571 : }
572 0 : return DDS::RETCODE_NOT_ENABLED;
573 : }
574 :
575 0 : PublicationMap publication_map_copy;
576 : {
577 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
578 : suspend_guard,
579 : this->pi_suspended_lock_,
580 : DDS::RETCODE_ERROR);
581 0 : --suspend_depth_count_;
582 :
583 0 : if (suspend_depth_count_ < 0) {
584 0 : suspend_depth_count_ = 0;
585 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
586 : }
587 0 : if (suspend_depth_count_ == 0) {
588 0 : suspend_guard.release();
589 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
590 : guard,
591 : this->pi_lock_,
592 : DDS::RETCODE_ERROR);
593 :
594 0 : publication_map_copy = publication_map_;
595 0 : }
596 0 : }
597 :
598 0 : for (PublicationMap::const_iterator it = publication_map_copy.begin();
599 0 : it != publication_map_copy.end(); ++it) {
600 0 : it->second->send_suspended_data();
601 : }
602 :
603 0 : return DDS::RETCODE_OK;
604 0 : }
605 :
606 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
607 :
608 : DDS::ReturnCode_t
609 0 : PublisherImpl::begin_coherent_changes()
610 : {
611 0 : if (!enabled_) {
612 0 : if (DCPS_debug_level > 0) {
613 0 : ACE_ERROR((LM_ERROR,
614 : ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:")
615 : ACE_TEXT(" Publisher is not enabled!\n")));
616 : }
617 0 : return DDS::RETCODE_NOT_ENABLED;
618 : }
619 :
620 0 : if (!qos_.presentation.coherent_access) {
621 0 : if (DCPS_debug_level > 0) {
622 0 : ACE_ERROR((LM_ERROR,
623 : ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:")
624 : ACE_TEXT(" QoS policy does not support coherent access!\n")));
625 : }
626 0 : return DDS::RETCODE_ERROR;
627 : }
628 :
629 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
630 : guard,
631 : this->pi_lock_,
632 : DDS::RETCODE_ERROR);
633 :
634 0 : ++this->change_depth_;
635 :
636 0 : if (qos_.presentation.access_scope == DDS::INSTANCE_PRESENTATION_QOS) {
637 : // INSTANCE access scope essentially behaves
638 : // as a no-op. (see: 7.1.3.6)
639 0 : return DDS::RETCODE_OK;
640 : }
641 :
642 : // We should only notify publications on the first
643 : // and last change to the current change set:
644 0 : if (this->change_depth_ == 1) {
645 0 : for (PublicationMap::iterator it = this->publication_map_.begin();
646 0 : it != this->publication_map_.end(); ++it) {
647 0 : it->second->begin_coherent_changes();
648 : }
649 : }
650 :
651 0 : return DDS::RETCODE_OK;
652 0 : }
653 :
654 : DDS::ReturnCode_t
655 0 : PublisherImpl::end_coherent_changes()
656 : {
657 0 : if (!enabled_) {
658 0 : if (DCPS_debug_level > 0) {
659 0 : ACE_ERROR((LM_ERROR,
660 : ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
661 : ACE_TEXT(" Publisher is not enabled!\n")));
662 : }
663 0 : return DDS::RETCODE_NOT_ENABLED;
664 : }
665 :
666 0 : if (!qos_.presentation.coherent_access) {
667 0 : if (DCPS_debug_level > 0) {
668 0 : ACE_ERROR((LM_ERROR,
669 : ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
670 : ACE_TEXT(" QoS policy does not support coherent access!\n")));
671 : }
672 0 : return DDS::RETCODE_ERROR;
673 : }
674 :
675 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
676 : guard,
677 : this->pi_lock_,
678 : DDS::RETCODE_ERROR);
679 :
680 0 : if (this->change_depth_ == 0) {
681 0 : if (DCPS_debug_level > 0) {
682 0 : ACE_ERROR((LM_ERROR,
683 : ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
684 : ACE_TEXT(" No matching call to begin_coherent_changes!\n")));
685 : }
686 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
687 : }
688 :
689 0 : --this->change_depth_;
690 :
691 0 : if (qos_.presentation.access_scope == DDS::INSTANCE_PRESENTATION_QOS) {
692 : // INSTANCE access scope essentially behaves
693 : // as a no-op. (see: 7.1.3.6)
694 0 : return DDS::RETCODE_OK;
695 : }
696 :
697 : // We should only notify publications on the first
698 : // and last change to the current change set:
699 0 : if (this->change_depth_ == 0) {
700 0 : GroupCoherentSamples group_samples;
701 0 : for (PublicationMap::iterator it = this->publication_map_.begin();
702 0 : it != this->publication_map_.end(); ++it) {
703 :
704 0 : if (it->second->coherent_samples_ == 0) {
705 0 : continue;
706 : }
707 :
708 : std::pair<GroupCoherentSamples::iterator, bool> pair =
709 0 : group_samples.insert(GroupCoherentSamples::value_type(
710 0 : it->second->get_guid(),
711 0 : WriterCoherentSample(it->second->coherent_samples_,
712 0 : it->second->sequence_number_)));
713 :
714 0 : if (!pair.second) {
715 0 : if (DCPS_debug_level > 0) {
716 0 : ACE_ERROR((LM_ERROR,
717 : ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes: ")
718 : ACE_TEXT("failed to insert to GroupCoherentSamples.\n")));
719 : }
720 0 : return DDS::RETCODE_ERROR;
721 : }
722 : }
723 :
724 0 : for (PublicationMap::iterator it = this->publication_map_.begin();
725 0 : it != this->publication_map_.end(); ++it) {
726 0 : if (it->second->coherent_samples_ == 0) {
727 0 : continue;
728 : }
729 :
730 0 : it->second->end_coherent_changes(group_samples);
731 : }
732 0 : }
733 :
734 0 : return DDS::RETCODE_OK;
735 0 : }
736 :
737 : #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
738 :
739 : DDS::ReturnCode_t
740 0 : PublisherImpl::wait_for_acknowledgments(
741 : const DDS::Duration_t& max_wait)
742 : {
743 0 : if (!enabled_) {
744 0 : if (DCPS_debug_level > 0) {
745 0 : ACE_ERROR((LM_ERROR,
746 : ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ")
747 : ACE_TEXT("Entity is not enabled.\n")));
748 : }
749 0 : return DDS::RETCODE_NOT_ENABLED;
750 : }
751 :
752 : typedef OPENDDS_MAP(DataWriterImpl*, DataWriterImpl::AckToken) DataWriterAckMap;
753 0 : DataWriterAckMap ack_writers;
754 : {
755 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
756 : guard,
757 : this->pi_lock_,
758 : DDS::RETCODE_ERROR);
759 :
760 : // Collect writers to request acks
761 0 : for (DataWriterMap::iterator it(this->datawriter_map_.begin());
762 0 : it != this->datawriter_map_.end(); ++it) {
763 0 : DataWriterImpl_rch writer = it->second;
764 0 : if (writer->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS)
765 0 : continue;
766 0 : if (writer->should_ack()) {
767 0 : DataWriterImpl::AckToken token = writer->create_ack_token(max_wait);
768 :
769 : std::pair<DataWriterAckMap::iterator, bool> pair =
770 0 : ack_writers.insert(DataWriterAckMap::value_type(writer.in(), token));
771 :
772 0 : if (!pair.second) {
773 0 : if (DCPS_debug_level > 0) {
774 0 : ACE_ERROR((LM_ERROR,
775 : ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ")
776 : ACE_TEXT("Unable to insert AckToken into DataWriterAckMap!\n")));
777 : }
778 0 : return DDS::RETCODE_ERROR;
779 : }
780 0 : }
781 0 : }
782 0 : }
783 :
784 0 : if (ack_writers.empty()) {
785 0 : if (DCPS_debug_level > 0) {
786 0 : ACE_DEBUG((LM_DEBUG,
787 : ACE_TEXT("(%P|%t) PublisherImpl::wait_for_acknowledgments() - ")
788 : ACE_TEXT("not blocking due to no writers requiring acks.\n")));
789 : }
790 :
791 0 : return DDS::RETCODE_OK;
792 : }
793 :
794 : // Wait for ack responses from all associated readers
795 0 : for (DataWriterAckMap::iterator it(ack_writers.begin());
796 0 : it != ack_writers.end(); ++it) {
797 0 : DataWriterImpl::AckToken token = it->second;
798 :
799 0 : it->first->wait_for_specific_ack(token);
800 0 : }
801 :
802 0 : return DDS::RETCODE_OK;
803 0 : }
804 :
805 : DDS::DomainParticipant_ptr
806 0 : PublisherImpl::get_participant()
807 : {
808 0 : return participant_.lock()._retn();
809 : }
810 :
811 : DDS::ReturnCode_t
812 0 : PublisherImpl::set_default_datawriter_qos(const DDS::DataWriterQos & qos)
813 : {
814 0 : if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
815 0 : default_datawriter_qos_ = qos;
816 0 : return DDS::RETCODE_OK;
817 :
818 : } else {
819 0 : return DDS::RETCODE_INCONSISTENT_POLICY;
820 : }
821 : }
822 :
823 : DDS::ReturnCode_t
824 0 : PublisherImpl::get_default_datawriter_qos(DDS::DataWriterQos & qos)
825 : {
826 0 : qos = default_datawriter_qos_;
827 0 : return DDS::RETCODE_OK;
828 : }
829 :
830 : DDS::ReturnCode_t
831 0 : PublisherImpl::copy_from_topic_qos(DDS::DataWriterQos & a_datawriter_qos,
832 : const DDS::TopicQos & a_topic_qos)
833 : {
834 0 : if (Qos_Helper::copy_from_topic_qos(a_datawriter_qos, a_topic_qos)) {
835 0 : return DDS::RETCODE_OK;
836 : } else {
837 0 : return DDS::RETCODE_INCONSISTENT_POLICY;
838 : }
839 : }
840 :
841 : DDS::ReturnCode_t
842 0 : PublisherImpl::enable()
843 : {
844 : //According spec:
845 : // - Calling enable on an already enabled Entity returns OK and has no
846 : // effect.
847 : // - Calling enable on an Entity whose factory is not enabled will fail
848 : // and return PRECONDITION_NOT_MET.
849 :
850 0 : if (this->is_enabled()) {
851 0 : return DDS::RETCODE_OK;
852 : }
853 :
854 0 : RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
855 0 : if (!participant || !participant->is_enabled()) {
856 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
857 : }
858 :
859 0 : if (this->monitor_) {
860 0 : this->monitor_->report();
861 : }
862 :
863 0 : this->set_enabled();
864 :
865 0 : if (qos_.entity_factory.autoenable_created_entities) {
866 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, pi_lock_, DDS::RETCODE_ERROR);
867 0 : DataWriterSet writers;
868 0 : writers_not_enabled_.swap(writers);
869 0 : for (DataWriterSet::iterator it = writers.begin(); it != writers.end(); ++it) {
870 0 : (*it)->enable();
871 : }
872 0 : }
873 :
874 0 : return DDS::RETCODE_OK;
875 0 : }
876 :
877 0 : bool PublisherImpl::is_clean(String* leftover_entities) const
878 : {
879 0 : if (leftover_entities) {
880 0 : leftover_entities->clear();
881 : }
882 :
883 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, pi_lock_, false);
884 :
885 0 : const size_t writer_count = datawriter_map_.size();
886 0 : if (leftover_entities && writer_count) {
887 0 : *leftover_entities += to_dds_string(writer_count) + " writer(s)";
888 : }
889 :
890 0 : const size_t publication_count = publication_map_.size();
891 0 : if (leftover_entities && publication_count) {
892 0 : if (leftover_entities->size()) {
893 0 : *leftover_entities += ", ";
894 : }
895 0 : *leftover_entities += to_dds_string(publication_count) + " publication(s)";
896 : }
897 :
898 0 : return writer_count == 0 && publication_count == 0;
899 0 : }
900 :
901 : DDS::ReturnCode_t
902 0 : PublisherImpl::writer_enabled(const char* topic_name,
903 : DataWriterImpl* writer_ptr)
904 : {
905 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
906 : guard,
907 : this->pi_lock_,
908 : DDS::RETCODE_ERROR);
909 0 : DataWriterImpl_rch writer = rchandle_from(writer_ptr);
910 0 : writers_not_enabled_.erase(writer);
911 :
912 0 : datawriter_map_.insert(DataWriterMap::value_type(topic_name, writer));
913 :
914 0 : const GUID_t publication_id = writer->get_guid();
915 :
916 : std::pair<PublicationMap::iterator, bool> pair =
917 0 : publication_map_.insert(PublicationMap::value_type(publication_id, writer));
918 :
919 0 : if (!pair.second) {
920 0 : if (DCPS_debug_level > 0) {
921 0 : ACE_ERROR((LM_ERROR,
922 : ACE_TEXT("(%P|%t) ERROR: ")
923 : ACE_TEXT("PublisherImpl::writer_enabled: ")
924 : ACE_TEXT("insert publication %C failed.\n"),
925 : LogGuid(publication_id).c_str()));
926 : }
927 0 : return DDS::RETCODE_ERROR;
928 : }
929 :
930 0 : if (this->monitor_) {
931 0 : this->monitor_->report();
932 : }
933 :
934 0 : return DDS::RETCODE_OK;
935 0 : }
936 :
937 :
938 : DDS::PublisherListener_ptr
939 0 : PublisherImpl::listener_for(DDS::StatusKind kind)
940 : {
941 : // per 2.1.4.3.1 Listener Access to Plain Communication Status
942 : // use this entities factory if listener is mask not enabled
943 : // for this kind.
944 0 : RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
945 :
946 0 : if (!participant)
947 0 : return 0;
948 :
949 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
950 0 : if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
951 0 : g.release();
952 0 : return participant->listener_for(kind);
953 :
954 : } else {
955 0 : return DDS::PublisherListener::_duplicate(listener_.in());
956 : }
957 0 : }
958 :
959 : DDS::ReturnCode_t
960 0 : PublisherImpl::assert_liveliness_by_participant()
961 : {
962 0 : DDS::ReturnCode_t ret = DDS::RETCODE_OK;
963 :
964 0 : for (DataWriterMap::iterator it(datawriter_map_.begin());
965 0 : it != datawriter_map_.end(); ++it) {
966 0 : const DDS::ReturnCode_t dw_ret = it->second->assert_liveliness_by_participant();
967 :
968 0 : if (dw_ret != DDS::RETCODE_OK) {
969 0 : ret = dw_ret;
970 : }
971 : }
972 :
973 0 : return ret;
974 : }
975 :
976 : TimeDuration
977 0 : PublisherImpl::liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
978 : {
979 0 : TimeDuration tv = TimeDuration::max_value;
980 0 : for (DataWriterMap::iterator it(datawriter_map_.begin());
981 0 : it != datawriter_map_.end(); ++it) {
982 0 : tv = std::min(tv, it->second->liveliness_check_interval(kind));
983 : }
984 0 : return tv;
985 0 : }
986 :
987 : bool
988 0 : PublisherImpl::participant_liveliness_activity_after(const MonotonicTimePoint& tv)
989 : {
990 0 : for (DataWriterMap::iterator it(datawriter_map_.begin());
991 0 : it != datawriter_map_.end(); ++it) {
992 0 : if (it->second->participant_liveliness_activity_after(tv)) {
993 0 : return true;
994 : }
995 : }
996 0 : return false;
997 : }
998 :
999 : void
1000 0 : PublisherImpl::get_publication_ids(PublicationIdVec& pubs)
1001 : {
1002 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
1003 : guard,
1004 : this->pi_lock_,
1005 : );
1006 :
1007 0 : pubs.reserve(publication_map_.size());
1008 0 : for (PublicationMap::iterator iter = publication_map_.begin();
1009 0 : iter != publication_map_.end();
1010 0 : ++iter) {
1011 0 : pubs.push_back(iter->first);
1012 : }
1013 0 : }
1014 :
1015 : RcHandle<EntityImpl>
1016 0 : PublisherImpl::parent() const
1017 : {
1018 0 : return this->participant_.lock();
1019 : }
1020 :
1021 : bool
1022 0 : PublisherImpl::validate_datawriter_qos(const DDS::DataWriterQos& qos,
1023 : const DDS::DataWriterQos& default_qos,
1024 : DDS::Topic_ptr a_topic,
1025 : DDS::DataWriterQos& dw_qos)
1026 : {
1027 0 : if (CORBA::is_nil(a_topic)) {
1028 0 : if (DCPS_debug_level > 0) {
1029 0 : ACE_ERROR((LM_ERROR,
1030 : ACE_TEXT("(%P|%t) ERROR: ")
1031 : ACE_TEXT("PublisherImpl::create_datawriter, ")
1032 : ACE_TEXT("topic is nil.\n")));
1033 : }
1034 0 : return DDS::DataWriter::_nil();
1035 : }
1036 :
1037 0 : if (qos == DATAWRITER_QOS_DEFAULT) {
1038 0 : dw_qos = default_qos;
1039 :
1040 0 : } else if (qos == DATAWRITER_QOS_USE_TOPIC_QOS) {
1041 0 : DDS::TopicQos topic_qos;
1042 0 : a_topic->get_qos(topic_qos);
1043 0 : dw_qos = default_qos;
1044 :
1045 0 : Qos_Helper::copy_from_topic_qos(dw_qos, topic_qos);
1046 :
1047 0 : } else {
1048 0 : dw_qos = qos;
1049 : }
1050 :
1051 : OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
1052 : OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
1053 : OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
1054 : OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
1055 : OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
1056 :
1057 0 : if (!Qos_Helper::valid(dw_qos)) {
1058 0 : if (DCPS_debug_level > 0) {
1059 0 : ACE_ERROR((LM_ERROR,
1060 : ACE_TEXT("(%P|%t) ERROR: ")
1061 : ACE_TEXT("PublisherImpl::create_datawriter, ")
1062 : ACE_TEXT("invalid qos.\n")));
1063 : }
1064 0 : return DDS::DataWriter::_nil();
1065 : }
1066 :
1067 0 : if (!Qos_Helper::consistent(dw_qos)) {
1068 0 : if (DCPS_debug_level > 0) {
1069 0 : ACE_ERROR((LM_ERROR,
1070 : ACE_TEXT("(%P|%t) ERROR: ")
1071 : ACE_TEXT("PublisherImpl::create_datawriter, ")
1072 : ACE_TEXT("inconsistent qos.\n")));
1073 : }
1074 0 : return DDS::DataWriter::_nil();
1075 : }
1076 0 : return true;
1077 : }
1078 :
1079 : } // namespace DCPS
1080 : } // namespace OpenDDS
1081 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|