ReplayerImpl.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 // #include "DataWriterImpl.h"
00010 #include "FeatureDisabledQosCheck.h"
00011 #include "DomainParticipantImpl.h"
00012 #include "PublisherImpl.h"
00013 #include "Service_Participant.h"
00014 #include "GuidConverter.h"
00015 #include "TopicImpl.h"
00016 #include "PublicationInstance.h"
00017 #include "SendStateDataSampleList.h"
00018 #include "DataSampleElement.h"
00019 #include "Serializer.h"
00020 #include "Transient_Kludge.h"
00021 #include "DataDurabilityCache.h"
00022 #include "OfferedDeadlineWatchdog.h"
00023 #include "MonitorFactory.h"
00024 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00025 #include "CoherentChangeControl.h"
00026 #endif
00027 #include "AssociationData.h"
00028 
00029 #if !defined (DDS_HAS_MINIMUM_BIT)
00030 #include "BuiltInTopicUtils.h"
00031 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00032 
00033 #include "Util.h"
00034 
00035 #include "dds/DCPS/transport/framework/EntryExit.h"
00036 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00037 #include "dds/DCPS/transport/framework/TransportSendElement.h"
00038 #include "dds/DCPS/transport/framework/TransportCustomizedElement.h"
00039 
00040 #include "tao/ORB_Core.h"
00041 #include "ace/Reactor.h"
00042 #include "ace/Auto_Ptr.h"
00043 
00044 #include <stdexcept>
00045 
00046 #include "ReplayerImpl.h"
00047 
00048 namespace OpenDDS {
00049 namespace DCPS {
00050 
00051 
00052 ReplayerImpl::ReplayerImpl()
00053   : data_dropped_count_(0),
00054   data_delivered_count_(0),
00055   n_chunks_(TheServiceParticipant->n_chunks()),
00056   association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()),
00057   qos_(TheServiceParticipant->initial_DataWriterQos()),
00058   participant_servant_(0),
00059   topic_id_(GUID_UNKNOWN),
00060   topic_servant_(0),
00061   listener_mask_(DEFAULT_STATUS_MASK),
00062   domain_id_(0),
00063   publisher_servant_(0),
00064   publication_id_(GUID_UNKNOWN),
00065   sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00066   // data_container_(0),
00067   // liveliness_lost_(false),
00068   // last_deadline_missed_total_count_(0),
00069   is_bit_(false),
00070   empty_condition_(lock_),
00071   pending_write_count_(0)
00072 {
00073   // liveliness_lost_status_.total_count = 0;
00074   // liveliness_lost_status_.total_count_change = 0;
00075   //
00076   // offered_deadline_missed_status_.total_count = 0;
00077   // offered_deadline_missed_status_.total_count_change = 0;
00078   // offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL;
00079 
00080   offered_incompatible_qos_status_.total_count = 0;
00081   offered_incompatible_qos_status_.total_count_change = 0;
00082   offered_incompatible_qos_status_.last_policy_id = 0;
00083   offered_incompatible_qos_status_.policies.length(0);
00084 
00085   publication_match_status_.total_count = 0;
00086   publication_match_status_.total_count_change = 0;
00087   publication_match_status_.current_count = 0;
00088   publication_match_status_.current_count_change = 0;
00089   publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL;
00090 
00091 }
00092 
00093 // This method is called when there are no longer any reference to the
00094 // the servant.
00095 ReplayerImpl::~ReplayerImpl()
00096 {
00097   DBG_ENTRY_LVL("ReplayerImpl","~ReplayerImpl",6);
00098 }
00099 
00100 // this method is called when delete_datawriter is called.
00101 DDS::ReturnCode_t
00102 ReplayerImpl::cleanup()
00103 {
00104 
00105   //     // Unregister all registered instances prior to deletion.
00106   //     // DDS::Time_t source_timestamp = time_value_to_time(ACE_OS::gettimeofday());
00107   //     // this->unregister_instances(source_timestamp);
00108   //
00109   //     // CORBA::String_var topic_name = this->get_Atopic_name();
00110   {
00111     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, DDS::RETCODE_ERROR);
00112 
00113     // Wait for pending samples to drain prior to removing associations
00114     // and unregistering the publication.
00115     while (this->pending_write_count_)
00116       this->empty_condition_.wait();
00117 
00118     // Call remove association before unregistering the datawriter
00119     // with the transport, otherwise some callbacks resulted from
00120     // remove_association may lost.
00121     this->remove_all_associations();
00122 
00123     // release our Topic_var
00124     topic_objref_ = DDS::Topic::_nil();
00125     topic_servant_->remove_entity_ref();
00126     topic_servant_->_remove_ref();
00127     topic_servant_ = 0;
00128 
00129   }
00130 
00131   // not just unregister but remove any pending writes/sends.
00132   // this->unregister_all();
00133 
00134   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00135   if (!disco->remove_publication(
00136         this->domain_id_,
00137         this->participant_servant_->get_id(),
00138         this->publication_id_)) {
00139     ACE_ERROR_RETURN((LM_ERROR,
00140                       ACE_TEXT("(%P|%t) ERROR: ")
00141                       ACE_TEXT("PublisherImpl::delete_datawriter, ")
00142                       ACE_TEXT("publication not removed from discovery.\n")),
00143                      DDS::RETCODE_ERROR);
00144   }
00145   return DDS::RETCODE_OK;
00146 }
00147 
00148 void
00149 ReplayerImpl::init(
00150   DDS::Topic_ptr                         topic,
00151   TopicImpl *                            topic_servant,
00152   const DDS::DataWriterQos &             qos,
00153   ReplayerListener_rch                   a_listener,
00154   const DDS::StatusMask &                mask,
00155   OpenDDS::DCPS::DomainParticipantImpl * participant_servant,
00156   const DDS::PublisherQos&               publisher_qos)
00157 {
00158   DBG_ENTRY_LVL("ReplayerImpl","init",6);
00159   topic_objref_ = DDS::Topic::_duplicate(topic);
00160   topic_servant_ = topic_servant;
00161   topic_servant_->_add_ref();
00162   topic_servant_->add_entity_ref();
00163   topic_name_    = topic_servant_->get_name();
00164   topic_id_      = topic_servant_->get_id();
00165   type_name_     = topic_servant_->get_type_name();
00166 
00167 #if !defined (DDS_HAS_MINIMUM_BIT)
00168   is_bit_ = ACE_OS::strcmp(topic_name_.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0
00169             || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_TOPIC_TOPIC) == 0
00170             || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0
00171             || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_PUBLICATION_TOPIC) == 0;
00172 #endif   // !defined (DDS_HAS_MINIMUM_BIT)
00173 
00174   qos_ = qos;
00175 
00176   //Note: OK to _duplicate(nil).
00177   listener_ = a_listener;
00178   listener_mask_ = mask;
00179 
00180   // Only store the participant pointer, since it is our "grand"
00181   // parent, we will exist as long as it does.
00182   participant_servant_ = participant_servant;
00183   domain_id_ = participant_servant_->get_domain_id();
00184 
00185   publisher_qos_ = publisher_qos;
00186 }
00187 
00188 
00189 DDS::ReturnCode_t ReplayerImpl::set_qos (const ::DDS::PublisherQos & publisher_qos,
00190                                          const DDS::DataWriterQos &  qos)
00191 {
00192 
00193   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(publisher_qos, DDS::RETCODE_UNSUPPORTED);
00194 
00195   if (Qos_Helper::valid(publisher_qos) && Qos_Helper::consistent(publisher_qos)) {
00196     if (publisher_qos_ == publisher_qos)
00197       return DDS::RETCODE_OK;
00198 
00199     // for the not changeable qos, it can be changed before enable
00200     if (!Qos_Helper::changeable(publisher_qos_, publisher_qos) && enabled_ == true) {
00201       return DDS::RETCODE_IMMUTABLE_POLICY;
00202 
00203     } else {
00204       publisher_qos_ = publisher_qos;
00205     }
00206   } else {
00207     return DDS::RETCODE_INCONSISTENT_POLICY;
00208   }
00209 
00210   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00211   OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00212   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00213   OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00214   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00215 
00216   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00217     if (qos_ == qos)
00218       return DDS::RETCODE_OK;
00219 
00220     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00221       return DDS::RETCODE_IMMUTABLE_POLICY;
00222 
00223     } else {
00224       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00225       // DDS::PublisherQos publisherQos;
00226       // this->publisher_servant_->get_qos(publisherQos);
00227       DDS::PublisherQos publisherQos = this->publisher_qos_;
00228       const bool status
00229         = disco->update_publication_qos(this->participant_servant_->get_domain_id(),
00230                                         this->participant_servant_->get_id(),
00231                                         this->publication_id_,
00232                                         qos,
00233                                         publisherQos);
00234 
00235       if (!status) {
00236         ACE_ERROR_RETURN((LM_ERROR,
00237                           ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
00238                           ACE_TEXT("qos not updated. \n")),
00239                          DDS::RETCODE_ERROR);
00240       }
00241     }
00242 
00243     if (!(qos_ == qos)) {
00244       // Reset the deadline timer if the period has changed.
00245       // if (qos_.deadline.period.sec != qos.deadline.period.sec
00246       //     || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
00247       //   if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00248       //       && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00249       //     ACE_auto_ptr_reset(this->watchdog_,
00250       //                        new OfferedDeadlineWatchdog(
00251       //                          this->reactor_,
00252       //                          this->lock_,
00253       //                          qos.deadline,
00254       //                          this,
00255       //                          this->dw_local_objref_.in(),
00256       //                          this->offered_deadline_missed_status_,
00257       //                          this->last_deadline_missed_total_count_));
00258       //
00259       //   } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00260       //              && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00261       //     this->watchdog_->cancel_all();
00262       //     this->watchdog_.reset();
00263       //
00264       //   } else {
00265       //     this->watchdog_->reset_interval(
00266       //       duration_to_time_value(qos.deadline.period));
00267       //   }
00268       // }
00269 
00270       qos_ = qos;
00271     }
00272 
00273     return DDS::RETCODE_OK;
00274 
00275   } else {
00276     return DDS::RETCODE_INCONSISTENT_POLICY;
00277   }
00278 }
00279 
00280 DDS::ReturnCode_t ReplayerImpl::get_qos (DDS::PublisherQos &  publisher_qos,
00281                                          DDS::DataWriterQos & qos)
00282 {
00283   qos = qos_;
00284   publisher_qos = publisher_qos_;
00285   return DDS::RETCODE_OK;
00286 }
00287 
00288 
00289 DDS::ReturnCode_t ReplayerImpl::set_listener (const ReplayerListener_rch & a_listener,
00290                                               DDS::StatusMask              mask)
00291 {
00292   listener_ = a_listener;
00293   listener_mask_ = mask;
00294   return DDS::RETCODE_OK;
00295 }
00296 
00297 ReplayerListener_rch ReplayerImpl::get_listener ()
00298 {
00299   return listener_;
00300 }
00301 
00302 DDS::ReturnCode_t
00303 ReplayerImpl::enable()
00304 {
00305   //According spec:
00306   // - Calling enable on an already enabled Entity returns OK and has no
00307   // effect.
00308   // - Calling enable on an Entity whose factory is not enabled will fail
00309   // and return PRECONDITION_NOT_MET.
00310 
00311   if (this->is_enabled()) {
00312     return DDS::RETCODE_OK;
00313   }
00314 
00315   // if (this->publisher_servant_->is_enabled() == false) {
00316   //   return DDS::RETCODE_PRECONDITION_NOT_MET;
00317   // }
00318   //
00319   const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
00320 
00321   if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
00322     n_chunks_ = qos_.resource_limits.max_samples;
00323   }
00324   // +1 because we might allocate one before releasing another
00325   // TBD - see if this +1 can be removed.
00326   ACE_auto_ptr_reset(mb_allocator_,
00327                      new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_));
00328   ACE_auto_ptr_reset(db_allocator_,
00329                      new DataBlockAllocator(n_chunks_+1));
00330   ACE_auto_ptr_reset(header_allocator_,
00331                      new DataSampleHeaderAllocator(n_chunks_+1));
00332 
00333   ACE_auto_ptr_reset(sample_list_element_allocator_,
00334                      new DataSampleElementAllocator(2 * n_chunks_));
00335 
00336   ACE_auto_ptr_reset(transport_send_element_allocator_,
00337                      new TransportSendElementAllocator(2 * n_chunks_,
00338                                                        sizeof(TransportSendElement)));
00339   ACE_auto_ptr_reset(transport_customized_element_allocator_,
00340                      new TransportCustomizedElementAllocator(2 * n_chunks_,
00341                                                              sizeof(TransportCustomizedElement)));
00342 
00343   if (DCPS_debug_level >= 2) {
00344     ACE_DEBUG((LM_DEBUG,
00345                "(%P|%t) ReplayerImpl::enable-mb"
00346                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00347                mb_allocator_.get(),
00348                n_chunks_));
00349 
00350     ACE_DEBUG((LM_DEBUG,
00351                "(%P|%t) ReplayerImpl::enable-db"
00352                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00353                db_allocator_.get(),
00354                n_chunks_));
00355 
00356     ACE_DEBUG((LM_DEBUG,
00357                "(%P|%t) ReplayerImpl::enable-header"
00358                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00359                header_allocator_.get(),
00360                n_chunks_));
00361   }
00362 
00363   this->set_enabled();
00364 
00365   try {
00366     this->enable_transport(reliable,
00367                            this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00368 
00369   } catch (const Transport::Exception&) {
00370     ACE_ERROR((LM_ERROR,
00371                ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ")
00372                ACE_TEXT("Transport Exception.\n")));
00373     return DDS::RETCODE_ERROR;
00374 
00375   }
00376 
00377   const TransportLocatorSeq& trans_conf_info = connection_info();
00378 
00379 
00380   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00381   this->publication_id_ =
00382     disco->add_publication(this->domain_id_,
00383                            this->participant_servant_->get_id(),
00384                            this->topic_servant_->get_id(),
00385                            this,
00386                            this->qos_,
00387                            trans_conf_info,
00388                            this->publisher_qos_);
00389 
00390   if (this->publication_id_ == GUID_UNKNOWN) {
00391     ACE_ERROR((LM_ERROR,
00392                ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ")
00393                ACE_TEXT("add_publication returned invalid id. \n")));
00394     return DDS::RETCODE_ERROR;
00395   }
00396 
00397   return DDS::RETCODE_OK;
00398 }
00399 
00400 
00401 
00402 void
00403 ReplayerImpl::add_association(const RepoId&            yourId,
00404                               const ReaderAssociation& reader,
00405                               bool                     active)
00406 {
00407   DBG_ENTRY_LVL("ReplayerImpl", "add_association", 6);
00408 
00409   if (DCPS_debug_level >= 1) {
00410     GuidConverter writer_converter(yourId);
00411     GuidConverter reader_converter(reader.readerId);
00412     ACE_DEBUG((LM_DEBUG,
00413                ACE_TEXT("(%P|%t) ReplayerImpl::add_association - ")
00414                ACE_TEXT("bit %d local %C remote %C\n"),
00415                is_bit_,
00416                OPENDDS_STRING(writer_converter).c_str(),
00417                OPENDDS_STRING(reader_converter).c_str()));
00418   }
00419 
00420   // if (entity_deleted_ == true) {
00421   //   if (DCPS_debug_level >= 1)
00422   //     ACE_DEBUG((LM_DEBUG,
00423   //                ACE_TEXT("(%P|%t) ReplayerImpl::add_association")
00424   //                ACE_TEXT(" This is a deleted datawriter, ignoring add.\n")));
00425   //
00426   //   return;
00427   // }
00428 
00429   if (GUID_UNKNOWN == publication_id_) {
00430     publication_id_ = yourId;
00431   }
00432 
00433   {
00434     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00435     reader_info_.insert(std::make_pair(reader.readerId,
00436                                        ReaderInfo(TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
00437                                                   reader.exprParams, participant_servant_,
00438                                                   reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS)));
00439   }
00440 
00441   if (DCPS_debug_level > 4) {
00442     GuidConverter converter(publication_id_);
00443     ACE_DEBUG((LM_DEBUG,
00444                ACE_TEXT("(%P|%t) ReplayerImpl::add_association(): ")
00445                ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
00446                OPENDDS_STRING(converter).c_str(),
00447                qos_.transport_priority.value));
00448   }
00449 
00450   AssociationData data;
00451   data.remote_id_ = reader.readerId;
00452   data.remote_data_ = reader.readerTransInfo;
00453   data.remote_reliable_ =
00454     (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00455   data.remote_durable_ =
00456     (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00457 
00458   if (!this->associate(data, active)) {
00459     //FUTURE: inform inforepo and try again as passive peer
00460     if (DCPS_debug_level) {
00461       ACE_DEBUG((LM_ERROR,
00462                  ACE_TEXT("(%P|%t) ReplayerImpl::add_association: ")
00463                  ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00464     }
00465     return;
00466   }
00467 
00468   if (active) {
00469     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00470 
00471     // Have we already received an association_complete() callback?
00472     if (assoc_complete_readers_.count(reader.readerId)) {
00473       assoc_complete_readers_.erase(reader.readerId);
00474       association_complete_i(reader.readerId);
00475 
00476       // Add to pending_readers_ -> pending means we are waiting
00477       // for the association_complete() callback.
00478     } else if (OpenDDS::DCPS::insert(pending_readers_, reader.readerId) == -1) {
00479       GuidConverter converter(reader.readerId);
00480       ACE_ERROR((LM_ERROR,
00481                  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::add_association: ")
00482                  ACE_TEXT("failed to mark %C as pending.\n"),
00483                  OPENDDS_STRING(converter).c_str()));
00484 
00485     } else {
00486       if (DCPS_debug_level > 0) {
00487         GuidConverter converter(reader.readerId);
00488         ACE_DEBUG((LM_DEBUG,
00489                    ACE_TEXT("(%P|%t) ReplayerImpl::add_association: ")
00490                    ACE_TEXT("marked %C as pending.\n"),
00491                    OPENDDS_STRING(converter).c_str()));
00492       }
00493     }
00494   } else {
00495     // In the current implementation, DataWriter is always active, so this
00496     // code will not be applicable.
00497     Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00498     disco->association_complete(this->domain_id_,
00499                                 this->participant_servant_->get_id(),
00500                                 this->publication_id_, reader.readerId);
00501   }
00502 }
00503 
00504 
00505 ReplayerImpl::ReaderInfo::ReaderInfo(const char*            filter,
00506                                      const DDS::StringSeq&  params,
00507                                      DomainParticipantImpl* participant,
00508                                      bool                   durable)
00509   : expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00510   , durable_(durable)
00511 {
00512   ACE_UNUSED_ARG(filter);
00513   ACE_UNUSED_ARG(params);
00514   ACE_UNUSED_ARG(participant);
00515 }
00516 
00517 
00518 ReplayerImpl::ReaderInfo::~ReaderInfo()
00519 {
00520 }
00521 
00522 
00523 void
00524 ReplayerImpl::association_complete(const RepoId& remote_id)
00525 {
00526   DBG_ENTRY_LVL("ReplayerImpl", "association_complete", 6);
00527 
00528   if (DCPS_debug_level >= 1) {
00529     GuidConverter writer_converter(this->publication_id_);
00530     GuidConverter reader_converter(remote_id);
00531     ACE_DEBUG((LM_DEBUG,
00532                ACE_TEXT("(%P|%t) ReplayerImpl::association_complete - ")
00533                ACE_TEXT("bit %d local %C remote %C\n"),
00534                is_bit_,
00535                OPENDDS_STRING(writer_converter).c_str(),
00536                OPENDDS_STRING(reader_converter).c_str()));
00537   }
00538 
00539   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00540   if (OpenDDS::DCPS::remove(pending_readers_, remote_id) == -1) {
00541     // Not found in pending_readers_, defer calling association_complete_i()
00542     // until add_association() resumes and sees this ID in assoc_complete_readers_.
00543     assoc_complete_readers_.insert(remote_id);
00544   } else {
00545     association_complete_i(remote_id);
00546   }
00547 }
00548 
00549 void
00550 ReplayerImpl::association_complete_i(const RepoId& remote_id)
00551 {
00552   DBG_ENTRY_LVL("ReplayerImpl", "association_complete_i", 6);
00553   // bool reader_durable = false;
00554   {
00555     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00556     if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) {
00557       GuidConverter converter(remote_id);
00558       ACE_ERROR((LM_ERROR,
00559                  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
00560                  ACE_TEXT("insert %C from pending failed.\n"),
00561                  OPENDDS_STRING(converter).c_str()));
00562     }
00563     // RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
00564     // if (it != reader_info_.end()) {
00565     //   reader_durable = it->second.durable_;
00566     // }
00567   }
00568 
00569   if (!is_bit_) {
00570 
00571     DDS::InstanceHandle_t handle =
00572       this->participant_servant_->id_to_handle(remote_id);
00573 
00574     {
00575       // protect publication_match_status_ and status changed flags.
00576       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00577 
00578       // update the publication_match_status_
00579       ++publication_match_status_.total_count;
00580       ++publication_match_status_.total_count_change;
00581       ++publication_match_status_.current_count;
00582       ++publication_match_status_.current_count_change;
00583 
00584       if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
00585         GuidConverter converter(remote_id);
00586         ACE_DEBUG((LM_WARNING,
00587                    ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
00588                    ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
00589                    OPENDDS_STRING(converter).c_str(),
00590                    handle));
00591         return;
00592 
00593       } else if (DCPS_debug_level > 4) {
00594         GuidConverter converter(remote_id);
00595         ACE_DEBUG((LM_DEBUG,
00596                    ACE_TEXT("(%P|%t) ReplayerImpl::association_complete_i: ")
00597                    ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
00598                    OPENDDS_STRING(converter).c_str(),
00599                    handle));
00600       }
00601 
00602       publication_match_status_.last_subscription_handle = handle;
00603 
00604     }
00605 
00606 
00607     if (listener_.in()) {
00608       listener_->on_replayer_matched(this,
00609                                      publication_match_status_);
00610 
00611       // TBD - why does the spec say to change this but not
00612       // change the ChangeFlagStatus after a listener call?
00613       publication_match_status_.total_count_change = 0;
00614       publication_match_status_.current_count_change = 0;
00615     }
00616 
00617   }
00618 
00619 }
00620 
00621 void
00622 ReplayerImpl::remove_associations(const ReaderIdSeq & readers,
00623                                   CORBA::Boolean      notify_lost)
00624 {
00625   if (DCPS_debug_level >= 1) {
00626     GuidConverter writer_converter(publication_id_);
00627     GuidConverter reader_converter(readers[0]);
00628     ACE_DEBUG((LM_DEBUG,
00629                ACE_TEXT("(%P|%t) ReplayerImpl::remove_associations: ")
00630                ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
00631                is_bit_,
00632                OPENDDS_STRING(writer_converter).c_str(),
00633                OPENDDS_STRING(reader_converter).c_str(),
00634                readers.length()));
00635   }
00636 
00637   this->stop_associating(readers.get_buffer(), readers.length());
00638 
00639   ReaderIdSeq fully_associated_readers;
00640   CORBA::ULong fully_associated_len = 0;
00641   ReaderIdSeq rds;
00642   CORBA::ULong rds_len = 0;
00643   DDS::InstanceHandleSeq handles;
00644 
00645   {
00646     // Ensure the same acquisition order as in wait_for_acknowledgments().
00647     // ACE_GUARD(ACE_SYNCH_MUTEX, wfaGuard, this->wfaLock_);
00648     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00649 
00650     //Remove the readers from fully associated reader list.
00651     //If the supplied reader is not in the cached reader list then it is
00652     //already removed. We just need remove the readers in the list that have
00653     //not been removed.
00654 
00655     CORBA::ULong len = readers.length();
00656 
00657     for (CORBA::ULong i = 0; i < len; ++i) {
00658       //Remove the readers from fully associated reader list. If it's not
00659       //in there, the association_complete() is not called yet and remove it
00660       //from pending list.
00661 
00662       if (OpenDDS::DCPS::remove(readers_, readers[i]) == 0) {
00663         ++fully_associated_len;
00664         fully_associated_readers.length(fully_associated_len);
00665         fully_associated_readers [fully_associated_len - 1] = readers[i];
00666 
00667         // Remove this reader from the ACK sequence map if its there.
00668         // This is where we need to be holding the wfaLock_ obtained
00669         // above.
00670         RepoIdToSequenceMap::iterator where
00671           = this->idToSequence_.find(readers[i]);
00672 
00673         if (where != this->idToSequence_.end()) {
00674           this->idToSequence_.erase(where);
00675 
00676           // It is possible that this subscription was causing the wait
00677           // to continue, so give the opportunity to find out.
00678           // this->wfaCondition_.broadcast();
00679         }
00680 
00681         ++rds_len;
00682         rds.length(rds_len);
00683         rds [rds_len - 1] = readers[i];
00684 
00685       } else if (OpenDDS::DCPS::remove(pending_readers_, readers[i]) == 0) {
00686         ++rds_len;
00687         rds.length(rds_len);
00688         rds [rds_len - 1] = readers[i];
00689 
00690         GuidConverter converter(readers[i]);
00691         ACE_DEBUG((LM_WARNING,
00692                    ACE_TEXT("(%P|%t) WARNING: ReplayerImpl::remove_associations: ")
00693                    ACE_TEXT("removing reader %C before association_complete() call.\n"),
00694                    OPENDDS_STRING(converter).c_str()));
00695       }
00696       reader_info_.erase(readers[i]);
00697       //else reader is already removed which indicates remove_association()
00698       //is called multiple times.
00699     }
00700 
00701     if (fully_associated_len > 0 && !is_bit_) {
00702       // The reader should be in the id_to_handle map at this time so
00703       // log with error.
00704       if (this->lookup_instance_handles(fully_associated_readers, handles) == false) {
00705         ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ReplayerImpl::remove_associations: "
00706                    "lookup_instance_handles failed, notify %d \n", notify_lost));
00707         return;
00708       }
00709 
00710       for (CORBA::ULong i = 0; i < fully_associated_len; ++i) {
00711         id_to_handle_map_.erase(fully_associated_readers[i]);
00712       }
00713     }
00714 
00715     // wfaGuard.release();
00716 
00717     // Mirror the PUBLICATION_MATCHED_STATUS processing from
00718     // association_complete() here.
00719     if (!this->is_bit_) {
00720 
00721       // Derive the change in the number of subscriptions reading this writer.
00722       int matchedSubscriptions =
00723         static_cast<int>(this->id_to_handle_map_.size());
00724       this->publication_match_status_.current_count_change =
00725         matchedSubscriptions - this->publication_match_status_.current_count;
00726 
00727       // Only process status if the number of subscriptions has changed.
00728       if (this->publication_match_status_.current_count_change != 0) {
00729         this->publication_match_status_.current_count = matchedSubscriptions;
00730 
00731         /// Section 7.1.4.1: total_count will not decrement.
00732 
00733         /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
00734         /// TODO: Should rds_len really be fully_associated_len here??
00735         this->publication_match_status_.last_subscription_handle =
00736           handles[rds_len - 1];
00737 
00738 
00739         if (listener_.in()) {
00740           listener_->on_replayer_matched(
00741             this,
00742             this->publication_match_status_);
00743 
00744           // Listener consumes the change.
00745           this->publication_match_status_.total_count_change = 0;
00746           this->publication_match_status_.current_count_change = 0;
00747         }
00748 
00749       }
00750     }
00751   }
00752 
00753   for (CORBA::ULong i = 0; i < rds.length(); ++i) {
00754     this->disassociate(rds[i]);
00755   }
00756 
00757   // If this remove_association is invoked when the InfoRepo
00758   // detects a lost reader then make a callback to notify
00759   // subscription lost.
00760   if (notify_lost && handles.length() > 0) {
00761     this->notify_publication_lost(handles);
00762   }
00763 }
00764 
00765 void ReplayerImpl::remove_all_associations()
00766 {
00767   this->stop_associating();
00768 
00769   OpenDDS::DCPS::ReaderIdSeq readers;
00770   CORBA::ULong size;
00771   CORBA::ULong num_pending_readers;
00772   {
00773     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00774 
00775     num_pending_readers = static_cast<CORBA::ULong>(pending_readers_.size());
00776     size = static_cast<CORBA::ULong>(readers_.size()) + num_pending_readers;
00777     readers.length(size);
00778 
00779     RepoIdSet::iterator itEnd = readers_.end();
00780     int i = 0;
00781 
00782     for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
00783       readers[i++] = *it;
00784     }
00785 
00786     itEnd = pending_readers_.end();
00787     for (RepoIdSet::iterator it = pending_readers_.begin(); it != itEnd; ++it) {
00788       readers[i++] = *it;
00789     }
00790 
00791     if (num_pending_readers > 0) {
00792       ACE_DEBUG((LM_WARNING,
00793                  ACE_TEXT("(%P|%t) WARNING: ReplayerImpl::remove_all_associations() - ")
00794                  ACE_TEXT("%d subscribers were pending and never fully associated.\n"),
00795                  num_pending_readers));
00796     }
00797   }
00798 
00799   try {
00800     if (0 < size) {
00801       CORBA::Boolean dont_notify_lost = false;
00802       this->remove_associations(readers, dont_notify_lost);
00803     }
00804 
00805   } catch (const CORBA::Exception&) {
00806   }
00807 }
00808 
00809 void
00810 ReplayerImpl::register_for_reader(const RepoId& participant,
00811                                   const RepoId& writerid,
00812                                   const RepoId& readerid,
00813                                   const TransportLocatorSeq& locators,
00814                                   DiscoveryListener* listener)
00815 {
00816   TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
00817 }
00818 
00819 void
00820 ReplayerImpl::unregister_for_reader(const RepoId& participant,
00821                                     const RepoId& writerid,
00822                                     const RepoId& readerid)
00823 {
00824   TransportClient::unregister_for_reader(participant, writerid, readerid);
00825 }
00826 
00827 void
00828 ReplayerImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
00829 {
00830 
00831 
00832   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00833 
00834   // copy status and increment change
00835   offered_incompatible_qos_status_.total_count = status.total_count;
00836   offered_incompatible_qos_status_.total_count_change +=
00837     status.count_since_last_send;
00838   offered_incompatible_qos_status_.last_policy_id = status.last_policy_id;
00839   offered_incompatible_qos_status_.policies = status.policies;
00840 
00841 }
00842 
00843 void
00844 ReplayerImpl::update_subscription_params(const RepoId&         readerId,
00845                                          const DDS::StringSeq& params)
00846 {
00847   ACE_UNUSED_ARG(readerId);
00848   ACE_UNUSED_ARG(params);
00849 }
00850 
00851 void
00852 ReplayerImpl::inconsistent_topic()
00853 {
00854   topic_servant_->inconsistent_topic();
00855 }
00856 
00857 bool
00858 ReplayerImpl::check_transport_qos(const TransportInst&)
00859 {
00860   // DataWriter does not impose any constraints on which transports
00861   // may be used based on QoS.
00862   return true;
00863 }
00864 
00865 const RepoId&
00866 ReplayerImpl::get_repo_id() const
00867 {
00868   return this->publication_id_;
00869 }
00870 
00871 CORBA::Long
00872 ReplayerImpl::get_priority_value(const AssociationData&) const
00873 {
00874   return this->qos_.transport_priority.value;
00875 }
00876 
00877 void
00878 ReplayerImpl::data_delivered(const DataSampleElement* sample)
00879 {
00880   DBG_ENTRY_LVL("ReplayerImpl","data_delivered",6);
00881   if (!(sample->get_pub_id() == this->publication_id_)) {
00882     GuidConverter sample_converter(sample->get_pub_id());
00883     GuidConverter writer_converter(publication_id_);
00884     ACE_ERROR((LM_ERROR,
00885                ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::data_delivered: ")
00886                ACE_TEXT(" The publication id %C from delivered element ")
00887                ACE_TEXT("does not match the datawriter's id %C\n"),
00888                OPENDDS_STRING(sample_converter).c_str(),
00889                OPENDDS_STRING(writer_converter).c_str()));
00890     return;
00891   }
00892   DataSampleElement* elem = const_cast<DataSampleElement*>(sample);
00893   // this->data_container_->data_delivered(sample);
00894   ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement);
00895   ++data_delivered_count_;
00896 
00897   {
00898     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00899     if ((--pending_write_count_) == 0) {
00900       empty_condition_.broadcast();
00901     }
00902   }
00903 }
00904 
00905 void
00906 ReplayerImpl::control_delivered(ACE_Message_Block* sample)
00907 {
00908   ACE_UNUSED_ARG(sample);
00909 }
00910 
00911 void
00912 ReplayerImpl::data_dropped(const DataSampleElement* sample,
00913                            bool                         dropped_by_transport)
00914 {
00915   DBG_ENTRY_LVL("ReplayerImpl","data_dropped",6);
00916   // this->data_container_->data_dropped(element, dropped_by_transport);
00917   ACE_UNUSED_ARG(dropped_by_transport);
00918   DataSampleElement* elem = const_cast<DataSampleElement*>(sample);
00919   ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement);
00920   ++data_dropped_count_;
00921   {
00922     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00923     if ((--pending_write_count_) == 0) {
00924       empty_condition_.broadcast();
00925     }
00926   }
00927 }
00928 
00929 void
00930 ReplayerImpl::control_dropped(ACE_Message_Block* sample,
00931                               bool /* dropped_by_transport */)
00932 {
00933   ACE_UNUSED_ARG(sample);
00934 }
00935 
00936 void
00937 ReplayerImpl::notify_publication_disconnected(const ReaderIdSeq& subids)
00938 {
00939   ACE_UNUSED_ARG(subids);
00940 }
00941 
00942 void
00943 ReplayerImpl::notify_publication_reconnected(const ReaderIdSeq& subids)
00944 {
00945   ACE_UNUSED_ARG(subids);
00946 }
00947 
00948 void
00949 ReplayerImpl::notify_publication_lost(const ReaderIdSeq& subids)
00950 {
00951   ACE_UNUSED_ARG(subids);
00952 }
00953 
00954 void
00955 ReplayerImpl::notify_publication_lost(const DDS::InstanceHandleSeq& handles)
00956 {
00957   ACE_UNUSED_ARG(handles);
00958 }
00959 
00960 void
00961 ReplayerImpl::notify_connection_deleted(const RepoId&)
00962 {
00963 }
00964 
00965 void
00966 ReplayerImpl::retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const
00967 {
00968   qos_data.pub_qos = this->publisher_qos_;
00969   qos_data.dw_qos = this->qos_;
00970   qos_data.topic_name = this->topic_name_.in();
00971 }
00972 
00973 DDS::ReturnCode_t
00974 ReplayerImpl::write (const RawDataSample*   samples,
00975                      int                    num_samples,
00976                      DDS::InstanceHandle_t* reader_ih_ptr)
00977 {
00978   DBG_ENTRY_LVL("ReplayerImpl","write",6);
00979 
00980   OpenDDS::DCPS::RepoId repo_id;
00981   if (reader_ih_ptr) {
00982     repo_id = this->participant_servant_->get_repoid(*reader_ih_ptr);
00983     if (repo_id == GUID_UNKNOWN) {
00984       ACE_ERROR_RETURN((LM_ERROR,
00985                         ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::write: ")
00986                         ACE_TEXT("Invalid reader instance handle (%d)\n"), *reader_ih_ptr),
00987                        DDS::RETCODE_ERROR);
00988     }
00989   }
00990 
00991   SendStateDataSampleList list;
00992 
00993   for (int i = 0; i < num_samples; ++i) {
00994     DataSampleElement* element = 0;
00995 
00996     ACE_NEW_MALLOC_RETURN(
00997       element,
00998       static_cast<DataSampleElement*>(
00999         sample_list_element_allocator_->malloc(
01000           sizeof(DataSampleElement))),
01001       DataSampleElement(publication_id_,
01002                             this,
01003                             0,
01004                             transport_send_element_allocator_.get(),
01005                             transport_customized_element_allocator_.get()),
01006       DDS::RETCODE_ERROR);
01007 
01008     element->get_header().byte_order_ = samples[i].sample_byte_order_;
01009     element->get_header().publication_id_ = this->publication_id_;
01010     list.enqueue_tail(element);
01011     DataSample* temp;
01012     DDS::ReturnCode_t ret = create_sample_data_message(samples[i].sample_->duplicate(),
01013                                                        element->get_header(),
01014                                                        temp,
01015                                                        samples[i].source_timestamp_,
01016                                                        false);
01017     element->set_sample(temp);
01018     if (reader_ih_ptr) {
01019       element->set_num_subs(1);
01020       element->set_sub_id(0, repo_id);
01021     }
01022 
01023     if (ret != DDS::RETCODE_OK) {
01024       // we need to free the list
01025       while (list.dequeue(element)) {
01026         ACE_DES_FREE(element, sample_list_element_allocator_->free, DataSampleElement);
01027       }
01028 
01029       return ret;
01030     }
01031   }
01032 
01033   {
01034     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, DDS::RETCODE_ERROR);
01035     ++pending_write_count_;
01036   }
01037 
01038   this->send(list);
01039 
01040   for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01041        end = reader_info_.end(); iter != end; ++iter) {
01042     iter->second.expected_sequence_ = sequence_number_;
01043   }
01044 
01045   return DDS::RETCODE_OK;
01046 }
01047 
01048 DDS::ReturnCode_t
01049 ReplayerImpl::write(const RawDataSample& sample)
01050 {
01051   return this->write(&sample, 1, 0);
01052 }
01053 
01054 DDS::ReturnCode_t
01055 ReplayerImpl::create_sample_data_message(DataSample*         data,
01056                                          DataSampleHeader&   header_data,
01057                                          ACE_Message_Block*& message,
01058                                          const DDS::Time_t&  source_timestamp,
01059                                          bool                content_filter)
01060 {
01061   header_data.message_id_ = SAMPLE_DATA;
01062   header_data.coherent_change_ = content_filter;
01063 
01064   header_data.content_filter_ = 0;
01065   header_data.cdr_encapsulation_ = this->cdr_encapsulation();
01066   header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
01067   header_data.sequence_repair_ = need_sequence_repair();
01068   if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
01069     this->sequence_number_ = SequenceNumber();
01070   } else {
01071     ++this->sequence_number_;
01072   }
01073   header_data.sequence_ = this->sequence_number_;
01074   header_data.source_timestamp_sec_ = source_timestamp.sec;
01075   header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
01076 
01077   if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
01078       || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
01079     header_data.lifespan_duration_ = true;
01080     header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec;
01081     header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec;
01082   }
01083 
01084   // header_data.publication_id_ = publication_id_;
01085   // header_data.publisher_id_ = this->publisher_servant_->publisher_id_;
01086   size_t max_marshaled_size = header_data.max_marshaled_size();
01087 
01088   ACE_NEW_MALLOC_RETURN(message,
01089                         static_cast<ACE_Message_Block*>(
01090                           mb_allocator_->malloc(sizeof(ACE_Message_Block))),
01091                         ACE_Message_Block(max_marshaled_size,
01092                                           ACE_Message_Block::MB_DATA,
01093                                           data,   //cont
01094                                           0,   //data
01095                                           header_allocator_.get(),   //alloc_strategy
01096                                           0,   //locking_strategy
01097                                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
01098                                           ACE_Time_Value::zero,
01099                                           ACE_Time_Value::max_time,
01100                                           db_allocator_.get(),
01101                                           mb_allocator_.get()),
01102                         DDS::RETCODE_ERROR);
01103 
01104   *message << header_data;
01105   return DDS::RETCODE_OK;
01106 }
01107 
01108 bool
01109 ReplayerImpl::lookup_instance_handles(const ReaderIdSeq&       ids,
01110                                       DDS::InstanceHandleSeq & hdls)
01111 {
01112   if (DCPS_debug_level > 9) {
01113     CORBA::ULong const size = ids.length();
01114     OPENDDS_STRING separator;
01115     OPENDDS_STRING buffer;
01116 
01117     for (unsigned long i = 0; i < size; ++i) {
01118       buffer += separator + OPENDDS_STRING(GuidConverter(ids[i]));
01119       separator = ", ";
01120     }
01121 
01122     ACE_DEBUG((LM_DEBUG,
01123                ACE_TEXT("(%P|%t) DataWriterImpl::lookup_instance_handles: ")
01124                ACE_TEXT("searching for handles for reader Ids: %C.\n"),
01125                buffer.c_str()));
01126   }
01127 
01128   CORBA::ULong const num_rds = ids.length();
01129   hdls.length(num_rds);
01130 
01131   for (CORBA::ULong i = 0; i < num_rds; ++i) {
01132     hdls[i] = this->participant_servant_->id_to_handle(ids[i]);
01133   }
01134 
01135   return true;
01136 }
01137 
01138 bool
01139 ReplayerImpl::need_sequence_repair() const
01140 {
01141   for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
01142        end = reader_info_.end(); it != end; ++it) {
01143     if (it->second.expected_sequence_ != sequence_number_) {
01144       return true;
01145     }
01146   }
01147   return false;
01148 }
01149 
01150 DDS::InstanceHandle_t
01151 ReplayerImpl::get_instance_handle()
01152 {
01153   return this->participant_servant_->id_to_handle(publication_id_);
01154 }
01155 
01156 
01157 
01158 
01159 DDS::ReturnCode_t
01160 ReplayerImpl::write_to_reader (DDS::InstanceHandle_t subscription,
01161                                const RawDataSample&  sample )
01162 {
01163   return write(&sample, 1, &subscription);
01164 }
01165 
01166 DDS::ReturnCode_t
01167 ReplayerImpl::write_to_reader (DDS::InstanceHandle_t    subscription,
01168                                const RawDataSampleList& samples )
01169 {
01170   if (samples.size())
01171     return write(&samples[0], static_cast<int>(samples.size()), &subscription);
01172   return DDS::RETCODE_ERROR;
01173 }
01174 
01175 } // namespace DCPS
01176 } // namespace

Generated on Fri Feb 12 20:05:25 2016 for OpenDDS by  doxygen 1.4.7