PublisherImpl.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "PublisherImpl.h"
00010 #include "FeatureDisabledQosCheck.h"
00011 #include "DataWriterImpl.h"
00012 #include "DomainParticipantImpl.h"
00013 #include "DataWriterImpl.h"
00014 #include "Service_Participant.h"
00015 #include "Qos_Helper.h"
00016 #include "GuidConverter.h"
00017 #include "Marked_Default_Qos.h"
00018 #include "TopicImpl.h"
00019 #include "MonitorFactory.h"
00020 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00021 #include "dds/DCPS/transport/framework/DataLinkSet.h"
00022 #include "dds/DCPS/transport/framework/TransportImpl.h"
00023 #include "tao/debug.h"
00024 
00025 namespace OpenDDS {
00026 namespace DCPS {
00027 
00028 // Implementation skeleton constructor
00029 PublisherImpl::PublisherImpl(DDS::InstanceHandle_t      handle,
00030     RepoId                     id,
00031     const DDS::PublisherQos&   qos,
00032     DDS::PublisherListener_ptr a_listener,
00033     const DDS::StatusMask&     mask,
00034     DomainParticipantImpl*     participant)
00035 : handle_(handle),
00036   qos_(qos),
00037   default_datawriter_qos_(TheServiceParticipant->initial_DataWriterQos()),
00038   listener_mask_(mask),
00039   listener_(DDS::PublisherListener::_duplicate(a_listener)),
00040 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00041   change_depth_(0),
00042 #endif
00043   domain_id_(participant->get_domain_id()),
00044   participant_(participant),
00045   suspend_depth_count_(0),
00046   sequence_number_(),
00047   aggregation_period_start_(ACE_Time_Value::zero),
00048   reverse_pi_lock_(pi_lock_),
00049   monitor_(0),
00050   publisher_id_(id)
00051 {
00052   monitor_ = TheServiceParticipant->monitor_factory_->create_publisher_monitor(this);
00053 }
00054 
00055 // Implementation skeleton destructor
00056 PublisherImpl::~PublisherImpl()
00057 {
00058   //The datawriters should be deleted already before calling delete
00059   //publisher.
00060   if (!is_clean()) {
00061     ACE_ERROR((LM_ERROR,
00062         ACE_TEXT("(%P|%t) ERROR: ")
00063         ACE_TEXT("PublisherImpl::~PublisherImpl, ")
00064         ACE_TEXT("some datawriters still exist.\n")));
00065   }
00066 }
00067 
00068 DDS::InstanceHandle_t
00069 PublisherImpl::get_instance_handle()
00070 {
00071   return handle_;
00072 }
00073 
00074 bool
00075 PublisherImpl::contains_writer(DDS::InstanceHandle_t a_handle)
00076 {
00077   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00078       guard,
00079       this->pi_lock_,
00080       DDS::RETCODE_ERROR);
00081 
00082   for (DataWriterMap::iterator it(datawriter_map_.begin());
00083       it != datawriter_map_.end(); ++it) {
00084     if (a_handle == it->second->get_instance_handle()) {
00085       return true;
00086     }
00087   }
00088 
00089   return false;
00090 }
00091 
00092 DDS::DataWriter_ptr
00093 PublisherImpl::create_datawriter(
00094     DDS::Topic_ptr              a_topic,
00095     const DDS::DataWriterQos &  qos,
00096     DDS::DataWriterListener_ptr a_listener,
00097     DDS::StatusMask             mask)
00098 {
00099   DDS::DataWriterQos dw_qos;
00100 
00101   if (!validate_datawriter_qos(qos, default_datawriter_qos_, a_topic, dw_qos)) {
00102     return DDS::DataWriter::_nil();
00103   }
00104 
00105   TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic);
00106 
00107   OpenDDS::DCPS::TypeSupport_ptr typesupport =
00108       topic_servant->get_type_support();
00109 
00110   if (typesupport == 0) {
00111     CORBA::String_var name = topic_servant->get_name();
00112     ACE_ERROR((LM_ERROR,
00113         ACE_TEXT("(%P|%t) ERROR: ")
00114         ACE_TEXT("PublisherImpl::create_datawriter, ")
00115         ACE_TEXT("typesupport(topic_name=%C) is nil.\n"),
00116         name.in()));
00117     return DDS::DataWriter::_nil();
00118   }
00119 
00120   DDS::DataWriter_var dw_obj = typesupport->create_datawriter();
00121 
00122   DataWriterImpl* dw_servant =
00123       dynamic_cast <DataWriterImpl*>(dw_obj.in());
00124 
00125   dw_servant->init(a_topic,
00126       topic_servant,
00127       dw_qos,
00128       a_listener,
00129       mask,
00130       participant_,
00131       this,
00132       dw_obj.in());
00133 
00134   if (this->enabled_ == true
00135       && qos_.entity_factory.autoenable_created_entities == 1) {
00136 
00137     DDS::ReturnCode_t ret = dw_servant->enable();
00138 
00139     if (ret != DDS::RETCODE_OK) {
00140       ACE_ERROR((LM_ERROR,
00141           ACE_TEXT("(%P|%t) ERROR: ")
00142           ACE_TEXT("PublisherImpl::create_datawriter, ")
00143           ACE_TEXT("enable failed.\n")));
00144       return DDS::DataWriter::_nil();
00145     }
00146   }
00147 
00148   return DDS::DataWriter::_duplicate(dw_obj.in());
00149 }
00150 
00151 DDS::ReturnCode_t
00152 PublisherImpl::delete_datawriter(DDS::DataWriter_ptr a_datawriter)
00153 {
00154   DataWriterImpl* dw_servant = dynamic_cast<DataWriterImpl*>(a_datawriter);
00155   if (dw_servant) {
00156     // marks entity as deleted and stops future associating
00157     dw_servant->prepare_to_delete();
00158   }
00159 
00160   if (!dw_servant) {
00161     ACE_ERROR((LM_ERROR,
00162               "(%P|%t) PublisherImpl::delete_datawriter - dynamic cast to DataWriterImpl failed\n"
00163     ));
00164     return DDS::RETCODE_ERROR;
00165   }
00166 
00167   {
00168     DDS::Publisher_var dw_publisher(dw_servant->get_publisher());
00169 
00170     if (dw_publisher.in() != this) {
00171       RepoId id = dw_servant->get_publication_id();
00172       GuidConverter converter(id);
00173       ACE_ERROR((LM_ERROR,
00174           ACE_TEXT("(%P|%t) PublisherImpl::delete_datawriter: ")
00175           ACE_TEXT("the data writer %C doesn't ")
00176           ACE_TEXT("belong to this subscriber \n"),
00177           OPENDDS_STRING(converter).c_str()));
00178       return DDS::RETCODE_PRECONDITION_NOT_MET;
00179     }
00180   }
00181 
00182 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00183   // Trigger data to be persisted, i.e. made durable, if so
00184   // configured. This needs be called before unregister_instances
00185   // because unregister_instances may cause instance dispose.
00186   if (!dw_servant->persist_data() && DCPS_debug_level >= 2) {
00187     ACE_ERROR((LM_ERROR,
00188         ACE_TEXT("(%P|%t) ERROR: ")
00189         ACE_TEXT("PublisherImpl::delete_datawriter, ")
00190         ACE_TEXT("failed to make data durable.\n")));
00191   }
00192 #endif
00193 
00194   // Unregister all registered instances prior to deletion.
00195   DDS::Time_t source_timestamp = time_value_to_time(ACE_OS::gettimeofday());
00196   dw_servant->unregister_instances(source_timestamp);
00197 
00198   // Wait for any control messages to be transported during
00199   // unregistering of instances.
00200   dw_servant->wait_pending();
00201   dw_servant->wait_control_pending();
00202 
00203   CORBA::String_var topic_name = dw_servant->get_topic_name();
00204   RepoId publication_id  = GUID_UNKNOWN;
00205   {
00206     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00207         guard,
00208         this->pi_lock_,
00209         DDS::RETCODE_ERROR);
00210 
00211     publication_id = dw_servant->get_publication_id();
00212 
00213     PublicationMap::iterator it = publication_map_.find(publication_id);
00214 
00215     if (it == publication_map_.end()) {
00216       GuidConverter converter(publication_id);
00217       ACE_ERROR_RETURN((LM_ERROR,
00218           ACE_TEXT("(%P|%t) ERROR: ")
00219           ACE_TEXT("PublisherImpl::delete_datawriter, ")
00220           ACE_TEXT("datawriter %C not found.\n"),
00221           OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR);
00222     }
00223 
00224     // We can not erase the datawriter from datawriter map by the topic name
00225     // because the map might have multiple datawriters with the same topic
00226     // name.
00227     // Find the iterator to the datawriter in the datawriter map and erase
00228     // by the iterator.
00229     DataWriterMap::iterator writ;
00230     DataWriterMap::iterator the_writ = datawriter_map_.end();
00231 
00232     for (writ = datawriter_map_.begin();
00233         writ != datawriter_map_.end();
00234         ++writ) {
00235       if (writ->second == it->second) {
00236         the_writ = writ;
00237         break;
00238       }
00239     }
00240 
00241     if (the_writ != datawriter_map_.end()) {
00242       datawriter_map_.erase(the_writ);
00243     }
00244 
00245     publication_map_.erase(it);
00246 
00247     // Release pi_lock_ before making call to transport layer to avoid
00248     // some deadlock situations that threads acquire locks(PublisherImpl
00249     // pi_lock_, TransportClient reservation_lock and TransportImpl
00250     // lock_) in reverse order.
00251     ACE_GUARD_RETURN(reverse_lock_type, reverse_monitor, this->reverse_pi_lock_,
00252         DDS::RETCODE_ERROR);
00253     // Wait for pending samples to drain prior to removing associations
00254     // and unregistering the publication.
00255     dw_servant->wait_pending();
00256     // Call remove association before unregistering the datawriter
00257     // with the transport, otherwise some callbacks resulted from
00258     // remove_association may lost.
00259     dw_servant->remove_all_associations();
00260     dw_servant->cleanup();
00261   }
00262   // not just unregister but remove any pending writes/sends.
00263   dw_servant->unregister_all();
00264   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00265   if (!disco->remove_publication(
00266       this->domain_id_,
00267       this->participant_->get_id(),
00268       publication_id)) {
00269     ACE_ERROR_RETURN((LM_ERROR,
00270         ACE_TEXT("(%P|%t) ERROR: ")
00271         ACE_TEXT("PublisherImpl::delete_datawriter, ")
00272         ACE_TEXT("publication not removed from discovery.\n")),
00273         DDS::RETCODE_ERROR);
00274   }
00275   // Decrease ref count after the servant is removed from the maps.
00276   dw_servant->_remove_ref();
00277 
00278   participant_->remove_adjust_liveliness_timers();
00279 
00280   return DDS::RETCODE_OK;
00281 }
00282 
00283 DDS::DataWriter_ptr
00284 PublisherImpl::lookup_datawriter(const char* topic_name)
00285 {
00286   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00287       guard,
00288       this->pi_lock_,
00289       DDS::DataWriter::_nil());
00290 
00291   // If multiple entries whose key is "topic_name" then which one is
00292   // returned ? Spec does not limit which one should give.
00293   DataWriterMap::iterator it = datawriter_map_.find(topic_name);
00294 
00295   if (it == datawriter_map_.end()) {
00296     if (DCPS_debug_level >= 2) {
00297       ACE_DEBUG((LM_DEBUG,
00298           ACE_TEXT("(%P|%t) ")
00299           ACE_TEXT("PublisherImpl::lookup_datawriter, ")
00300           ACE_TEXT("The datawriter(topic_name=%C) is not found\n"),
00301           topic_name));
00302     }
00303 
00304     return DDS::DataWriter::_nil();
00305 
00306   } else {
00307     return DDS::DataWriter::_duplicate(it->second);
00308   }
00309 }
00310 
00311 DDS::ReturnCode_t
00312 PublisherImpl::delete_contained_entities()
00313 {
00314   // mark that the entity is being deleted
00315   set_deleted(true);
00316 
00317   while (true) {
00318     PublicationId pub_id = GUID_UNKNOWN;
00319     DataWriterImpl* a_datawriter;
00320 
00321     {
00322       ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00323           guard,
00324           this->pi_lock_,
00325           DDS::RETCODE_ERROR);
00326 
00327       if (datawriter_map_.empty()) {
00328         break;
00329       } else {
00330         a_datawriter = datawriter_map_.begin()->second;
00331         pub_id = a_datawriter->get_publication_id();
00332       }
00333     }
00334 
00335     DDS::ReturnCode_t ret = delete_datawriter(a_datawriter);
00336 
00337     if (ret != DDS::RETCODE_OK) {
00338       GuidConverter converter(pub_id);
00339       ACE_ERROR_RETURN((LM_ERROR,
00340           ACE_TEXT("(%P|%t) ERROR: ")
00341           ACE_TEXT("PublisherImpl::")
00342           ACE_TEXT("delete_contained_entities: ")
00343           ACE_TEXT("failed to delete ")
00344           ACE_TEXT("datawriter %C.\n"),
00345           OPENDDS_STRING(converter).c_str()),ret);
00346     }
00347   }
00348 
00349   // the publisher can now start creating new publications
00350   set_deleted(false);
00351 
00352   return DDS::RETCODE_OK;
00353 }
00354 
00355 DDS::ReturnCode_t
00356 PublisherImpl::set_qos(const DDS::PublisherQos & qos)
00357 {
00358 
00359   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00360 
00361   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00362     if (qos_ == qos)
00363       return DDS::RETCODE_OK;
00364 
00365     // for the not changeable qos, it can be changed before enable
00366     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00367       return DDS::RETCODE_IMMUTABLE_POLICY;
00368 
00369     } else {
00370       qos_ = qos;
00371 
00372       DwIdToQosMap idToQosMap;
00373       {
00374         ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00375             guard,
00376             this->pi_lock_,
00377             DDS::RETCODE_ERROR);
00378 
00379         for (PublicationMap::iterator iter = publication_map_.begin();
00380             iter != publication_map_.end();
00381             ++iter) {
00382           DDS::DataWriterQos qos;
00383           iter->second->get_qos(qos);
00384           RepoId id = iter->second->get_publication_id();
00385           std::pair<DwIdToQosMap::iterator, bool> pair =
00386               idToQosMap.insert(DwIdToQosMap::value_type(id, qos));
00387 
00388           if (pair.second == false) {
00389             GuidConverter converter(id);
00390             ACE_ERROR_RETURN((LM_ERROR,
00391                 ACE_TEXT("(%P|%t) ")
00392                 ACE_TEXT("PublisherImpl::set_qos: ")
00393                 ACE_TEXT("insert id %d to DwIdToQosMap ")
00394                 ACE_TEXT("failed.\n"),
00395                 OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR);
00396           }
00397         }
00398       }
00399 
00400       DwIdToQosMap::iterator iter = idToQosMap.begin();
00401 
00402       while (iter != idToQosMap.end()) {
00403         Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00404         const bool status
00405         = disco->update_publication_qos(
00406             participant_->get_domain_id(),
00407             participant_->get_id(),
00408             iter->first,
00409             iter->second,
00410             this->qos_);
00411 
00412         if (!status) {
00413           ACE_ERROR_RETURN((LM_ERROR,
00414               ACE_TEXT("(%P|%t) PublisherImpl::set_qos, ")
00415               ACE_TEXT("failed. \n")),
00416               DDS::RETCODE_ERROR);
00417         }
00418 
00419         ++iter;
00420       }
00421     }
00422 
00423     return DDS::RETCODE_OK;
00424 
00425   } else {
00426     return DDS::RETCODE_INCONSISTENT_POLICY;
00427   }
00428 }
00429 
00430 DDS::ReturnCode_t
00431 PublisherImpl::get_qos(DDS::PublisherQos & qos)
00432 {
00433   qos = qos_;
00434   return DDS::RETCODE_OK;
00435 }
00436 
00437 DDS::ReturnCode_t
00438 PublisherImpl::set_listener(DDS::PublisherListener_ptr a_listener,
00439     DDS::StatusMask            mask)
00440 {
00441   listener_mask_ = mask;
00442   //note: OK to duplicate  a nil object ref
00443   listener_ = DDS::PublisherListener::_duplicate(a_listener);
00444   return DDS::RETCODE_OK;
00445 }
00446 
00447 DDS::PublisherListener_ptr
00448 PublisherImpl::get_listener()
00449 {
00450   return DDS::PublisherListener::_duplicate(listener_.in());
00451 }
00452 
00453 DDS::ReturnCode_t
00454 PublisherImpl::suspend_publications()
00455 {
00456   if (enabled_ == false) {
00457     ACE_ERROR_RETURN((LM_ERROR,
00458         ACE_TEXT("(%P|%t) ERROR: ")
00459         ACE_TEXT("PublisherImpl::suspend_publications, ")
00460         ACE_TEXT(" Entity is not enabled. \n")),
00461         DDS::RETCODE_NOT_ENABLED);
00462   }
00463 
00464   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00465       guard,
00466       this->pi_lock_,
00467       DDS::RETCODE_ERROR);
00468   ++suspend_depth_count_;
00469   return DDS::RETCODE_OK;
00470 }
00471 
00472 bool
00473 PublisherImpl::is_suspended() const
00474 {
00475   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00476       guard,
00477       this->pi_lock_,
00478       false);
00479   return suspend_depth_count_;
00480 }
00481 
00482 DDS::ReturnCode_t
00483 PublisherImpl::resume_publications()
00484 {
00485   if (enabled_ == false) {
00486     ACE_ERROR_RETURN((LM_ERROR,
00487         ACE_TEXT("(%P|%t) ERROR: ")
00488         ACE_TEXT("PublisherImpl::resume_publications, ")
00489         ACE_TEXT(" Entity is not enabled. \n")),
00490         DDS::RETCODE_NOT_ENABLED);
00491   }
00492 
00493   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00494       guard,
00495       this->pi_lock_,
00496       DDS::RETCODE_ERROR);
00497 
00498   --suspend_depth_count_;
00499 
00500   if (suspend_depth_count_ < 0) {
00501     suspend_depth_count_ = 0;
00502     return DDS::RETCODE_PRECONDITION_NOT_MET;
00503   }
00504 
00505   if (suspend_depth_count_ == 0) {
00506 
00507     for (PublicationMap::iterator it = this->publication_map_.begin();
00508         it != this->publication_map_.end(); ++it) {
00509       it->second->send_suspended_data();
00510     }
00511   }
00512 
00513   return DDS::RETCODE_OK;
00514 }
00515 
00516 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00517 
00518 DDS::ReturnCode_t
00519 PublisherImpl::begin_coherent_changes()
00520 {
00521   if (enabled_ == false) {
00522     ACE_ERROR_RETURN((LM_ERROR,
00523         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:")
00524         ACE_TEXT(" Publisher is not enabled!\n")),
00525         DDS::RETCODE_NOT_ENABLED);
00526   }
00527 
00528   if (!qos_.presentation.coherent_access) {
00529     ACE_ERROR_RETURN((LM_ERROR,
00530         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:")
00531         ACE_TEXT(" QoS policy does not support coherent access!\n")),
00532         DDS::RETCODE_ERROR);
00533   }
00534 
00535   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00536       guard,
00537       this->pi_lock_,
00538       DDS::RETCODE_ERROR);
00539 
00540   ++this->change_depth_;
00541 
00542   if (qos_.presentation.access_scope == DDS::INSTANCE_PRESENTATION_QOS) {
00543     // INSTANCE access scope essentially behaves
00544     // as a no-op. (see: 7.1.3.6)
00545     return DDS::RETCODE_OK;
00546   }
00547 
00548   // We should only notify publications on the first
00549   // and last change to the current change set:
00550   if (this->change_depth_ == 1) {
00551     for (PublicationMap::iterator it = this->publication_map_.begin();
00552         it != this->publication_map_.end(); ++it) {
00553       it->second->begin_coherent_changes();
00554     }
00555   }
00556 
00557   return DDS::RETCODE_OK;
00558 }
00559 
00560 DDS::ReturnCode_t
00561 PublisherImpl::end_coherent_changes()
00562 {
00563   if (enabled_ == false) {
00564     ACE_ERROR_RETURN((LM_ERROR,
00565         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
00566         ACE_TEXT(" Publisher is not enabled!\n")),
00567         DDS::RETCODE_NOT_ENABLED);
00568   }
00569 
00570   if (!qos_.presentation.coherent_access) {
00571     ACE_ERROR_RETURN((LM_ERROR,
00572         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
00573         ACE_TEXT(" QoS policy does not support coherent access!\n")),
00574         DDS::RETCODE_ERROR);
00575   }
00576 
00577   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00578       guard,
00579       this->pi_lock_,
00580       DDS::RETCODE_ERROR);
00581 
00582   if (this->change_depth_ == 0) {
00583     ACE_ERROR_RETURN((LM_ERROR,
00584         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
00585         ACE_TEXT(" No matching call to begin_coherent_changes!\n")),
00586         DDS::RETCODE_PRECONDITION_NOT_MET);
00587   }
00588 
00589   --this->change_depth_;
00590 
00591   if (qos_.presentation.access_scope == DDS::INSTANCE_PRESENTATION_QOS) {
00592     // INSTANCE access scope essentially behaves
00593     // as a no-op. (see: 7.1.3.6)
00594     return DDS::RETCODE_OK;
00595   }
00596 
00597   // We should only notify publications on the first
00598   // and last change to the current change set:
00599   if (this->change_depth_ == 0) {
00600     GroupCoherentSamples group_samples;
00601     for (PublicationMap::iterator it = this->publication_map_.begin();
00602         it != this->publication_map_.end(); ++it) {
00603 
00604       if (it->second->coherent_samples_ == 0) {
00605         continue;
00606       }
00607 
00608       std::pair<GroupCoherentSamples::iterator, bool> pair =
00609           group_samples.insert(GroupCoherentSamples::value_type(
00610               it->second->get_publication_id(),
00611               WriterCoherentSample(it->second->coherent_samples_,
00612                   it->second->sequence_number_)));
00613 
00614       if (pair.second == false) {
00615         ACE_ERROR_RETURN((LM_ERROR,
00616             ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes: ")
00617             ACE_TEXT("failed to insert to GroupCoherentSamples.\n")),
00618             DDS::RETCODE_ERROR);
00619       }
00620     }
00621 
00622     for (PublicationMap::iterator it = this->publication_map_.begin();
00623         it != this->publication_map_.end(); ++it) {
00624       if (it->second->coherent_samples_ == 0) {
00625         continue;
00626       }
00627 
00628       it->second->end_coherent_changes(group_samples);
00629     }
00630   }
00631 
00632   return DDS::RETCODE_OK;
00633 }
00634 
00635 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
00636 
00637 DDS::ReturnCode_t
00638 PublisherImpl::wait_for_acknowledgments(
00639     const DDS::Duration_t& max_wait)
00640 {
00641   if (enabled_ == false) {
00642     ACE_ERROR_RETURN((LM_ERROR,
00643         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ")
00644         ACE_TEXT("Entity is not enabled.\n")),
00645         DDS::RETCODE_NOT_ENABLED);
00646   }
00647 
00648   typedef OPENDDS_MAP(DataWriterImpl*, DataWriterImpl::AckToken) DataWriterAckMap;
00649   DataWriterAckMap ack_writers;
00650   {
00651     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00652         guard,
00653         this->pi_lock_,
00654         DDS::RETCODE_ERROR);
00655 
00656     // Collect writers to request acks
00657     for (DataWriterMap::iterator it(this->datawriter_map_.begin());
00658         it != this->datawriter_map_.end(); ++it) {
00659       DataWriterImpl* writer = it->second;
00660       if (writer->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS)
00661         continue;
00662       if (writer->should_ack()) {
00663         DataWriterImpl::AckToken token = writer->create_ack_token(max_wait);
00664 
00665         std::pair<DataWriterAckMap::iterator, bool> pair =
00666             ack_writers.insert(DataWriterAckMap::value_type(writer, token));
00667 
00668         if (!pair.second) {
00669           ACE_ERROR_RETURN((LM_ERROR,
00670               ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ")
00671               ACE_TEXT("Unable to insert AckToken into DataWriterAckMap!\n")),
00672               DDS::RETCODE_ERROR);
00673         }
00674       }
00675     }
00676   }
00677 
00678   if (ack_writers.empty()) {
00679     if (DCPS_debug_level > 0) {
00680       ACE_DEBUG((LM_DEBUG,
00681           ACE_TEXT("(%P|%t) PublisherImpl::wait_for_acknowledgments() - ")
00682           ACE_TEXT("not blocking due to no writers requiring acks.\n")));
00683     }
00684 
00685     return DDS::RETCODE_OK;
00686   }
00687 
00688   // Wait for ack responses from all associated readers
00689   for (DataWriterAckMap::iterator it(ack_writers.begin());
00690       it != ack_writers.end(); ++it) {
00691     DataWriterImpl::AckToken token = it->second;
00692 
00693     it->first->wait_for_specific_ack(token);
00694   }
00695 
00696   return DDS::RETCODE_OK;
00697 }
00698 
00699 DDS::DomainParticipant_ptr
00700 PublisherImpl::get_participant()
00701 {
00702   return DDS::DomainParticipant::_duplicate(participant_);
00703 }
00704 
00705 DDS::ReturnCode_t
00706 PublisherImpl::set_default_datawriter_qos(const DDS::DataWriterQos & qos)
00707 {
00708   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00709     default_datawriter_qos_ = qos;
00710     return DDS::RETCODE_OK;
00711 
00712   } else {
00713     return DDS::RETCODE_INCONSISTENT_POLICY;
00714   }
00715 }
00716 
00717 DDS::ReturnCode_t
00718 PublisherImpl::get_default_datawriter_qos(DDS::DataWriterQos & qos)
00719 {
00720   qos = default_datawriter_qos_;
00721   return DDS::RETCODE_OK;
00722 }
00723 
00724 DDS::ReturnCode_t
00725 PublisherImpl::copy_from_topic_qos(DDS::DataWriterQos &  a_datawriter_qos,
00726     const DDS::TopicQos & a_topic_qos)
00727 {
00728   if (Qos_Helper::copy_from_topic_qos(a_datawriter_qos, a_topic_qos)) {
00729     return DDS::RETCODE_OK;
00730   } else {
00731     return DDS::RETCODE_INCONSISTENT_POLICY;
00732   }
00733 }
00734 
00735 DDS::ReturnCode_t
00736 PublisherImpl::enable()
00737 {
00738   //According spec:
00739   // - Calling enable on an already enabled Entity returns OK and has no
00740   // effect.
00741   // - Calling enable on an Entity whose factory is not enabled will fail
00742   // and return PRECONDITION_NOT_MET.
00743 
00744   if (this->is_enabled()) {
00745     return DDS::RETCODE_OK;
00746   }
00747 
00748   if (this->participant_->is_enabled() == false) {
00749     return DDS::RETCODE_PRECONDITION_NOT_MET;
00750   }
00751 
00752   if (this->monitor_) {
00753     this->monitor_->report();
00754   }
00755 
00756   this->set_enabled();
00757   return DDS::RETCODE_OK;
00758 }
00759 
00760 bool
00761 PublisherImpl::is_clean() const
00762 {
00763   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00764       guard,
00765       this->pi_lock_,
00766       false);
00767   return datawriter_map_.empty() && publication_map_.empty();
00768 }
00769 
00770 DDS::ReturnCode_t
00771 PublisherImpl::writer_enabled(const char*     topic_name,
00772     DataWriterImpl* writer)
00773 {
00774   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00775       guard,
00776       this->pi_lock_,
00777       DDS::RETCODE_ERROR);
00778 
00779   datawriter_map_.insert(DataWriterMap::value_type(topic_name, writer));
00780 
00781   const RepoId publication_id = writer->get_publication_id();
00782 
00783   std::pair<PublicationMap::iterator, bool> pair =
00784       publication_map_.insert(PublicationMap::value_type(publication_id, writer));
00785 
00786   if (pair.second == false) {
00787     GuidConverter converter(publication_id);
00788     ACE_ERROR_RETURN((LM_ERROR,
00789         ACE_TEXT("(%P|%t) ERROR: ")
00790         ACE_TEXT("PublisherImpl::writer_enabled: ")
00791         ACE_TEXT("insert publication %C failed.\n"),
00792         OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR);
00793   }
00794 
00795   // Increase ref count when the servant is added to the
00796   // datawriter/publication map.
00797   writer->_add_ref();
00798 
00799   return DDS::RETCODE_OK;
00800 }
00801 
00802 
00803 DDS::PublisherListener_ptr
00804 PublisherImpl::listener_for(DDS::StatusKind kind)
00805 {
00806   // per 2.1.4.3.1 Listener Access to Plain Communication Status
00807   // use this entities factory if listener is mask not enabled
00808   // for this kind.
00809   if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
00810     return participant_->listener_for(kind);
00811 
00812   } else {
00813     return DDS::PublisherListener::_duplicate(listener_.in());
00814   }
00815 }
00816 
00817 DDS::ReturnCode_t
00818 PublisherImpl::assert_liveliness_by_participant()
00819 {
00820   DDS::ReturnCode_t ret = DDS::RETCODE_OK;
00821 
00822   for (DataWriterMap::iterator it(datawriter_map_.begin());
00823       it != datawriter_map_.end(); ++it) {
00824     DDS::ReturnCode_t dw_ret = it->second->assert_liveliness_by_participant();
00825 
00826     if (dw_ret != DDS::RETCODE_OK) {
00827       ret = dw_ret;
00828     }
00829   }
00830 
00831   return ret;
00832 }
00833 
00834 ACE_Time_Value
00835 PublisherImpl::liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
00836 {
00837   ACE_Time_Value tv = ACE_Time_Value::max_time;
00838   for (DataWriterMap::iterator it(datawriter_map_.begin());
00839       it != datawriter_map_.end(); ++it) {
00840     tv = std::min (tv, it->second->liveliness_check_interval(kind));
00841   }
00842   return tv;
00843 }
00844 
00845 bool
00846 PublisherImpl::participant_liveliness_activity_after(const ACE_Time_Value& tv)
00847 {
00848   for (DataWriterMap::iterator it(datawriter_map_.begin());
00849       it != datawriter_map_.end(); ++it) {
00850     if (it->second->participant_liveliness_activity_after(tv)) {
00851       return true;
00852     }
00853   }
00854   return false;
00855 }
00856 
00857 void
00858 PublisherImpl::get_publication_ids(PublicationIdVec& pubs)
00859 {
00860   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00861       guard,
00862       this->pi_lock_,
00863   );
00864 
00865   pubs.reserve(publication_map_.size());
00866   for (PublicationMap::iterator iter = publication_map_.begin();
00867       iter != publication_map_.end();
00868       ++iter) {
00869     pubs.push_back(iter->first);
00870   }
00871 }
00872 
00873 EntityImpl*
00874 PublisherImpl::parent() const
00875 {
00876   return this->participant_;
00877 }
00878 
00879 bool
00880 PublisherImpl::validate_datawriter_qos(const DDS::DataWriterQos& qos,
00881     const DDS::DataWriterQos& default_qos,
00882     DDS::Topic_ptr            a_topic,
00883     DDS::DataWriterQos&       dw_qos)
00884 {
00885   if (CORBA::is_nil(a_topic)) {
00886     ACE_ERROR((LM_ERROR,
00887         ACE_TEXT("(%P|%t) ERROR: ")
00888         ACE_TEXT("PublisherImpl::create_datawriter, ")
00889         ACE_TEXT("topic is nil.\n")));
00890     return DDS::DataWriter::_nil();
00891   }
00892 
00893   if (qos == DATAWRITER_QOS_DEFAULT) {
00894     dw_qos = default_qos;
00895 
00896   } else if (qos == DATAWRITER_QOS_USE_TOPIC_QOS) {
00897     DDS::TopicQos topic_qos;
00898     a_topic->get_qos(topic_qos);
00899     dw_qos = default_qos;
00900 
00901     Qos_Helper::copy_from_topic_qos(dw_qos, topic_qos);
00902 
00903   } else {
00904     dw_qos = qos;
00905   }
00906 
00907   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00908   OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00909   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00910   OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00911   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00912 
00913   if (!Qos_Helper::valid(dw_qos)) {
00914     ACE_ERROR((LM_ERROR,
00915         ACE_TEXT("(%P|%t) ERROR: ")
00916         ACE_TEXT("PublisherImpl::create_datawriter, ")
00917         ACE_TEXT("invalid qos.\n")));
00918     return DDS::DataWriter::_nil();
00919   }
00920 
00921   if (!Qos_Helper::consistent(dw_qos)) {
00922     ACE_ERROR((LM_ERROR,
00923         ACE_TEXT("(%P|%t) ERROR: ")
00924         ACE_TEXT("PublisherImpl::create_datawriter, ")
00925         ACE_TEXT("inconsistent qos.\n")));
00926     return DDS::DataWriter::_nil();
00927   }
00928   return true;
00929 }
00930 
00931 
00932 } // namespace DCPS
00933 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:24 2016 for OpenDDS by  doxygen 1.4.7