00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "PublisherImpl.h"
00010 #include "FeatureDisabledQosCheck.h"
00011 #include "DataWriterImpl.h"
00012 #include "DomainParticipantImpl.h"
00013 #include "DataWriterImpl.h"
00014 #include "Service_Participant.h"
00015 #include "Qos_Helper.h"
00016 #include "GuidConverter.h"
00017 #include "Marked_Default_Qos.h"
00018 #include "TopicImpl.h"
00019 #include "MonitorFactory.h"
00020 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00021 #include "dds/DCPS/transport/framework/DataLinkSet.h"
00022 #include "dds/DCPS/transport/framework/TransportImpl.h"
00023 #include "tao/debug.h"
00024
00025 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00026
00027 namespace OpenDDS {
00028 namespace DCPS {
00029
00030 PublisherImpl::PublisherImpl(DDS::InstanceHandle_t handle,
00031 RepoId id,
00032 const DDS::PublisherQos& qos,
00033 DDS::PublisherListener_ptr a_listener,
00034 const DDS::StatusMask& mask,
00035 DomainParticipantImpl* participant)
00036 : handle_(handle),
00037 qos_(qos),
00038 default_datawriter_qos_(TheServiceParticipant->initial_DataWriterQos()),
00039 listener_mask_(mask),
00040 listener_(DDS::PublisherListener::_duplicate(a_listener)),
00041 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00042 change_depth_(0),
00043 #endif
00044 domain_id_(participant->get_domain_id()),
00045 participant_(*participant),
00046 suspend_depth_count_(0),
00047 sequence_number_(),
00048 aggregation_period_start_(ACE_Time_Value::zero),
00049 reverse_pi_lock_(pi_lock_),
00050 monitor_(0),
00051 publisher_id_(id)
00052 {
00053 monitor_ = TheServiceParticipant->monitor_factory_->create_publisher_monitor(this);
00054 }
00055
00056 PublisherImpl::~PublisherImpl()
00057 {
00058
00059
00060 if (!is_clean()) {
00061 ACE_ERROR((LM_ERROR,
00062 ACE_TEXT("(%P|%t) ERROR: ")
00063 ACE_TEXT("PublisherImpl::~PublisherImpl, ")
00064 ACE_TEXT("%B datawriters and %B publications still exist.\n"),
00065 datawriter_map_.size(), publication_map_.size()));
00066 }
00067 }
00068
00069 DDS::InstanceHandle_t
00070 PublisherImpl::get_instance_handle()
00071 {
00072 return handle_;
00073 }
00074
00075 bool
00076 PublisherImpl::contains_writer(DDS::InstanceHandle_t a_handle)
00077 {
00078 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00079 guard,
00080 this->pi_lock_,
00081 DDS::RETCODE_ERROR);
00082
00083 for (DataWriterMap::iterator it(datawriter_map_.begin());
00084 it != datawriter_map_.end(); ++it) {
00085 if (a_handle == it->second->get_instance_handle()) {
00086 return true;
00087 }
00088 }
00089
00090 return false;
00091 }
00092
00093 DDS::DataWriter_ptr
00094 PublisherImpl::create_datawriter(
00095 DDS::Topic_ptr a_topic,
00096 const DDS::DataWriterQos & qos,
00097 DDS::DataWriterListener_ptr a_listener,
00098 DDS::StatusMask mask)
00099 {
00100 DDS::DataWriterQos dw_qos;
00101
00102 if (!validate_datawriter_qos(qos, default_datawriter_qos_, a_topic, dw_qos)) {
00103 return DDS::DataWriter::_nil();
00104 }
00105
00106 TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic);
00107
00108 if (!topic_servant) {
00109 CORBA::String_var name = a_topic->get_name();
00110 ACE_ERROR((LM_ERROR,
00111 ACE_TEXT("(%P|%t) ERROR: ")
00112 ACE_TEXT("PublisherImpl::create_datawriter, ")
00113 ACE_TEXT("topic_servant(topic_name=%C) is nil.\n"),
00114 name.in()));
00115 return 0;
00116 }
00117
00118 OpenDDS::DCPS::TypeSupport_ptr typesupport =
00119 topic_servant->get_type_support();
00120
00121 if (typesupport == 0) {
00122 CORBA::String_var name = topic_servant->get_name();
00123 ACE_ERROR((LM_ERROR,
00124 ACE_TEXT("(%P|%t) ERROR: ")
00125 ACE_TEXT("PublisherImpl::create_datawriter, ")
00126 ACE_TEXT("typesupport(topic_name=%C) is nil.\n"),
00127 name.in()));
00128 return DDS::DataWriter::_nil();
00129 }
00130
00131 DDS::DataWriter_var dw_obj = typesupport->create_datawriter();
00132
00133 DataWriterImpl* dw_servant =
00134 dynamic_cast <DataWriterImpl*>(dw_obj.in());
00135
00136 if (dw_servant == 0) {
00137 ACE_ERROR((LM_ERROR,
00138 ACE_TEXT("(%P|%t) ERROR: ")
00139 ACE_TEXT("PublisherImpl::create_datawriter, ")
00140 ACE_TEXT("servant is nil.\n")));
00141 return DDS::DataWriter::_nil();
00142 }
00143
00144 dw_servant->init(
00145 topic_servant,
00146 dw_qos,
00147 a_listener,
00148 mask,
00149 participant_,
00150 this);
00151
00152 if ((this->enabled_ == true) && (qos_.entity_factory.autoenable_created_entities)) {
00153 const DDS::ReturnCode_t ret = dw_servant->enable();
00154
00155 if (ret != DDS::RETCODE_OK) {
00156 ACE_ERROR((LM_WARNING,
00157 ACE_TEXT("(%P|%t) WARNING: ")
00158 ACE_TEXT("PublisherImpl::create_datawriter, ")
00159 ACE_TEXT("enable failed.\n")));
00160 return DDS::DataWriter::_nil();
00161 }
00162 } else {
00163 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, pi_lock_, 0);
00164 writers_not_enabled_.insert(rchandle_from(dw_servant));
00165 }
00166
00167 return DDS::DataWriter::_duplicate(dw_obj.in());
00168 }
00169
00170 DDS::ReturnCode_t
00171 PublisherImpl::delete_datawriter(DDS::DataWriter_ptr a_datawriter)
00172 {
00173 DataWriterImpl* dw_servant = dynamic_cast<DataWriterImpl*>(a_datawriter);
00174 if (!dw_servant) {
00175 ACE_ERROR((LM_ERROR,
00176 "(%P|%t) PublisherImpl::delete_datawriter - dynamic cast to DataWriterImpl failed\n"
00177 ));
00178 return DDS::RETCODE_ERROR;
00179 }
00180
00181
00182 dw_servant->prepare_to_delete();
00183
00184 {
00185 DDS::Publisher_var dw_publisher(dw_servant->get_publisher());
00186
00187 if (dw_publisher.in() != this) {
00188 RepoId id = dw_servant->get_publication_id();
00189 GuidConverter converter(id);
00190 ACE_ERROR((LM_ERROR,
00191 ACE_TEXT("(%P|%t) PublisherImpl::delete_datawriter: ")
00192 ACE_TEXT("the data writer %C doesn't ")
00193 ACE_TEXT("belong to this subscriber \n"),
00194 OPENDDS_STRING(converter).c_str()));
00195 return DDS::RETCODE_PRECONDITION_NOT_MET;
00196 }
00197 }
00198
00199 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00200
00201
00202
00203 if (!dw_servant->persist_data() && DCPS_debug_level >= 2) {
00204 ACE_ERROR((LM_ERROR,
00205 ACE_TEXT("(%P|%t) ERROR: ")
00206 ACE_TEXT("PublisherImpl::delete_datawriter, ")
00207 ACE_TEXT("failed to make data durable.\n")));
00208 }
00209 #endif
00210
00211
00212 DDS::Time_t source_timestamp = time_value_to_time(ACE_OS::gettimeofday());
00213 dw_servant->unregister_instances(source_timestamp);
00214
00215
00216
00217 dw_servant->wait_pending();
00218 dw_servant->wait_control_pending();
00219
00220 RepoId publication_id = GUID_UNKNOWN;
00221 {
00222 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00223 guard,
00224 this->pi_lock_,
00225 DDS::RETCODE_ERROR);
00226
00227 publication_id = dw_servant->get_publication_id();
00228
00229 PublicationMap::iterator it = publication_map_.find(publication_id);
00230
00231 if (it == publication_map_.end()) {
00232 GuidConverter converter(publication_id);
00233 ACE_ERROR_RETURN((LM_ERROR,
00234 ACE_TEXT("(%P|%t) ERROR: ")
00235 ACE_TEXT("PublisherImpl::delete_datawriter, ")
00236 ACE_TEXT("datawriter %C not found.\n"),
00237 OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR);
00238 }
00239
00240
00241
00242
00243
00244
00245 DataWriterMap::iterator writ;
00246 DataWriterMap::iterator the_writ = datawriter_map_.end();
00247
00248 for (writ = datawriter_map_.begin();
00249 writ != datawriter_map_.end();
00250 ++writ) {
00251 if (writ->second == it->second) {
00252 the_writ = writ;
00253 break;
00254 }
00255 }
00256
00257 if (the_writ != datawriter_map_.end()) {
00258 datawriter_map_.erase(the_writ);
00259 }
00260
00261 publication_map_.erase(it);
00262
00263
00264
00265
00266
00267 ACE_GUARD_RETURN(reverse_lock_type, reverse_monitor, this->reverse_pi_lock_,
00268 DDS::RETCODE_ERROR);
00269
00270
00271 dw_servant->wait_pending();
00272
00273
00274
00275 dw_servant->remove_all_associations();
00276 dw_servant->cleanup();
00277 }
00278
00279 if (this->monitor_) {
00280 this->monitor_->report();
00281 }
00282
00283
00284 dw_servant->unregister_all();
00285
00286 RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
00287
00288 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00289 if (!disco->remove_publication(
00290 this->domain_id_,
00291 participant->get_id(),
00292 publication_id)) {
00293 ACE_ERROR_RETURN((LM_ERROR,
00294 ACE_TEXT("(%P|%t) ERROR: ")
00295 ACE_TEXT("PublisherImpl::delete_datawriter, ")
00296 ACE_TEXT("publication not removed from discovery.\n")),
00297 DDS::RETCODE_ERROR);
00298 }
00299
00300 participant->remove_adjust_liveliness_timers();
00301
00302 return DDS::RETCODE_OK;
00303 }
00304
00305 DDS::DataWriter_ptr
00306 PublisherImpl::lookup_datawriter(const char* topic_name)
00307 {
00308 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00309 guard,
00310 this->pi_lock_,
00311 DDS::DataWriter::_nil());
00312
00313
00314
00315 DataWriterMap::iterator it = datawriter_map_.find(topic_name);
00316
00317 if (it == datawriter_map_.end()) {
00318 if (DCPS_debug_level >= 2) {
00319 ACE_DEBUG((LM_DEBUG,
00320 ACE_TEXT("(%P|%t) ")
00321 ACE_TEXT("PublisherImpl::lookup_datawriter, ")
00322 ACE_TEXT("The datawriter(topic_name=%C) is not found\n"),
00323 topic_name));
00324 }
00325
00326 return DDS::DataWriter::_nil();
00327
00328 } else {
00329 return DDS::DataWriter::_duplicate(it->second.in());
00330 }
00331 }
00332
00333 DDS::ReturnCode_t
00334 PublisherImpl::delete_contained_entities()
00335 {
00336
00337 set_deleted(true);
00338
00339 while (true) {
00340 PublicationId pub_id = GUID_UNKNOWN;
00341 DataWriterImpl_rch a_datawriter;
00342
00343 {
00344 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00345 guard,
00346 this->pi_lock_,
00347 DDS::RETCODE_ERROR);
00348
00349 if (datawriter_map_.empty()) {
00350 break;
00351 } else {
00352 a_datawriter = datawriter_map_.begin()->second;
00353 pub_id = a_datawriter->get_publication_id();
00354 }
00355 }
00356
00357 const DDS::ReturnCode_t ret = delete_datawriter(a_datawriter.in());
00358
00359 if (ret != DDS::RETCODE_OK) {
00360 GuidConverter converter(pub_id);
00361 ACE_ERROR_RETURN((LM_ERROR,
00362 ACE_TEXT("(%P|%t) ERROR: ")
00363 ACE_TEXT("PublisherImpl::")
00364 ACE_TEXT("delete_contained_entities: ")
00365 ACE_TEXT("failed to delete ")
00366 ACE_TEXT("datawriter %C.\n"),
00367 OPENDDS_STRING(converter).c_str()),ret);
00368 }
00369 }
00370
00371
00372 set_deleted(false);
00373
00374 return DDS::RETCODE_OK;
00375 }
00376
00377 DDS::ReturnCode_t
00378 PublisherImpl::set_qos(const DDS::PublisherQos & qos)
00379 {
00380
00381 OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00382
00383 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00384 if (qos_ == qos)
00385 return DDS::RETCODE_OK;
00386
00387
00388 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00389 return DDS::RETCODE_IMMUTABLE_POLICY;
00390
00391 } else {
00392 qos_ = qos;
00393
00394 DwIdToQosMap idToQosMap;
00395 {
00396 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00397 guard,
00398 this->pi_lock_,
00399 DDS::RETCODE_ERROR);
00400
00401 for (PublicationMap::iterator iter = publication_map_.begin();
00402 iter != publication_map_.end();
00403 ++iter) {
00404 DDS::DataWriterQos qos;
00405 iter->second->get_qos(qos);
00406 RepoId id = iter->second->get_publication_id();
00407 std::pair<DwIdToQosMap::iterator, bool> pair =
00408 idToQosMap.insert(DwIdToQosMap::value_type(id, qos));
00409
00410 if (pair.second == false) {
00411 GuidConverter converter(id);
00412 ACE_ERROR_RETURN((LM_ERROR,
00413 ACE_TEXT("(%P|%t) ")
00414 ACE_TEXT("PublisherImpl::set_qos: ")
00415 ACE_TEXT("insert id %C to DwIdToQosMap ")
00416 ACE_TEXT("failed.\n"),
00417 OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR);
00418 }
00419 }
00420 }
00421
00422 DwIdToQosMap::iterator iter = idToQosMap.begin();
00423
00424 while (iter != idToQosMap.end()) {
00425 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00426 bool status = false;
00427
00428 RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
00429 if (participant)
00430 status = disco->update_publication_qos(
00431 participant->get_domain_id(),
00432 participant->get_id(),
00433 iter->first,
00434 iter->second,
00435 this->qos_);
00436
00437 if (!status) {
00438 ACE_ERROR_RETURN((LM_ERROR,
00439 ACE_TEXT("(%P|%t) PublisherImpl::set_qos, ")
00440 ACE_TEXT("failed. \n")),
00441 DDS::RETCODE_ERROR);
00442 }
00443
00444 ++iter;
00445 }
00446 }
00447
00448 return DDS::RETCODE_OK;
00449
00450 } else {
00451 return DDS::RETCODE_INCONSISTENT_POLICY;
00452 }
00453 }
00454
00455 DDS::ReturnCode_t
00456 PublisherImpl::get_qos(DDS::PublisherQos & qos)
00457 {
00458 qos = qos_;
00459 return DDS::RETCODE_OK;
00460 }
00461
00462 DDS::ReturnCode_t
00463 PublisherImpl::set_listener(DDS::PublisherListener_ptr a_listener,
00464 DDS::StatusMask mask)
00465 {
00466 listener_mask_ = mask;
00467
00468 listener_ = DDS::PublisherListener::_duplicate(a_listener);
00469 return DDS::RETCODE_OK;
00470 }
00471
00472 DDS::PublisherListener_ptr
00473 PublisherImpl::get_listener()
00474 {
00475 return DDS::PublisherListener::_duplicate(listener_.in());
00476 }
00477
00478 DDS::ReturnCode_t
00479 PublisherImpl::suspend_publications()
00480 {
00481 if (enabled_ == false) {
00482 ACE_ERROR_RETURN((LM_ERROR,
00483 ACE_TEXT("(%P|%t) ERROR: ")
00484 ACE_TEXT("PublisherImpl::suspend_publications, ")
00485 ACE_TEXT(" Entity is not enabled. \n")),
00486 DDS::RETCODE_NOT_ENABLED);
00487 }
00488
00489 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00490 guard,
00491 this->pi_lock_,
00492 DDS::RETCODE_ERROR);
00493 ++suspend_depth_count_;
00494 return DDS::RETCODE_OK;
00495 }
00496
00497 bool
00498 PublisherImpl::is_suspended() const
00499 {
00500 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00501 guard,
00502 this->pi_lock_,
00503 false);
00504 return suspend_depth_count_;
00505 }
00506
00507 DDS::ReturnCode_t
00508 PublisherImpl::resume_publications()
00509 {
00510 if (enabled_ == false) {
00511 ACE_ERROR_RETURN((LM_ERROR,
00512 ACE_TEXT("(%P|%t) ERROR: ")
00513 ACE_TEXT("PublisherImpl::resume_publications, ")
00514 ACE_TEXT(" Entity is not enabled. \n")),
00515 DDS::RETCODE_NOT_ENABLED);
00516 }
00517
00518 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00519 guard,
00520 this->pi_lock_,
00521 DDS::RETCODE_ERROR);
00522
00523 --suspend_depth_count_;
00524
00525 if (suspend_depth_count_ < 0) {
00526 suspend_depth_count_ = 0;
00527 return DDS::RETCODE_PRECONDITION_NOT_MET;
00528 }
00529
00530 if (suspend_depth_count_ == 0) {
00531
00532 for (PublicationMap::iterator it = this->publication_map_.begin();
00533 it != this->publication_map_.end(); ++it) {
00534 it->second->send_suspended_data();
00535 }
00536 }
00537
00538 return DDS::RETCODE_OK;
00539 }
00540
00541 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00542
00543 DDS::ReturnCode_t
00544 PublisherImpl::begin_coherent_changes()
00545 {
00546 if (enabled_ == false) {
00547 ACE_ERROR_RETURN((LM_ERROR,
00548 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:")
00549 ACE_TEXT(" Publisher is not enabled!\n")),
00550 DDS::RETCODE_NOT_ENABLED);
00551 }
00552
00553 if (!qos_.presentation.coherent_access) {
00554 ACE_ERROR_RETURN((LM_ERROR,
00555 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:")
00556 ACE_TEXT(" QoS policy does not support coherent access!\n")),
00557 DDS::RETCODE_ERROR);
00558 }
00559
00560 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00561 guard,
00562 this->pi_lock_,
00563 DDS::RETCODE_ERROR);
00564
00565 ++this->change_depth_;
00566
00567 if (qos_.presentation.access_scope == DDS::INSTANCE_PRESENTATION_QOS) {
00568
00569
00570 return DDS::RETCODE_OK;
00571 }
00572
00573
00574
00575 if (this->change_depth_ == 1) {
00576 for (PublicationMap::iterator it = this->publication_map_.begin();
00577 it != this->publication_map_.end(); ++it) {
00578 it->second->begin_coherent_changes();
00579 }
00580 }
00581
00582 return DDS::RETCODE_OK;
00583 }
00584
00585 DDS::ReturnCode_t
00586 PublisherImpl::end_coherent_changes()
00587 {
00588 if (enabled_ == false) {
00589 ACE_ERROR_RETURN((LM_ERROR,
00590 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
00591 ACE_TEXT(" Publisher is not enabled!\n")),
00592 DDS::RETCODE_NOT_ENABLED);
00593 }
00594
00595 if (!qos_.presentation.coherent_access) {
00596 ACE_ERROR_RETURN((LM_ERROR,
00597 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
00598 ACE_TEXT(" QoS policy does not support coherent access!\n")),
00599 DDS::RETCODE_ERROR);
00600 }
00601
00602 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00603 guard,
00604 this->pi_lock_,
00605 DDS::RETCODE_ERROR);
00606
00607 if (this->change_depth_ == 0) {
00608 ACE_ERROR_RETURN((LM_ERROR,
00609 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
00610 ACE_TEXT(" No matching call to begin_coherent_changes!\n")),
00611 DDS::RETCODE_PRECONDITION_NOT_MET);
00612 }
00613
00614 --this->change_depth_;
00615
00616 if (qos_.presentation.access_scope == DDS::INSTANCE_PRESENTATION_QOS) {
00617
00618
00619 return DDS::RETCODE_OK;
00620 }
00621
00622
00623
00624 if (this->change_depth_ == 0) {
00625 GroupCoherentSamples group_samples;
00626 for (PublicationMap::iterator it = this->publication_map_.begin();
00627 it != this->publication_map_.end(); ++it) {
00628
00629 if (it->second->coherent_samples_ == 0) {
00630 continue;
00631 }
00632
00633 std::pair<GroupCoherentSamples::iterator, bool> pair =
00634 group_samples.insert(GroupCoherentSamples::value_type(
00635 it->second->get_publication_id(),
00636 WriterCoherentSample(it->second->coherent_samples_,
00637 it->second->sequence_number_)));
00638
00639 if (pair.second == false) {
00640 ACE_ERROR_RETURN((LM_ERROR,
00641 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes: ")
00642 ACE_TEXT("failed to insert to GroupCoherentSamples.\n")),
00643 DDS::RETCODE_ERROR);
00644 }
00645 }
00646
00647 for (PublicationMap::iterator it = this->publication_map_.begin();
00648 it != this->publication_map_.end(); ++it) {
00649 if (it->second->coherent_samples_ == 0) {
00650 continue;
00651 }
00652
00653 it->second->end_coherent_changes(group_samples);
00654 }
00655 }
00656
00657 return DDS::RETCODE_OK;
00658 }
00659
00660 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
00661
00662 DDS::ReturnCode_t
00663 PublisherImpl::wait_for_acknowledgments(
00664 const DDS::Duration_t& max_wait)
00665 {
00666 if (enabled_ == false) {
00667 ACE_ERROR_RETURN((LM_ERROR,
00668 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ")
00669 ACE_TEXT("Entity is not enabled.\n")),
00670 DDS::RETCODE_NOT_ENABLED);
00671 }
00672
00673 typedef OPENDDS_MAP(DataWriterImpl*, DataWriterImpl::AckToken) DataWriterAckMap;
00674 DataWriterAckMap ack_writers;
00675 {
00676 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00677 guard,
00678 this->pi_lock_,
00679 DDS::RETCODE_ERROR);
00680
00681
00682 for (DataWriterMap::iterator it(this->datawriter_map_.begin());
00683 it != this->datawriter_map_.end(); ++it) {
00684 DataWriterImpl_rch writer = it->second;
00685 if (writer->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS)
00686 continue;
00687 if (writer->should_ack()) {
00688 DataWriterImpl::AckToken token = writer->create_ack_token(max_wait);
00689
00690 std::pair<DataWriterAckMap::iterator, bool> pair =
00691 ack_writers.insert(DataWriterAckMap::value_type(writer.in(), token));
00692
00693 if (!pair.second) {
00694 ACE_ERROR_RETURN((LM_ERROR,
00695 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ")
00696 ACE_TEXT("Unable to insert AckToken into DataWriterAckMap!\n")),
00697 DDS::RETCODE_ERROR);
00698 }
00699 }
00700 }
00701 }
00702
00703 if (ack_writers.empty()) {
00704 if (DCPS_debug_level > 0) {
00705 ACE_DEBUG((LM_DEBUG,
00706 ACE_TEXT("(%P|%t) PublisherImpl::wait_for_acknowledgments() - ")
00707 ACE_TEXT("not blocking due to no writers requiring acks.\n")));
00708 }
00709
00710 return DDS::RETCODE_OK;
00711 }
00712
00713
00714 for (DataWriterAckMap::iterator it(ack_writers.begin());
00715 it != ack_writers.end(); ++it) {
00716 DataWriterImpl::AckToken token = it->second;
00717
00718 it->first->wait_for_specific_ack(token);
00719 }
00720
00721 return DDS::RETCODE_OK;
00722 }
00723
00724 DDS::DomainParticipant_ptr
00725 PublisherImpl::get_participant()
00726 {
00727 return participant_.lock()._retn();
00728 }
00729
00730 DDS::ReturnCode_t
00731 PublisherImpl::set_default_datawriter_qos(const DDS::DataWriterQos & qos)
00732 {
00733 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00734 default_datawriter_qos_ = qos;
00735 return DDS::RETCODE_OK;
00736
00737 } else {
00738 return DDS::RETCODE_INCONSISTENT_POLICY;
00739 }
00740 }
00741
00742 DDS::ReturnCode_t
00743 PublisherImpl::get_default_datawriter_qos(DDS::DataWriterQos & qos)
00744 {
00745 qos = default_datawriter_qos_;
00746 return DDS::RETCODE_OK;
00747 }
00748
00749 DDS::ReturnCode_t
00750 PublisherImpl::copy_from_topic_qos(DDS::DataWriterQos & a_datawriter_qos,
00751 const DDS::TopicQos & a_topic_qos)
00752 {
00753 if (Qos_Helper::copy_from_topic_qos(a_datawriter_qos, a_topic_qos)) {
00754 return DDS::RETCODE_OK;
00755 } else {
00756 return DDS::RETCODE_INCONSISTENT_POLICY;
00757 }
00758 }
00759
00760 DDS::ReturnCode_t
00761 PublisherImpl::enable()
00762 {
00763
00764
00765
00766
00767
00768
00769 if (this->is_enabled()) {
00770 return DDS::RETCODE_OK;
00771 }
00772
00773 RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
00774 if (!participant || participant->is_enabled() == false) {
00775 return DDS::RETCODE_PRECONDITION_NOT_MET;
00776 }
00777
00778 if (this->monitor_) {
00779 this->monitor_->report();
00780 }
00781
00782 this->set_enabled();
00783
00784 if (qos_.entity_factory.autoenable_created_entities) {
00785 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, pi_lock_, DDS::RETCODE_ERROR);
00786 DataWriterSet writers;
00787 writers_not_enabled_.swap(writers);
00788 for (DataWriterSet::iterator it = writers.begin(); it != writers.end(); ++it) {
00789 (*it)->enable();
00790 }
00791 }
00792
00793 return DDS::RETCODE_OK;
00794 }
00795
00796 bool
00797 PublisherImpl::is_clean() const
00798 {
00799 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00800 guard,
00801 this->pi_lock_,
00802 false);
00803 return datawriter_map_.empty() && publication_map_.empty();
00804 }
00805
00806 DDS::ReturnCode_t
00807 PublisherImpl::writer_enabled(const char* topic_name,
00808 DataWriterImpl* writer_ptr)
00809 {
00810 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00811 guard,
00812 this->pi_lock_,
00813 DDS::RETCODE_ERROR);
00814 DataWriterImpl_rch writer = rchandle_from(writer_ptr);
00815 writers_not_enabled_.erase(writer);
00816
00817 datawriter_map_.insert(DataWriterMap::value_type(topic_name, writer));
00818
00819 const RepoId publication_id = writer->get_publication_id();
00820
00821 std::pair<PublicationMap::iterator, bool> pair =
00822 publication_map_.insert(PublicationMap::value_type(publication_id, writer));
00823
00824 if (pair.second == false) {
00825 GuidConverter converter(publication_id);
00826 ACE_ERROR_RETURN((LM_ERROR,
00827 ACE_TEXT("(%P|%t) ERROR: ")
00828 ACE_TEXT("PublisherImpl::writer_enabled: ")
00829 ACE_TEXT("insert publication %C failed.\n"),
00830 OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR);
00831 }
00832
00833 if (this->monitor_) {
00834 this->monitor_->report();
00835 }
00836
00837 return DDS::RETCODE_OK;
00838 }
00839
00840
00841 DDS::PublisherListener_ptr
00842 PublisherImpl::listener_for(DDS::StatusKind kind)
00843 {
00844
00845
00846
00847 RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
00848
00849 if (!participant)
00850 return 0;
00851
00852 if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
00853 return participant->listener_for(kind);
00854
00855 } else {
00856 return DDS::PublisherListener::_duplicate(listener_.in());
00857 }
00858 }
00859
00860 DDS::ReturnCode_t
00861 PublisherImpl::assert_liveliness_by_participant()
00862 {
00863 DDS::ReturnCode_t ret = DDS::RETCODE_OK;
00864
00865 for (DataWriterMap::iterator it(datawriter_map_.begin());
00866 it != datawriter_map_.end(); ++it) {
00867 DDS::ReturnCode_t dw_ret = it->second->assert_liveliness_by_participant();
00868
00869 if (dw_ret != DDS::RETCODE_OK) {
00870 ret = dw_ret;
00871 }
00872 }
00873
00874 return ret;
00875 }
00876
00877 ACE_Time_Value
00878 PublisherImpl::liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
00879 {
00880 ACE_Time_Value tv = ACE_Time_Value::max_time;
00881 for (DataWriterMap::iterator it(datawriter_map_.begin());
00882 it != datawriter_map_.end(); ++it) {
00883 tv = std::min (tv, it->second->liveliness_check_interval(kind));
00884 }
00885 return tv;
00886 }
00887
00888 bool
00889 PublisherImpl::participant_liveliness_activity_after(const ACE_Time_Value& tv)
00890 {
00891 for (DataWriterMap::iterator it(datawriter_map_.begin());
00892 it != datawriter_map_.end(); ++it) {
00893 if (it->second->participant_liveliness_activity_after(tv)) {
00894 return true;
00895 }
00896 }
00897 return false;
00898 }
00899
00900 void
00901 PublisherImpl::get_publication_ids(PublicationIdVec& pubs)
00902 {
00903 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00904 guard,
00905 this->pi_lock_,
00906 );
00907
00908 pubs.reserve(publication_map_.size());
00909 for (PublicationMap::iterator iter = publication_map_.begin();
00910 iter != publication_map_.end();
00911 ++iter) {
00912 pubs.push_back(iter->first);
00913 }
00914 }
00915
00916 RcHandle<EntityImpl>
00917 PublisherImpl::parent() const
00918 {
00919 return this->participant_.lock();
00920 }
00921
00922 bool
00923 PublisherImpl::validate_datawriter_qos(const DDS::DataWriterQos& qos,
00924 const DDS::DataWriterQos& default_qos,
00925 DDS::Topic_ptr a_topic,
00926 DDS::DataWriterQos& dw_qos)
00927 {
00928 if (CORBA::is_nil(a_topic)) {
00929 ACE_ERROR((LM_ERROR,
00930 ACE_TEXT("(%P|%t) ERROR: ")
00931 ACE_TEXT("PublisherImpl::create_datawriter, ")
00932 ACE_TEXT("topic is nil.\n")));
00933 return DDS::DataWriter::_nil();
00934 }
00935
00936 if (qos == DATAWRITER_QOS_DEFAULT) {
00937 dw_qos = default_qos;
00938
00939 } else if (qos == DATAWRITER_QOS_USE_TOPIC_QOS) {
00940 DDS::TopicQos topic_qos;
00941 a_topic->get_qos(topic_qos);
00942 dw_qos = default_qos;
00943
00944 Qos_Helper::copy_from_topic_qos(dw_qos, topic_qos);
00945
00946 } else {
00947 dw_qos = qos;
00948 }
00949
00950 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00951 OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00952 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00953 OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00954 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00955
00956 if (!Qos_Helper::valid(dw_qos)) {
00957 ACE_ERROR((LM_ERROR,
00958 ACE_TEXT("(%P|%t) ERROR: ")
00959 ACE_TEXT("PublisherImpl::create_datawriter, ")
00960 ACE_TEXT("invalid qos.\n")));
00961 return DDS::DataWriter::_nil();
00962 }
00963
00964 if (!Qos_Helper::consistent(dw_qos)) {
00965 ACE_ERROR((LM_ERROR,
00966 ACE_TEXT("(%P|%t) ERROR: ")
00967 ACE_TEXT("PublisherImpl::create_datawriter, ")
00968 ACE_TEXT("inconsistent qos.\n")));
00969 return DDS::DataWriter::_nil();
00970 }
00971 return true;
00972 }
00973
00974 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00975
00976 }
00977 }