00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "PublisherImpl.h"
00010 #include "FeatureDisabledQosCheck.h"
00011 #include "DataWriterImpl.h"
00012 #include "DomainParticipantImpl.h"
00013 #include "DataWriterImpl.h"
00014 #include "Service_Participant.h"
00015 #include "Qos_Helper.h"
00016 #include "GuidConverter.h"
00017 #include "Marked_Default_Qos.h"
00018 #include "TopicImpl.h"
00019 #include "MonitorFactory.h"
00020 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00021 #include "dds/DCPS/transport/framework/DataLinkSet.h"
00022 #include "dds/DCPS/transport/framework/TransportImpl.h"
00023 #include "tao/debug.h"
00024
00025 namespace OpenDDS {
00026 namespace DCPS {
00027
00028
00029 PublisherImpl::PublisherImpl(DDS::InstanceHandle_t handle,
00030 RepoId id,
00031 const DDS::PublisherQos& qos,
00032 DDS::PublisherListener_ptr a_listener,
00033 const DDS::StatusMask& mask,
00034 DomainParticipantImpl* participant)
00035 : handle_(handle),
00036 qos_(qos),
00037 default_datawriter_qos_(TheServiceParticipant->initial_DataWriterQos()),
00038 listener_mask_(mask),
00039 listener_(DDS::PublisherListener::_duplicate(a_listener)),
00040 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00041 change_depth_(0),
00042 #endif
00043 domain_id_(participant->get_domain_id()),
00044 participant_(participant),
00045 suspend_depth_count_(0),
00046 sequence_number_(),
00047 aggregation_period_start_(ACE_Time_Value::zero),
00048 reverse_pi_lock_(pi_lock_),
00049 monitor_(0),
00050 publisher_id_(id)
00051 {
00052 monitor_ = TheServiceParticipant->monitor_factory_->create_publisher_monitor(this);
00053 }
00054
00055
00056 PublisherImpl::~PublisherImpl()
00057 {
00058
00059
00060 if (!is_clean()) {
00061 ACE_ERROR((LM_ERROR,
00062 ACE_TEXT("(%P|%t) ERROR: ")
00063 ACE_TEXT("PublisherImpl::~PublisherImpl, ")
00064 ACE_TEXT("some datawriters still exist.\n")));
00065 }
00066 }
00067
00068 DDS::InstanceHandle_t
00069 PublisherImpl::get_instance_handle()
00070 {
00071 return handle_;
00072 }
00073
00074 bool
00075 PublisherImpl::contains_writer(DDS::InstanceHandle_t a_handle)
00076 {
00077 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00078 guard,
00079 this->pi_lock_,
00080 DDS::RETCODE_ERROR);
00081
00082 for (DataWriterMap::iterator it(datawriter_map_.begin());
00083 it != datawriter_map_.end(); ++it) {
00084 if (a_handle == it->second->get_instance_handle()) {
00085 return true;
00086 }
00087 }
00088
00089 return false;
00090 }
00091
00092 DDS::DataWriter_ptr
00093 PublisherImpl::create_datawriter(
00094 DDS::Topic_ptr a_topic,
00095 const DDS::DataWriterQos & qos,
00096 DDS::DataWriterListener_ptr a_listener,
00097 DDS::StatusMask mask)
00098 {
00099 DDS::DataWriterQos dw_qos;
00100
00101 if (!validate_datawriter_qos(qos, default_datawriter_qos_, a_topic, dw_qos)) {
00102 return DDS::DataWriter::_nil();
00103 }
00104
00105 TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic);
00106
00107 OpenDDS::DCPS::TypeSupport_ptr typesupport =
00108 topic_servant->get_type_support();
00109
00110 if (typesupport == 0) {
00111 CORBA::String_var name = topic_servant->get_name();
00112 ACE_ERROR((LM_ERROR,
00113 ACE_TEXT("(%P|%t) ERROR: ")
00114 ACE_TEXT("PublisherImpl::create_datawriter, ")
00115 ACE_TEXT("typesupport(topic_name=%C) is nil.\n"),
00116 name.in()));
00117 return DDS::DataWriter::_nil();
00118 }
00119
00120 DDS::DataWriter_var dw_obj = typesupport->create_datawriter();
00121
00122 DataWriterImpl* dw_servant =
00123 dynamic_cast <DataWriterImpl*>(dw_obj.in());
00124
00125 dw_servant->init(a_topic,
00126 topic_servant,
00127 dw_qos,
00128 a_listener,
00129 mask,
00130 participant_,
00131 this,
00132 dw_obj.in());
00133
00134 if (this->enabled_ == true
00135 && qos_.entity_factory.autoenable_created_entities == 1) {
00136
00137 DDS::ReturnCode_t ret = dw_servant->enable();
00138
00139 if (ret != DDS::RETCODE_OK) {
00140 ACE_ERROR((LM_ERROR,
00141 ACE_TEXT("(%P|%t) ERROR: ")
00142 ACE_TEXT("PublisherImpl::create_datawriter, ")
00143 ACE_TEXT("enable failed.\n")));
00144 return DDS::DataWriter::_nil();
00145 }
00146 }
00147
00148 return DDS::DataWriter::_duplicate(dw_obj.in());
00149 }
00150
00151 DDS::ReturnCode_t
00152 PublisherImpl::delete_datawriter(DDS::DataWriter_ptr a_datawriter)
00153 {
00154 DataWriterImpl* dw_servant = dynamic_cast<DataWriterImpl*>(a_datawriter);
00155 if (dw_servant) {
00156
00157 dw_servant->prepare_to_delete();
00158 }
00159
00160 if (!dw_servant) {
00161 ACE_ERROR((LM_ERROR,
00162 "(%P|%t) PublisherImpl::delete_datawriter - dynamic cast to DataWriterImpl failed\n"
00163 ));
00164 return DDS::RETCODE_ERROR;
00165 }
00166
00167 {
00168 DDS::Publisher_var dw_publisher(dw_servant->get_publisher());
00169
00170 if (dw_publisher.in() != this) {
00171 RepoId id = dw_servant->get_publication_id();
00172 GuidConverter converter(id);
00173 ACE_ERROR((LM_ERROR,
00174 ACE_TEXT("(%P|%t) PublisherImpl::delete_datawriter: ")
00175 ACE_TEXT("the data writer %C doesn't ")
00176 ACE_TEXT("belong to this subscriber \n"),
00177 OPENDDS_STRING(converter).c_str()));
00178 return DDS::RETCODE_PRECONDITION_NOT_MET;
00179 }
00180 }
00181
00182 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00183
00184
00185
00186 if (!dw_servant->persist_data() && DCPS_debug_level >= 2) {
00187 ACE_ERROR((LM_ERROR,
00188 ACE_TEXT("(%P|%t) ERROR: ")
00189 ACE_TEXT("PublisherImpl::delete_datawriter, ")
00190 ACE_TEXT("failed to make data durable.\n")));
00191 }
00192 #endif
00193
00194
00195 DDS::Time_t source_timestamp = time_value_to_time(ACE_OS::gettimeofday());
00196 dw_servant->unregister_instances(source_timestamp);
00197
00198
00199
00200 dw_servant->wait_pending();
00201 dw_servant->wait_control_pending();
00202
00203 CORBA::String_var topic_name = dw_servant->get_topic_name();
00204 RepoId publication_id = GUID_UNKNOWN;
00205 {
00206 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00207 guard,
00208 this->pi_lock_,
00209 DDS::RETCODE_ERROR);
00210
00211 publication_id = dw_servant->get_publication_id();
00212
00213 PublicationMap::iterator it = publication_map_.find(publication_id);
00214
00215 if (it == publication_map_.end()) {
00216 GuidConverter converter(publication_id);
00217 ACE_ERROR_RETURN((LM_ERROR,
00218 ACE_TEXT("(%P|%t) ERROR: ")
00219 ACE_TEXT("PublisherImpl::delete_datawriter, ")
00220 ACE_TEXT("datawriter %C not found.\n"),
00221 OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR);
00222 }
00223
00224
00225
00226
00227
00228
00229 DataWriterMap::iterator writ;
00230 DataWriterMap::iterator the_writ = datawriter_map_.end();
00231
00232 for (writ = datawriter_map_.begin();
00233 writ != datawriter_map_.end();
00234 ++writ) {
00235 if (writ->second == it->second) {
00236 the_writ = writ;
00237 break;
00238 }
00239 }
00240
00241 if (the_writ != datawriter_map_.end()) {
00242 datawriter_map_.erase(the_writ);
00243 }
00244
00245 publication_map_.erase(it);
00246
00247
00248
00249
00250
00251 ACE_GUARD_RETURN(reverse_lock_type, reverse_monitor, this->reverse_pi_lock_,
00252 DDS::RETCODE_ERROR);
00253
00254
00255 dw_servant->wait_pending();
00256
00257
00258
00259 dw_servant->remove_all_associations();
00260 dw_servant->cleanup();
00261 }
00262
00263 dw_servant->unregister_all();
00264 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00265 if (!disco->remove_publication(
00266 this->domain_id_,
00267 this->participant_->get_id(),
00268 publication_id)) {
00269 ACE_ERROR_RETURN((LM_ERROR,
00270 ACE_TEXT("(%P|%t) ERROR: ")
00271 ACE_TEXT("PublisherImpl::delete_datawriter, ")
00272 ACE_TEXT("publication not removed from discovery.\n")),
00273 DDS::RETCODE_ERROR);
00274 }
00275
00276 dw_servant->_remove_ref();
00277
00278 participant_->remove_adjust_liveliness_timers();
00279
00280 return DDS::RETCODE_OK;
00281 }
00282
00283 DDS::DataWriter_ptr
00284 PublisherImpl::lookup_datawriter(const char* topic_name)
00285 {
00286 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00287 guard,
00288 this->pi_lock_,
00289 DDS::DataWriter::_nil());
00290
00291
00292
00293 DataWriterMap::iterator it = datawriter_map_.find(topic_name);
00294
00295 if (it == datawriter_map_.end()) {
00296 if (DCPS_debug_level >= 2) {
00297 ACE_DEBUG((LM_DEBUG,
00298 ACE_TEXT("(%P|%t) ")
00299 ACE_TEXT("PublisherImpl::lookup_datawriter, ")
00300 ACE_TEXT("The datawriter(topic_name=%C) is not found\n"),
00301 topic_name));
00302 }
00303
00304 return DDS::DataWriter::_nil();
00305
00306 } else {
00307 return DDS::DataWriter::_duplicate(it->second);
00308 }
00309 }
00310
00311 DDS::ReturnCode_t
00312 PublisherImpl::delete_contained_entities()
00313 {
00314
00315 set_deleted(true);
00316
00317 while (true) {
00318 PublicationId pub_id = GUID_UNKNOWN;
00319 DataWriterImpl* a_datawriter;
00320
00321 {
00322 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00323 guard,
00324 this->pi_lock_,
00325 DDS::RETCODE_ERROR);
00326
00327 if (datawriter_map_.empty()) {
00328 break;
00329 } else {
00330 a_datawriter = datawriter_map_.begin()->second;
00331 pub_id = a_datawriter->get_publication_id();
00332 }
00333 }
00334
00335 DDS::ReturnCode_t ret = delete_datawriter(a_datawriter);
00336
00337 if (ret != DDS::RETCODE_OK) {
00338 GuidConverter converter(pub_id);
00339 ACE_ERROR_RETURN((LM_ERROR,
00340 ACE_TEXT("(%P|%t) ERROR: ")
00341 ACE_TEXT("PublisherImpl::")
00342 ACE_TEXT("delete_contained_entities: ")
00343 ACE_TEXT("failed to delete ")
00344 ACE_TEXT("datawriter %C.\n"),
00345 OPENDDS_STRING(converter).c_str()),ret);
00346 }
00347 }
00348
00349
00350 set_deleted(false);
00351
00352 return DDS::RETCODE_OK;
00353 }
00354
00355 DDS::ReturnCode_t
00356 PublisherImpl::set_qos(const DDS::PublisherQos & qos)
00357 {
00358
00359 OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00360
00361 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00362 if (qos_ == qos)
00363 return DDS::RETCODE_OK;
00364
00365
00366 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00367 return DDS::RETCODE_IMMUTABLE_POLICY;
00368
00369 } else {
00370 qos_ = qos;
00371
00372 DwIdToQosMap idToQosMap;
00373 {
00374 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00375 guard,
00376 this->pi_lock_,
00377 DDS::RETCODE_ERROR);
00378
00379 for (PublicationMap::iterator iter = publication_map_.begin();
00380 iter != publication_map_.end();
00381 ++iter) {
00382 DDS::DataWriterQos qos;
00383 iter->second->get_qos(qos);
00384 RepoId id = iter->second->get_publication_id();
00385 std::pair<DwIdToQosMap::iterator, bool> pair =
00386 idToQosMap.insert(DwIdToQosMap::value_type(id, qos));
00387
00388 if (pair.second == false) {
00389 GuidConverter converter(id);
00390 ACE_ERROR_RETURN((LM_ERROR,
00391 ACE_TEXT("(%P|%t) ")
00392 ACE_TEXT("PublisherImpl::set_qos: ")
00393 ACE_TEXT("insert id %d to DwIdToQosMap ")
00394 ACE_TEXT("failed.\n"),
00395 OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR);
00396 }
00397 }
00398 }
00399
00400 DwIdToQosMap::iterator iter = idToQosMap.begin();
00401
00402 while (iter != idToQosMap.end()) {
00403 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00404 const bool status
00405 = disco->update_publication_qos(
00406 participant_->get_domain_id(),
00407 participant_->get_id(),
00408 iter->first,
00409 iter->second,
00410 this->qos_);
00411
00412 if (!status) {
00413 ACE_ERROR_RETURN((LM_ERROR,
00414 ACE_TEXT("(%P|%t) PublisherImpl::set_qos, ")
00415 ACE_TEXT("failed. \n")),
00416 DDS::RETCODE_ERROR);
00417 }
00418
00419 ++iter;
00420 }
00421 }
00422
00423 return DDS::RETCODE_OK;
00424
00425 } else {
00426 return DDS::RETCODE_INCONSISTENT_POLICY;
00427 }
00428 }
00429
00430 DDS::ReturnCode_t
00431 PublisherImpl::get_qos(DDS::PublisherQos & qos)
00432 {
00433 qos = qos_;
00434 return DDS::RETCODE_OK;
00435 }
00436
00437 DDS::ReturnCode_t
00438 PublisherImpl::set_listener(DDS::PublisherListener_ptr a_listener,
00439 DDS::StatusMask mask)
00440 {
00441 listener_mask_ = mask;
00442
00443 listener_ = DDS::PublisherListener::_duplicate(a_listener);
00444 return DDS::RETCODE_OK;
00445 }
00446
00447 DDS::PublisherListener_ptr
00448 PublisherImpl::get_listener()
00449 {
00450 return DDS::PublisherListener::_duplicate(listener_.in());
00451 }
00452
00453 DDS::ReturnCode_t
00454 PublisherImpl::suspend_publications()
00455 {
00456 if (enabled_ == false) {
00457 ACE_ERROR_RETURN((LM_ERROR,
00458 ACE_TEXT("(%P|%t) ERROR: ")
00459 ACE_TEXT("PublisherImpl::suspend_publications, ")
00460 ACE_TEXT(" Entity is not enabled. \n")),
00461 DDS::RETCODE_NOT_ENABLED);
00462 }
00463
00464 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00465 guard,
00466 this->pi_lock_,
00467 DDS::RETCODE_ERROR);
00468 ++suspend_depth_count_;
00469 return DDS::RETCODE_OK;
00470 }
00471
00472 bool
00473 PublisherImpl::is_suspended() const
00474 {
00475 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00476 guard,
00477 this->pi_lock_,
00478 false);
00479 return suspend_depth_count_;
00480 }
00481
00482 DDS::ReturnCode_t
00483 PublisherImpl::resume_publications()
00484 {
00485 if (enabled_ == false) {
00486 ACE_ERROR_RETURN((LM_ERROR,
00487 ACE_TEXT("(%P|%t) ERROR: ")
00488 ACE_TEXT("PublisherImpl::resume_publications, ")
00489 ACE_TEXT(" Entity is not enabled. \n")),
00490 DDS::RETCODE_NOT_ENABLED);
00491 }
00492
00493 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00494 guard,
00495 this->pi_lock_,
00496 DDS::RETCODE_ERROR);
00497
00498 --suspend_depth_count_;
00499
00500 if (suspend_depth_count_ < 0) {
00501 suspend_depth_count_ = 0;
00502 return DDS::RETCODE_PRECONDITION_NOT_MET;
00503 }
00504
00505 if (suspend_depth_count_ == 0) {
00506
00507 for (PublicationMap::iterator it = this->publication_map_.begin();
00508 it != this->publication_map_.end(); ++it) {
00509 it->second->send_suspended_data();
00510 }
00511 }
00512
00513 return DDS::RETCODE_OK;
00514 }
00515
00516 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00517
00518 DDS::ReturnCode_t
00519 PublisherImpl::begin_coherent_changes()
00520 {
00521 if (enabled_ == false) {
00522 ACE_ERROR_RETURN((LM_ERROR,
00523 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:")
00524 ACE_TEXT(" Publisher is not enabled!\n")),
00525 DDS::RETCODE_NOT_ENABLED);
00526 }
00527
00528 if (!qos_.presentation.coherent_access) {
00529 ACE_ERROR_RETURN((LM_ERROR,
00530 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:")
00531 ACE_TEXT(" QoS policy does not support coherent access!\n")),
00532 DDS::RETCODE_ERROR);
00533 }
00534
00535 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00536 guard,
00537 this->pi_lock_,
00538 DDS::RETCODE_ERROR);
00539
00540 ++this->change_depth_;
00541
00542 if (qos_.presentation.access_scope == DDS::INSTANCE_PRESENTATION_QOS) {
00543
00544
00545 return DDS::RETCODE_OK;
00546 }
00547
00548
00549
00550 if (this->change_depth_ == 1) {
00551 for (PublicationMap::iterator it = this->publication_map_.begin();
00552 it != this->publication_map_.end(); ++it) {
00553 it->second->begin_coherent_changes();
00554 }
00555 }
00556
00557 return DDS::RETCODE_OK;
00558 }
00559
00560 DDS::ReturnCode_t
00561 PublisherImpl::end_coherent_changes()
00562 {
00563 if (enabled_ == false) {
00564 ACE_ERROR_RETURN((LM_ERROR,
00565 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
00566 ACE_TEXT(" Publisher is not enabled!\n")),
00567 DDS::RETCODE_NOT_ENABLED);
00568 }
00569
00570 if (!qos_.presentation.coherent_access) {
00571 ACE_ERROR_RETURN((LM_ERROR,
00572 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
00573 ACE_TEXT(" QoS policy does not support coherent access!\n")),
00574 DDS::RETCODE_ERROR);
00575 }
00576
00577 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00578 guard,
00579 this->pi_lock_,
00580 DDS::RETCODE_ERROR);
00581
00582 if (this->change_depth_ == 0) {
00583 ACE_ERROR_RETURN((LM_ERROR,
00584 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
00585 ACE_TEXT(" No matching call to begin_coherent_changes!\n")),
00586 DDS::RETCODE_PRECONDITION_NOT_MET);
00587 }
00588
00589 --this->change_depth_;
00590
00591 if (qos_.presentation.access_scope == DDS::INSTANCE_PRESENTATION_QOS) {
00592
00593
00594 return DDS::RETCODE_OK;
00595 }
00596
00597
00598
00599 if (this->change_depth_ == 0) {
00600 GroupCoherentSamples group_samples;
00601 for (PublicationMap::iterator it = this->publication_map_.begin();
00602 it != this->publication_map_.end(); ++it) {
00603
00604 if (it->second->coherent_samples_ == 0) {
00605 continue;
00606 }
00607
00608 std::pair<GroupCoherentSamples::iterator, bool> pair =
00609 group_samples.insert(GroupCoherentSamples::value_type(
00610 it->second->get_publication_id(),
00611 WriterCoherentSample(it->second->coherent_samples_,
00612 it->second->sequence_number_)));
00613
00614 if (pair.second == false) {
00615 ACE_ERROR_RETURN((LM_ERROR,
00616 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes: ")
00617 ACE_TEXT("failed to insert to GroupCoherentSamples.\n")),
00618 DDS::RETCODE_ERROR);
00619 }
00620 }
00621
00622 for (PublicationMap::iterator it = this->publication_map_.begin();
00623 it != this->publication_map_.end(); ++it) {
00624 if (it->second->coherent_samples_ == 0) {
00625 continue;
00626 }
00627
00628 it->second->end_coherent_changes(group_samples);
00629 }
00630 }
00631
00632 return DDS::RETCODE_OK;
00633 }
00634
00635 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
00636
00637 DDS::ReturnCode_t
00638 PublisherImpl::wait_for_acknowledgments(
00639 const DDS::Duration_t& max_wait)
00640 {
00641 if (enabled_ == false) {
00642 ACE_ERROR_RETURN((LM_ERROR,
00643 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ")
00644 ACE_TEXT("Entity is not enabled.\n")),
00645 DDS::RETCODE_NOT_ENABLED);
00646 }
00647
00648 typedef OPENDDS_MAP(DataWriterImpl*, DataWriterImpl::AckToken) DataWriterAckMap;
00649 DataWriterAckMap ack_writers;
00650 {
00651 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00652 guard,
00653 this->pi_lock_,
00654 DDS::RETCODE_ERROR);
00655
00656
00657 for (DataWriterMap::iterator it(this->datawriter_map_.begin());
00658 it != this->datawriter_map_.end(); ++it) {
00659 DataWriterImpl* writer = it->second;
00660 if (writer->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS)
00661 continue;
00662 if (writer->should_ack()) {
00663 DataWriterImpl::AckToken token = writer->create_ack_token(max_wait);
00664
00665 std::pair<DataWriterAckMap::iterator, bool> pair =
00666 ack_writers.insert(DataWriterAckMap::value_type(writer, token));
00667
00668 if (!pair.second) {
00669 ACE_ERROR_RETURN((LM_ERROR,
00670 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ")
00671 ACE_TEXT("Unable to insert AckToken into DataWriterAckMap!\n")),
00672 DDS::RETCODE_ERROR);
00673 }
00674 }
00675 }
00676 }
00677
00678 if (ack_writers.empty()) {
00679 if (DCPS_debug_level > 0) {
00680 ACE_DEBUG((LM_DEBUG,
00681 ACE_TEXT("(%P|%t) PublisherImpl::wait_for_acknowledgments() - ")
00682 ACE_TEXT("not blocking due to no writers requiring acks.\n")));
00683 }
00684
00685 return DDS::RETCODE_OK;
00686 }
00687
00688
00689 for (DataWriterAckMap::iterator it(ack_writers.begin());
00690 it != ack_writers.end(); ++it) {
00691 DataWriterImpl::AckToken token = it->second;
00692
00693 it->first->wait_for_specific_ack(token);
00694 }
00695
00696 return DDS::RETCODE_OK;
00697 }
00698
00699 DDS::DomainParticipant_ptr
00700 PublisherImpl::get_participant()
00701 {
00702 return DDS::DomainParticipant::_duplicate(participant_);
00703 }
00704
00705 DDS::ReturnCode_t
00706 PublisherImpl::set_default_datawriter_qos(const DDS::DataWriterQos & qos)
00707 {
00708 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00709 default_datawriter_qos_ = qos;
00710 return DDS::RETCODE_OK;
00711
00712 } else {
00713 return DDS::RETCODE_INCONSISTENT_POLICY;
00714 }
00715 }
00716
00717 DDS::ReturnCode_t
00718 PublisherImpl::get_default_datawriter_qos(DDS::DataWriterQos & qos)
00719 {
00720 qos = default_datawriter_qos_;
00721 return DDS::RETCODE_OK;
00722 }
00723
00724 DDS::ReturnCode_t
00725 PublisherImpl::copy_from_topic_qos(DDS::DataWriterQos & a_datawriter_qos,
00726 const DDS::TopicQos & a_topic_qos)
00727 {
00728 if (Qos_Helper::copy_from_topic_qos(a_datawriter_qos, a_topic_qos)) {
00729 return DDS::RETCODE_OK;
00730 } else {
00731 return DDS::RETCODE_INCONSISTENT_POLICY;
00732 }
00733 }
00734
00735 DDS::ReturnCode_t
00736 PublisherImpl::enable()
00737 {
00738
00739
00740
00741
00742
00743
00744 if (this->is_enabled()) {
00745 return DDS::RETCODE_OK;
00746 }
00747
00748 if (this->participant_->is_enabled() == false) {
00749 return DDS::RETCODE_PRECONDITION_NOT_MET;
00750 }
00751
00752 if (this->monitor_) {
00753 this->monitor_->report();
00754 }
00755
00756 this->set_enabled();
00757 return DDS::RETCODE_OK;
00758 }
00759
00760 bool
00761 PublisherImpl::is_clean() const
00762 {
00763 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00764 guard,
00765 this->pi_lock_,
00766 false);
00767 return datawriter_map_.empty() && publication_map_.empty();
00768 }
00769
00770 DDS::ReturnCode_t
00771 PublisherImpl::writer_enabled(const char* topic_name,
00772 DataWriterImpl* writer)
00773 {
00774 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00775 guard,
00776 this->pi_lock_,
00777 DDS::RETCODE_ERROR);
00778
00779 datawriter_map_.insert(DataWriterMap::value_type(topic_name, writer));
00780
00781 const RepoId publication_id = writer->get_publication_id();
00782
00783 std::pair<PublicationMap::iterator, bool> pair =
00784 publication_map_.insert(PublicationMap::value_type(publication_id, writer));
00785
00786 if (pair.second == false) {
00787 GuidConverter converter(publication_id);
00788 ACE_ERROR_RETURN((LM_ERROR,
00789 ACE_TEXT("(%P|%t) ERROR: ")
00790 ACE_TEXT("PublisherImpl::writer_enabled: ")
00791 ACE_TEXT("insert publication %C failed.\n"),
00792 OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR);
00793 }
00794
00795
00796
00797 writer->_add_ref();
00798
00799 return DDS::RETCODE_OK;
00800 }
00801
00802
00803 DDS::PublisherListener_ptr
00804 PublisherImpl::listener_for(DDS::StatusKind kind)
00805 {
00806
00807
00808
00809 if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
00810 return participant_->listener_for(kind);
00811
00812 } else {
00813 return DDS::PublisherListener::_duplicate(listener_.in());
00814 }
00815 }
00816
00817 DDS::ReturnCode_t
00818 PublisherImpl::assert_liveliness_by_participant()
00819 {
00820 DDS::ReturnCode_t ret = DDS::RETCODE_OK;
00821
00822 for (DataWriterMap::iterator it(datawriter_map_.begin());
00823 it != datawriter_map_.end(); ++it) {
00824 DDS::ReturnCode_t dw_ret = it->second->assert_liveliness_by_participant();
00825
00826 if (dw_ret != DDS::RETCODE_OK) {
00827 ret = dw_ret;
00828 }
00829 }
00830
00831 return ret;
00832 }
00833
00834 ACE_Time_Value
00835 PublisherImpl::liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
00836 {
00837 ACE_Time_Value tv = ACE_Time_Value::max_time;
00838 for (DataWriterMap::iterator it(datawriter_map_.begin());
00839 it != datawriter_map_.end(); ++it) {
00840 tv = std::min (tv, it->second->liveliness_check_interval(kind));
00841 }
00842 return tv;
00843 }
00844
00845 bool
00846 PublisherImpl::participant_liveliness_activity_after(const ACE_Time_Value& tv)
00847 {
00848 for (DataWriterMap::iterator it(datawriter_map_.begin());
00849 it != datawriter_map_.end(); ++it) {
00850 if (it->second->participant_liveliness_activity_after(tv)) {
00851 return true;
00852 }
00853 }
00854 return false;
00855 }
00856
00857 void
00858 PublisherImpl::get_publication_ids(PublicationIdVec& pubs)
00859 {
00860 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00861 guard,
00862 this->pi_lock_,
00863 );
00864
00865 pubs.reserve(publication_map_.size());
00866 for (PublicationMap::iterator iter = publication_map_.begin();
00867 iter != publication_map_.end();
00868 ++iter) {
00869 pubs.push_back(iter->first);
00870 }
00871 }
00872
00873 EntityImpl*
00874 PublisherImpl::parent() const
00875 {
00876 return this->participant_;
00877 }
00878
00879 bool
00880 PublisherImpl::validate_datawriter_qos(const DDS::DataWriterQos& qos,
00881 const DDS::DataWriterQos& default_qos,
00882 DDS::Topic_ptr a_topic,
00883 DDS::DataWriterQos& dw_qos)
00884 {
00885 if (CORBA::is_nil(a_topic)) {
00886 ACE_ERROR((LM_ERROR,
00887 ACE_TEXT("(%P|%t) ERROR: ")
00888 ACE_TEXT("PublisherImpl::create_datawriter, ")
00889 ACE_TEXT("topic is nil.\n")));
00890 return DDS::DataWriter::_nil();
00891 }
00892
00893 if (qos == DATAWRITER_QOS_DEFAULT) {
00894 dw_qos = default_qos;
00895
00896 } else if (qos == DATAWRITER_QOS_USE_TOPIC_QOS) {
00897 DDS::TopicQos topic_qos;
00898 a_topic->get_qos(topic_qos);
00899 dw_qos = default_qos;
00900
00901 Qos_Helper::copy_from_topic_qos(dw_qos, topic_qos);
00902
00903 } else {
00904 dw_qos = qos;
00905 }
00906
00907 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00908 OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00909 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00910 OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00911 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00912
00913 if (!Qos_Helper::valid(dw_qos)) {
00914 ACE_ERROR((LM_ERROR,
00915 ACE_TEXT("(%P|%t) ERROR: ")
00916 ACE_TEXT("PublisherImpl::create_datawriter, ")
00917 ACE_TEXT("invalid qos.\n")));
00918 return DDS::DataWriter::_nil();
00919 }
00920
00921 if (!Qos_Helper::consistent(dw_qos)) {
00922 ACE_ERROR((LM_ERROR,
00923 ACE_TEXT("(%P|%t) ERROR: ")
00924 ACE_TEXT("PublisherImpl::create_datawriter, ")
00925 ACE_TEXT("inconsistent qos.\n")));
00926 return DDS::DataWriter::_nil();
00927 }
00928 return true;
00929 }
00930
00931
00932 }
00933 }