PublisherImpl.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "PublisherImpl.h"
00010 #include "FeatureDisabledQosCheck.h"
00011 #include "DataWriterImpl.h"
00012 #include "DomainParticipantImpl.h"
00013 #include "DataWriterImpl.h"
00014 #include "Service_Participant.h"
00015 #include "Qos_Helper.h"
00016 #include "GuidConverter.h"
00017 #include "Marked_Default_Qos.h"
00018 #include "TopicImpl.h"
00019 #include "MonitorFactory.h"
00020 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00021 #include "dds/DCPS/transport/framework/DataLinkSet.h"
00022 #include "dds/DCPS/transport/framework/TransportImpl.h"
00023 #include "tao/debug.h"
00024 
00025 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00026 
00027 namespace OpenDDS {
00028 namespace DCPS {
00029 
00030 PublisherImpl::PublisherImpl(DDS::InstanceHandle_t      handle,
00031     RepoId                     id,
00032     const DDS::PublisherQos&   qos,
00033     DDS::PublisherListener_ptr a_listener,
00034     const DDS::StatusMask&     mask,
00035     DomainParticipantImpl*     participant)
00036 : handle_(handle),
00037   qos_(qos),
00038   default_datawriter_qos_(TheServiceParticipant->initial_DataWriterQos()),
00039   listener_mask_(mask),
00040   listener_(DDS::PublisherListener::_duplicate(a_listener)),
00041 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00042   change_depth_(0),
00043 #endif
00044   domain_id_(participant->get_domain_id()),
00045   participant_(*participant),
00046   suspend_depth_count_(0),
00047   sequence_number_(),
00048   aggregation_period_start_(ACE_Time_Value::zero),
00049   reverse_pi_lock_(pi_lock_),
00050   monitor_(0),
00051   publisher_id_(id)
00052 {
00053   monitor_ = TheServiceParticipant->monitor_factory_->create_publisher_monitor(this);
00054 }
00055 
00056 PublisherImpl::~PublisherImpl()
00057 {
00058   //The datawriters should be deleted already before calling delete
00059   //publisher.
00060   if (!is_clean()) {
00061     ACE_ERROR((LM_ERROR,
00062         ACE_TEXT("(%P|%t) ERROR: ")
00063         ACE_TEXT("PublisherImpl::~PublisherImpl, ")
00064         ACE_TEXT("%B datawriters and %B publications still exist.\n"),
00065         datawriter_map_.size(), publication_map_.size()));
00066   }
00067 }
00068 
00069 DDS::InstanceHandle_t
00070 PublisherImpl::get_instance_handle()
00071 {
00072   return handle_;
00073 }
00074 
00075 bool
00076 PublisherImpl::contains_writer(DDS::InstanceHandle_t a_handle)
00077 {
00078   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00079       guard,
00080       this->pi_lock_,
00081       DDS::RETCODE_ERROR);
00082 
00083   for (DataWriterMap::iterator it(datawriter_map_.begin());
00084       it != datawriter_map_.end(); ++it) {
00085     if (a_handle == it->second->get_instance_handle()) {
00086       return true;
00087     }
00088   }
00089 
00090   return false;
00091 }
00092 
00093 DDS::DataWriter_ptr
00094 PublisherImpl::create_datawriter(
00095     DDS::Topic_ptr              a_topic,
00096     const DDS::DataWriterQos &  qos,
00097     DDS::DataWriterListener_ptr a_listener,
00098     DDS::StatusMask             mask)
00099 {
00100   DDS::DataWriterQos dw_qos;
00101 
00102   if (!validate_datawriter_qos(qos, default_datawriter_qos_, a_topic, dw_qos)) {
00103     return DDS::DataWriter::_nil();
00104   }
00105 
00106   TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic);
00107 
00108   if (!topic_servant) {
00109     CORBA::String_var name = a_topic->get_name();
00110     ACE_ERROR((LM_ERROR,
00111       ACE_TEXT("(%P|%t) ERROR: ")
00112       ACE_TEXT("PublisherImpl::create_datawriter, ")
00113       ACE_TEXT("topic_servant(topic_name=%C) is nil.\n"),
00114       name.in()));
00115     return 0;
00116   }
00117 
00118   OpenDDS::DCPS::TypeSupport_ptr typesupport =
00119       topic_servant->get_type_support();
00120 
00121   if (typesupport == 0) {
00122     CORBA::String_var name = topic_servant->get_name();
00123     ACE_ERROR((LM_ERROR,
00124         ACE_TEXT("(%P|%t) ERROR: ")
00125         ACE_TEXT("PublisherImpl::create_datawriter, ")
00126         ACE_TEXT("typesupport(topic_name=%C) is nil.\n"),
00127         name.in()));
00128     return DDS::DataWriter::_nil();
00129   }
00130 
00131   DDS::DataWriter_var dw_obj = typesupport->create_datawriter();
00132 
00133   DataWriterImpl* dw_servant =
00134       dynamic_cast <DataWriterImpl*>(dw_obj.in());
00135 
00136   if (dw_servant == 0) {
00137     ACE_ERROR((LM_ERROR,
00138         ACE_TEXT("(%P|%t) ERROR: ")
00139         ACE_TEXT("PublisherImpl::create_datawriter, ")
00140         ACE_TEXT("servant is nil.\n")));
00141     return DDS::DataWriter::_nil();
00142   }
00143 
00144   dw_servant->init(
00145       topic_servant,
00146       dw_qos,
00147       a_listener,
00148       mask,
00149       participant_,
00150       this);
00151 
00152   if ((this->enabled_ == true) && (qos_.entity_factory.autoenable_created_entities)) {
00153     const DDS::ReturnCode_t ret = dw_servant->enable();
00154 
00155     if (ret != DDS::RETCODE_OK) {
00156       ACE_ERROR((LM_WARNING,
00157           ACE_TEXT("(%P|%t) WARNING: ")
00158           ACE_TEXT("PublisherImpl::create_datawriter, ")
00159           ACE_TEXT("enable failed.\n")));
00160       return DDS::DataWriter::_nil();
00161     }
00162   } else {
00163     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, pi_lock_, 0);
00164     writers_not_enabled_.insert(rchandle_from(dw_servant));
00165   }
00166 
00167   return DDS::DataWriter::_duplicate(dw_obj.in());
00168 }
00169 
00170 DDS::ReturnCode_t
00171 PublisherImpl::delete_datawriter(DDS::DataWriter_ptr a_datawriter)
00172 {
00173   DataWriterImpl* dw_servant = dynamic_cast<DataWriterImpl*>(a_datawriter);
00174   if (!dw_servant) {
00175     ACE_ERROR((LM_ERROR,
00176               "(%P|%t) PublisherImpl::delete_datawriter - dynamic cast to DataWriterImpl failed\n"
00177     ));
00178     return DDS::RETCODE_ERROR;
00179   }
00180 
00181   // marks entity as deleted and stops future associating
00182   dw_servant->prepare_to_delete();
00183 
00184   {
00185     DDS::Publisher_var dw_publisher(dw_servant->get_publisher());
00186 
00187     if (dw_publisher.in() != this) {
00188       RepoId id = dw_servant->get_publication_id();
00189       GuidConverter converter(id);
00190       ACE_ERROR((LM_ERROR,
00191           ACE_TEXT("(%P|%t) PublisherImpl::delete_datawriter: ")
00192           ACE_TEXT("the data writer %C doesn't ")
00193           ACE_TEXT("belong to this subscriber \n"),
00194           OPENDDS_STRING(converter).c_str()));
00195       return DDS::RETCODE_PRECONDITION_NOT_MET;
00196     }
00197   }
00198 
00199 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00200   // Trigger data to be persisted, i.e. made durable, if so
00201   // configured. This needs be called before unregister_instances
00202   // because unregister_instances may cause instance dispose.
00203   if (!dw_servant->persist_data() && DCPS_debug_level >= 2) {
00204     ACE_ERROR((LM_ERROR,
00205         ACE_TEXT("(%P|%t) ERROR: ")
00206         ACE_TEXT("PublisherImpl::delete_datawriter, ")
00207         ACE_TEXT("failed to make data durable.\n")));
00208   }
00209 #endif
00210 
00211   // Unregister all registered instances prior to deletion.
00212   DDS::Time_t source_timestamp = time_value_to_time(ACE_OS::gettimeofday());
00213   dw_servant->unregister_instances(source_timestamp);
00214 
00215   // Wait for any control messages to be transported during
00216   // unregistering of instances.
00217   dw_servant->wait_pending();
00218   dw_servant->wait_control_pending();
00219 
00220   RepoId publication_id  = GUID_UNKNOWN;
00221   {
00222     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00223         guard,
00224         this->pi_lock_,
00225         DDS::RETCODE_ERROR);
00226 
00227     publication_id = dw_servant->get_publication_id();
00228 
00229     PublicationMap::iterator it = publication_map_.find(publication_id);
00230 
00231     if (it == publication_map_.end()) {
00232       GuidConverter converter(publication_id);
00233       ACE_ERROR_RETURN((LM_ERROR,
00234           ACE_TEXT("(%P|%t) ERROR: ")
00235           ACE_TEXT("PublisherImpl::delete_datawriter, ")
00236           ACE_TEXT("datawriter %C not found.\n"),
00237           OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR);
00238     }
00239 
00240     // We can not erase the datawriter from datawriter map by the topic name
00241     // because the map might have multiple datawriters with the same topic
00242     // name.
00243     // Find the iterator to the datawriter in the datawriter map and erase
00244     // by the iterator.
00245     DataWriterMap::iterator writ;
00246     DataWriterMap::iterator the_writ = datawriter_map_.end();
00247 
00248     for (writ = datawriter_map_.begin();
00249         writ != datawriter_map_.end();
00250         ++writ) {
00251       if (writ->second == it->second) {
00252         the_writ = writ;
00253         break;
00254       }
00255     }
00256 
00257     if (the_writ != datawriter_map_.end()) {
00258       datawriter_map_.erase(the_writ);
00259     }
00260 
00261     publication_map_.erase(it);
00262 
00263     // Release pi_lock_ before making call to transport layer to avoid
00264     // some deadlock situations that threads acquire locks(PublisherImpl
00265     // pi_lock_, TransportClient reservation_lock and TransportImpl
00266     // lock_) in reverse order.
00267     ACE_GUARD_RETURN(reverse_lock_type, reverse_monitor, this->reverse_pi_lock_,
00268         DDS::RETCODE_ERROR);
00269     // Wait for pending samples to drain prior to removing associations
00270     // and unregistering the publication.
00271     dw_servant->wait_pending();
00272     // Call remove association before unregistering the datawriter
00273     // with the transport, otherwise some callbacks resulted from
00274     // remove_association may lost.
00275     dw_servant->remove_all_associations();
00276     dw_servant->cleanup();
00277   }
00278 
00279   if (this->monitor_) {
00280     this->monitor_->report();
00281   }
00282 
00283   // not just unregister but remove any pending writes/sends.
00284   dw_servant->unregister_all();
00285 
00286   RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
00287 
00288   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00289   if (!disco->remove_publication(
00290       this->domain_id_,
00291       participant->get_id(),
00292       publication_id)) {
00293     ACE_ERROR_RETURN((LM_ERROR,
00294         ACE_TEXT("(%P|%t) ERROR: ")
00295         ACE_TEXT("PublisherImpl::delete_datawriter, ")
00296         ACE_TEXT("publication not removed from discovery.\n")),
00297         DDS::RETCODE_ERROR);
00298   }
00299 
00300   participant->remove_adjust_liveliness_timers();
00301 
00302   return DDS::RETCODE_OK;
00303 }
00304 
00305 DDS::DataWriter_ptr
00306 PublisherImpl::lookup_datawriter(const char* topic_name)
00307 {
00308   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00309       guard,
00310       this->pi_lock_,
00311       DDS::DataWriter::_nil());
00312 
00313   // If multiple entries whose key is "topic_name" then which one is
00314   // returned ? Spec does not limit which one should give.
00315   DataWriterMap::iterator it = datawriter_map_.find(topic_name);
00316 
00317   if (it == datawriter_map_.end()) {
00318     if (DCPS_debug_level >= 2) {
00319       ACE_DEBUG((LM_DEBUG,
00320           ACE_TEXT("(%P|%t) ")
00321           ACE_TEXT("PublisherImpl::lookup_datawriter, ")
00322           ACE_TEXT("The datawriter(topic_name=%C) is not found\n"),
00323           topic_name));
00324     }
00325 
00326     return DDS::DataWriter::_nil();
00327 
00328   } else {
00329     return DDS::DataWriter::_duplicate(it->second.in());
00330   }
00331 }
00332 
00333 DDS::ReturnCode_t
00334 PublisherImpl::delete_contained_entities()
00335 {
00336   // mark that the entity is being deleted
00337   set_deleted(true);
00338 
00339   while (true) {
00340     PublicationId pub_id = GUID_UNKNOWN;
00341     DataWriterImpl_rch a_datawriter;
00342 
00343     {
00344       ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00345           guard,
00346           this->pi_lock_,
00347           DDS::RETCODE_ERROR);
00348 
00349       if (datawriter_map_.empty()) {
00350         break;
00351       } else {
00352         a_datawriter = datawriter_map_.begin()->second;
00353         pub_id = a_datawriter->get_publication_id();
00354       }
00355     }
00356 
00357     const DDS::ReturnCode_t ret = delete_datawriter(a_datawriter.in());
00358 
00359     if (ret != DDS::RETCODE_OK) {
00360       GuidConverter converter(pub_id);
00361       ACE_ERROR_RETURN((LM_ERROR,
00362           ACE_TEXT("(%P|%t) ERROR: ")
00363           ACE_TEXT("PublisherImpl::")
00364           ACE_TEXT("delete_contained_entities: ")
00365           ACE_TEXT("failed to delete ")
00366           ACE_TEXT("datawriter %C.\n"),
00367           OPENDDS_STRING(converter).c_str()),ret);
00368     }
00369   }
00370 
00371   // the publisher can now start creating new publications
00372   set_deleted(false);
00373 
00374   return DDS::RETCODE_OK;
00375 }
00376 
00377 DDS::ReturnCode_t
00378 PublisherImpl::set_qos(const DDS::PublisherQos & qos)
00379 {
00380 
00381   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00382 
00383   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00384     if (qos_ == qos)
00385       return DDS::RETCODE_OK;
00386 
00387     // for the not changeable qos, it can be changed before enable
00388     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00389       return DDS::RETCODE_IMMUTABLE_POLICY;
00390 
00391     } else {
00392       qos_ = qos;
00393 
00394       DwIdToQosMap idToQosMap;
00395       {
00396         ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00397             guard,
00398             this->pi_lock_,
00399             DDS::RETCODE_ERROR);
00400 
00401         for (PublicationMap::iterator iter = publication_map_.begin();
00402             iter != publication_map_.end();
00403             ++iter) {
00404           DDS::DataWriterQos qos;
00405           iter->second->get_qos(qos);
00406           RepoId id = iter->second->get_publication_id();
00407           std::pair<DwIdToQosMap::iterator, bool> pair =
00408               idToQosMap.insert(DwIdToQosMap::value_type(id, qos));
00409 
00410           if (pair.second == false) {
00411             GuidConverter converter(id);
00412             ACE_ERROR_RETURN((LM_ERROR,
00413                 ACE_TEXT("(%P|%t) ")
00414                 ACE_TEXT("PublisherImpl::set_qos: ")
00415                 ACE_TEXT("insert id %C to DwIdToQosMap ")
00416                 ACE_TEXT("failed.\n"),
00417                 OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR);
00418           }
00419         }
00420       }
00421 
00422       DwIdToQosMap::iterator iter = idToQosMap.begin();
00423 
00424       while (iter != idToQosMap.end()) {
00425         Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00426         bool status = false;
00427 
00428         RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
00429         if (participant)
00430           status = disco->update_publication_qos(
00431               participant->get_domain_id(),
00432               participant->get_id(),
00433               iter->first,
00434               iter->second,
00435               this->qos_);
00436 
00437         if (!status) {
00438           ACE_ERROR_RETURN((LM_ERROR,
00439               ACE_TEXT("(%P|%t) PublisherImpl::set_qos, ")
00440               ACE_TEXT("failed. \n")),
00441               DDS::RETCODE_ERROR);
00442         }
00443 
00444         ++iter;
00445       }
00446     }
00447 
00448     return DDS::RETCODE_OK;
00449 
00450   } else {
00451     return DDS::RETCODE_INCONSISTENT_POLICY;
00452   }
00453 }
00454 
00455 DDS::ReturnCode_t
00456 PublisherImpl::get_qos(DDS::PublisherQos & qos)
00457 {
00458   qos = qos_;
00459   return DDS::RETCODE_OK;
00460 }
00461 
00462 DDS::ReturnCode_t
00463 PublisherImpl::set_listener(DDS::PublisherListener_ptr a_listener,
00464     DDS::StatusMask            mask)
00465 {
00466   listener_mask_ = mask;
00467   //note: OK to duplicate  a nil object ref
00468   listener_ = DDS::PublisherListener::_duplicate(a_listener);
00469   return DDS::RETCODE_OK;
00470 }
00471 
00472 DDS::PublisherListener_ptr
00473 PublisherImpl::get_listener()
00474 {
00475   return DDS::PublisherListener::_duplicate(listener_.in());
00476 }
00477 
00478 DDS::ReturnCode_t
00479 PublisherImpl::suspend_publications()
00480 {
00481   if (enabled_ == false) {
00482     ACE_ERROR_RETURN((LM_ERROR,
00483         ACE_TEXT("(%P|%t) ERROR: ")
00484         ACE_TEXT("PublisherImpl::suspend_publications, ")
00485         ACE_TEXT(" Entity is not enabled. \n")),
00486         DDS::RETCODE_NOT_ENABLED);
00487   }
00488 
00489   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00490       guard,
00491       this->pi_lock_,
00492       DDS::RETCODE_ERROR);
00493   ++suspend_depth_count_;
00494   return DDS::RETCODE_OK;
00495 }
00496 
00497 bool
00498 PublisherImpl::is_suspended() const
00499 {
00500   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00501       guard,
00502       this->pi_lock_,
00503       false);
00504   return suspend_depth_count_;
00505 }
00506 
00507 DDS::ReturnCode_t
00508 PublisherImpl::resume_publications()
00509 {
00510   if (enabled_ == false) {
00511     ACE_ERROR_RETURN((LM_ERROR,
00512         ACE_TEXT("(%P|%t) ERROR: ")
00513         ACE_TEXT("PublisherImpl::resume_publications, ")
00514         ACE_TEXT(" Entity is not enabled. \n")),
00515         DDS::RETCODE_NOT_ENABLED);
00516   }
00517 
00518   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00519       guard,
00520       this->pi_lock_,
00521       DDS::RETCODE_ERROR);
00522 
00523   --suspend_depth_count_;
00524 
00525   if (suspend_depth_count_ < 0) {
00526     suspend_depth_count_ = 0;
00527     return DDS::RETCODE_PRECONDITION_NOT_MET;
00528   }
00529 
00530   if (suspend_depth_count_ == 0) {
00531 
00532     for (PublicationMap::iterator it = this->publication_map_.begin();
00533         it != this->publication_map_.end(); ++it) {
00534       it->second->send_suspended_data();
00535     }
00536   }
00537 
00538   return DDS::RETCODE_OK;
00539 }
00540 
00541 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00542 
00543 DDS::ReturnCode_t
00544 PublisherImpl::begin_coherent_changes()
00545 {
00546   if (enabled_ == false) {
00547     ACE_ERROR_RETURN((LM_ERROR,
00548         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:")
00549         ACE_TEXT(" Publisher is not enabled!\n")),
00550         DDS::RETCODE_NOT_ENABLED);
00551   }
00552 
00553   if (!qos_.presentation.coherent_access) {
00554     ACE_ERROR_RETURN((LM_ERROR,
00555         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:")
00556         ACE_TEXT(" QoS policy does not support coherent access!\n")),
00557         DDS::RETCODE_ERROR);
00558   }
00559 
00560   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00561       guard,
00562       this->pi_lock_,
00563       DDS::RETCODE_ERROR);
00564 
00565   ++this->change_depth_;
00566 
00567   if (qos_.presentation.access_scope == DDS::INSTANCE_PRESENTATION_QOS) {
00568     // INSTANCE access scope essentially behaves
00569     // as a no-op. (see: 7.1.3.6)
00570     return DDS::RETCODE_OK;
00571   }
00572 
00573   // We should only notify publications on the first
00574   // and last change to the current change set:
00575   if (this->change_depth_ == 1) {
00576     for (PublicationMap::iterator it = this->publication_map_.begin();
00577         it != this->publication_map_.end(); ++it) {
00578       it->second->begin_coherent_changes();
00579     }
00580   }
00581 
00582   return DDS::RETCODE_OK;
00583 }
00584 
00585 DDS::ReturnCode_t
00586 PublisherImpl::end_coherent_changes()
00587 {
00588   if (enabled_ == false) {
00589     ACE_ERROR_RETURN((LM_ERROR,
00590         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
00591         ACE_TEXT(" Publisher is not enabled!\n")),
00592         DDS::RETCODE_NOT_ENABLED);
00593   }
00594 
00595   if (!qos_.presentation.coherent_access) {
00596     ACE_ERROR_RETURN((LM_ERROR,
00597         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
00598         ACE_TEXT(" QoS policy does not support coherent access!\n")),
00599         DDS::RETCODE_ERROR);
00600   }
00601 
00602   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00603       guard,
00604       this->pi_lock_,
00605       DDS::RETCODE_ERROR);
00606 
00607   if (this->change_depth_ == 0) {
00608     ACE_ERROR_RETURN((LM_ERROR,
00609         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
00610         ACE_TEXT(" No matching call to begin_coherent_changes!\n")),
00611         DDS::RETCODE_PRECONDITION_NOT_MET);
00612   }
00613 
00614   --this->change_depth_;
00615 
00616   if (qos_.presentation.access_scope == DDS::INSTANCE_PRESENTATION_QOS) {
00617     // INSTANCE access scope essentially behaves
00618     // as a no-op. (see: 7.1.3.6)
00619     return DDS::RETCODE_OK;
00620   }
00621 
00622   // We should only notify publications on the first
00623   // and last change to the current change set:
00624   if (this->change_depth_ == 0) {
00625     GroupCoherentSamples group_samples;
00626     for (PublicationMap::iterator it = this->publication_map_.begin();
00627         it != this->publication_map_.end(); ++it) {
00628 
00629       if (it->second->coherent_samples_ == 0) {
00630         continue;
00631       }
00632 
00633       std::pair<GroupCoherentSamples::iterator, bool> pair =
00634           group_samples.insert(GroupCoherentSamples::value_type(
00635               it->second->get_publication_id(),
00636               WriterCoherentSample(it->second->coherent_samples_,
00637                   it->second->sequence_number_)));
00638 
00639       if (pair.second == false) {
00640         ACE_ERROR_RETURN((LM_ERROR,
00641             ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes: ")
00642             ACE_TEXT("failed to insert to GroupCoherentSamples.\n")),
00643             DDS::RETCODE_ERROR);
00644       }
00645     }
00646 
00647     for (PublicationMap::iterator it = this->publication_map_.begin();
00648         it != this->publication_map_.end(); ++it) {
00649       if (it->second->coherent_samples_ == 0) {
00650         continue;
00651       }
00652 
00653       it->second->end_coherent_changes(group_samples);
00654     }
00655   }
00656 
00657   return DDS::RETCODE_OK;
00658 }
00659 
00660 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
00661 
00662 DDS::ReturnCode_t
00663 PublisherImpl::wait_for_acknowledgments(
00664     const DDS::Duration_t& max_wait)
00665 {
00666   if (enabled_ == false) {
00667     ACE_ERROR_RETURN((LM_ERROR,
00668         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ")
00669         ACE_TEXT("Entity is not enabled.\n")),
00670         DDS::RETCODE_NOT_ENABLED);
00671   }
00672 
00673   typedef OPENDDS_MAP(DataWriterImpl*, DataWriterImpl::AckToken) DataWriterAckMap;
00674   DataWriterAckMap ack_writers;
00675   {
00676     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00677         guard,
00678         this->pi_lock_,
00679         DDS::RETCODE_ERROR);
00680 
00681     // Collect writers to request acks
00682     for (DataWriterMap::iterator it(this->datawriter_map_.begin());
00683         it != this->datawriter_map_.end(); ++it) {
00684       DataWriterImpl_rch writer = it->second;
00685       if (writer->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS)
00686         continue;
00687       if (writer->should_ack()) {
00688         DataWriterImpl::AckToken token = writer->create_ack_token(max_wait);
00689 
00690         std::pair<DataWriterAckMap::iterator, bool> pair =
00691             ack_writers.insert(DataWriterAckMap::value_type(writer.in(), token));
00692 
00693         if (!pair.second) {
00694           ACE_ERROR_RETURN((LM_ERROR,
00695               ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ")
00696               ACE_TEXT("Unable to insert AckToken into DataWriterAckMap!\n")),
00697               DDS::RETCODE_ERROR);
00698         }
00699       }
00700     }
00701   }
00702 
00703   if (ack_writers.empty()) {
00704     if (DCPS_debug_level > 0) {
00705       ACE_DEBUG((LM_DEBUG,
00706           ACE_TEXT("(%P|%t) PublisherImpl::wait_for_acknowledgments() - ")
00707           ACE_TEXT("not blocking due to no writers requiring acks.\n")));
00708     }
00709 
00710     return DDS::RETCODE_OK;
00711   }
00712 
00713   // Wait for ack responses from all associated readers
00714   for (DataWriterAckMap::iterator it(ack_writers.begin());
00715       it != ack_writers.end(); ++it) {
00716     DataWriterImpl::AckToken token = it->second;
00717 
00718     it->first->wait_for_specific_ack(token);
00719   }
00720 
00721   return DDS::RETCODE_OK;
00722 }
00723 
00724 DDS::DomainParticipant_ptr
00725 PublisherImpl::get_participant()
00726 {
00727   return participant_.lock()._retn();
00728 }
00729 
00730 DDS::ReturnCode_t
00731 PublisherImpl::set_default_datawriter_qos(const DDS::DataWriterQos & qos)
00732 {
00733   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00734     default_datawriter_qos_ = qos;
00735     return DDS::RETCODE_OK;
00736 
00737   } else {
00738     return DDS::RETCODE_INCONSISTENT_POLICY;
00739   }
00740 }
00741 
00742 DDS::ReturnCode_t
00743 PublisherImpl::get_default_datawriter_qos(DDS::DataWriterQos & qos)
00744 {
00745   qos = default_datawriter_qos_;
00746   return DDS::RETCODE_OK;
00747 }
00748 
00749 DDS::ReturnCode_t
00750 PublisherImpl::copy_from_topic_qos(DDS::DataWriterQos &  a_datawriter_qos,
00751     const DDS::TopicQos & a_topic_qos)
00752 {
00753   if (Qos_Helper::copy_from_topic_qos(a_datawriter_qos, a_topic_qos)) {
00754     return DDS::RETCODE_OK;
00755   } else {
00756     return DDS::RETCODE_INCONSISTENT_POLICY;
00757   }
00758 }
00759 
00760 DDS::ReturnCode_t
00761 PublisherImpl::enable()
00762 {
00763   //According spec:
00764   // - Calling enable on an already enabled Entity returns OK and has no
00765   // effect.
00766   // - Calling enable on an Entity whose factory is not enabled will fail
00767   // and return PRECONDITION_NOT_MET.
00768 
00769   if (this->is_enabled()) {
00770     return DDS::RETCODE_OK;
00771   }
00772 
00773   RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
00774   if (!participant || participant->is_enabled() == false) {
00775     return DDS::RETCODE_PRECONDITION_NOT_MET;
00776   }
00777 
00778   if (this->monitor_) {
00779     this->monitor_->report();
00780   }
00781 
00782   this->set_enabled();
00783 
00784   if (qos_.entity_factory.autoenable_created_entities) {
00785     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, pi_lock_, DDS::RETCODE_ERROR);
00786     DataWriterSet writers;
00787     writers_not_enabled_.swap(writers);
00788     for (DataWriterSet::iterator it = writers.begin(); it != writers.end(); ++it) {
00789       (*it)->enable();
00790     }
00791   }
00792 
00793   return DDS::RETCODE_OK;
00794 }
00795 
00796 bool
00797 PublisherImpl::is_clean() const
00798 {
00799   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00800       guard,
00801       this->pi_lock_,
00802       false);
00803   return datawriter_map_.empty() && publication_map_.empty();
00804 }
00805 
00806 DDS::ReturnCode_t
00807 PublisherImpl::writer_enabled(const char*     topic_name,
00808     DataWriterImpl* writer_ptr)
00809 {
00810   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00811       guard,
00812       this->pi_lock_,
00813       DDS::RETCODE_ERROR);
00814   DataWriterImpl_rch writer = rchandle_from(writer_ptr);
00815   writers_not_enabled_.erase(writer);
00816 
00817   datawriter_map_.insert(DataWriterMap::value_type(topic_name, writer));
00818 
00819   const RepoId publication_id = writer->get_publication_id();
00820 
00821   std::pair<PublicationMap::iterator, bool> pair =
00822       publication_map_.insert(PublicationMap::value_type(publication_id, writer));
00823 
00824   if (pair.second == false) {
00825     GuidConverter converter(publication_id);
00826     ACE_ERROR_RETURN((LM_ERROR,
00827         ACE_TEXT("(%P|%t) ERROR: ")
00828         ACE_TEXT("PublisherImpl::writer_enabled: ")
00829         ACE_TEXT("insert publication %C failed.\n"),
00830         OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR);
00831   }
00832 
00833   if (this->monitor_) {
00834     this->monitor_->report();
00835   }
00836 
00837   return DDS::RETCODE_OK;
00838 }
00839 
00840 
00841 DDS::PublisherListener_ptr
00842 PublisherImpl::listener_for(DDS::StatusKind kind)
00843 {
00844   // per 2.1.4.3.1 Listener Access to Plain Communication Status
00845   // use this entities factory if listener is mask not enabled
00846   // for this kind.
00847   RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
00848 
00849   if (!participant)
00850     return 0;
00851 
00852   if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
00853     return participant->listener_for(kind);
00854 
00855   } else {
00856     return DDS::PublisherListener::_duplicate(listener_.in());
00857   }
00858 }
00859 
00860 DDS::ReturnCode_t
00861 PublisherImpl::assert_liveliness_by_participant()
00862 {
00863   DDS::ReturnCode_t ret = DDS::RETCODE_OK;
00864 
00865   for (DataWriterMap::iterator it(datawriter_map_.begin());
00866       it != datawriter_map_.end(); ++it) {
00867     DDS::ReturnCode_t dw_ret = it->second->assert_liveliness_by_participant();
00868 
00869     if (dw_ret != DDS::RETCODE_OK) {
00870       ret = dw_ret;
00871     }
00872   }
00873 
00874   return ret;
00875 }
00876 
00877 ACE_Time_Value
00878 PublisherImpl::liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
00879 {
00880   ACE_Time_Value tv = ACE_Time_Value::max_time;
00881   for (DataWriterMap::iterator it(datawriter_map_.begin());
00882       it != datawriter_map_.end(); ++it) {
00883     tv = std::min (tv, it->second->liveliness_check_interval(kind));
00884   }
00885   return tv;
00886 }
00887 
00888 bool
00889 PublisherImpl::participant_liveliness_activity_after(const ACE_Time_Value& tv)
00890 {
00891   for (DataWriterMap::iterator it(datawriter_map_.begin());
00892       it != datawriter_map_.end(); ++it) {
00893     if (it->second->participant_liveliness_activity_after(tv)) {
00894       return true;
00895     }
00896   }
00897   return false;
00898 }
00899 
00900 void
00901 PublisherImpl::get_publication_ids(PublicationIdVec& pubs)
00902 {
00903   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00904       guard,
00905       this->pi_lock_,
00906   );
00907 
00908   pubs.reserve(publication_map_.size());
00909   for (PublicationMap::iterator iter = publication_map_.begin();
00910       iter != publication_map_.end();
00911       ++iter) {
00912     pubs.push_back(iter->first);
00913   }
00914 }
00915 
00916 RcHandle<EntityImpl>
00917 PublisherImpl::parent() const
00918 {
00919   return this->participant_.lock();
00920 }
00921 
00922 bool
00923 PublisherImpl::validate_datawriter_qos(const DDS::DataWriterQos& qos,
00924     const DDS::DataWriterQos& default_qos,
00925     DDS::Topic_ptr            a_topic,
00926     DDS::DataWriterQos&       dw_qos)
00927 {
00928   if (CORBA::is_nil(a_topic)) {
00929     ACE_ERROR((LM_ERROR,
00930         ACE_TEXT("(%P|%t) ERROR: ")
00931         ACE_TEXT("PublisherImpl::create_datawriter, ")
00932         ACE_TEXT("topic is nil.\n")));
00933     return DDS::DataWriter::_nil();
00934   }
00935 
00936   if (qos == DATAWRITER_QOS_DEFAULT) {
00937     dw_qos = default_qos;
00938 
00939   } else if (qos == DATAWRITER_QOS_USE_TOPIC_QOS) {
00940     DDS::TopicQos topic_qos;
00941     a_topic->get_qos(topic_qos);
00942     dw_qos = default_qos;
00943 
00944     Qos_Helper::copy_from_topic_qos(dw_qos, topic_qos);
00945 
00946   } else {
00947     dw_qos = qos;
00948   }
00949 
00950   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00951   OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00952   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00953   OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00954   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00955 
00956   if (!Qos_Helper::valid(dw_qos)) {
00957     ACE_ERROR((LM_ERROR,
00958         ACE_TEXT("(%P|%t) ERROR: ")
00959         ACE_TEXT("PublisherImpl::create_datawriter, ")
00960         ACE_TEXT("invalid qos.\n")));
00961     return DDS::DataWriter::_nil();
00962   }
00963 
00964   if (!Qos_Helper::consistent(dw_qos)) {
00965     ACE_ERROR((LM_ERROR,
00966         ACE_TEXT("(%P|%t) ERROR: ")
00967         ACE_TEXT("PublisherImpl::create_datawriter, ")
00968         ACE_TEXT("inconsistent qos.\n")));
00969     return DDS::DataWriter::_nil();
00970   }
00971   return true;
00972 }
00973 
00974 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00975 
00976 } // namespace DCPS
00977 } // namespace OpenDDS
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1