ReplayerImpl.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "ReplayerImpl.h"
00010 #include "FeatureDisabledQosCheck.h"
00011 #include "DomainParticipantImpl.h"
00012 #include "PublisherImpl.h"
00013 #include "Service_Participant.h"
00014 #include "GuidConverter.h"
00015 #include "TopicImpl.h"
00016 #include "PublicationInstance.h"
00017 #include "SendStateDataSampleList.h"
00018 #include "DataSampleElement.h"
00019 #include "Serializer.h"
00020 #include "Transient_Kludge.h"
00021 #include "DataDurabilityCache.h"
00022 #include "OfferedDeadlineWatchdog.h"
00023 #include "MonitorFactory.h"
00024 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00025 #include "CoherentChangeControl.h"
00026 #endif
00027 #include "AssociationData.h"
00028 
00029 #if !defined (DDS_HAS_MINIMUM_BIT)
00030 #include "BuiltInTopicUtils.h"
00031 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00032 
00033 #include "Util.h"
00034 
00035 #include "dds/DCPS/transport/framework/EntryExit.h"
00036 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00037 #include "dds/DCPS/transport/framework/TransportSendElement.h"
00038 #include "dds/DCPS/transport/framework/TransportCustomizedElement.h"
00039 
00040 #include "ace/Reactor.h"
00041 #include "ace/Auto_Ptr.h"
00042 
00043 #include <stdexcept>
00044 
00045 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00046 
00047 namespace OpenDDS {
00048 namespace DCPS {
00049 
00050 
00051 ReplayerImpl::ReplayerImpl()
00052   : data_dropped_count_(0),
00053   data_delivered_count_(0),
00054   n_chunks_(TheServiceParticipant->n_chunks()),
00055   association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()),
00056   qos_(TheServiceParticipant->initial_DataWriterQos()),
00057   participant_servant_(0),
00058   topic_id_(GUID_UNKNOWN),
00059   topic_servant_(0),
00060   listener_mask_(DEFAULT_STATUS_MASK),
00061   domain_id_(0),
00062   publisher_servant_(0),
00063   publication_id_(GUID_UNKNOWN),
00064   sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00065   // data_container_(0),
00066   // liveliness_lost_(false),
00067   // last_deadline_missed_total_count_(0),
00068   is_bit_(false),
00069   empty_condition_(lock_),
00070   pending_write_count_(0)
00071 {
00072   // liveliness_lost_status_.total_count = 0;
00073   // liveliness_lost_status_.total_count_change = 0;
00074   //
00075   // offered_deadline_missed_status_.total_count = 0;
00076   // offered_deadline_missed_status_.total_count_change = 0;
00077   // offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL;
00078 
00079   offered_incompatible_qos_status_.total_count = 0;
00080   offered_incompatible_qos_status_.total_count_change = 0;
00081   offered_incompatible_qos_status_.last_policy_id = 0;
00082   offered_incompatible_qos_status_.policies.length(0);
00083 
00084   publication_match_status_.total_count = 0;
00085   publication_match_status_.total_count_change = 0;
00086   publication_match_status_.current_count = 0;
00087   publication_match_status_.current_count_change = 0;
00088   publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL;
00089 
00090 }
00091 
00092 // This method is called when there are no longer any references to
00093 // the servant.
00094 ReplayerImpl::~ReplayerImpl()
00095 {
00096   DBG_ENTRY_LVL("ReplayerImpl","~ReplayerImpl",6);
00097 }
00098 
00099 // this method is called when delete_datawriter is called.
00100 DDS::ReturnCode_t
00101 ReplayerImpl::cleanup()
00102 {
00103 
00104   //     // Unregister all registered instances prior to deletion.
00105   //     // DDS::Time_t source_timestamp = time_value_to_time(ACE_OS::gettimeofday());
00106   //     // this->unregister_instances(source_timestamp);
00107   //
00108   //     // CORBA::String_var topic_name = this->get_Atopic_name();
00109   {
00110     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, DDS::RETCODE_ERROR);
00111 
00112     // Wait for pending samples to drain prior to removing associations
00113     // and unregistering the publication.
00114     while (this->pending_write_count_)
00115       this->empty_condition_.wait();
00116 
00117     // Call remove association before unregistering the datawriter
00118     // with the transport, otherwise some callbacks resulted from
00119     // remove_association may lost.
00120     this->remove_all_associations();
00121 
00122     // release our Topic_var
00123     topic_objref_ = DDS::Topic::_nil();
00124     topic_servant_ = 0;
00125 
00126   }
00127 
00128   // not just unregister but remove any pending writes/sends.
00129   // this->unregister_all();
00130 
00131   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00132   if (!disco->remove_publication(
00133         this->domain_id_,
00134         this->participant_servant_->get_id(),
00135         this->publication_id_)) {
00136     ACE_ERROR_RETURN((LM_ERROR,
00137                       ACE_TEXT("(%P|%t) ERROR: ")
00138                       ACE_TEXT("PublisherImpl::delete_datawriter, ")
00139                       ACE_TEXT("publication not removed from discovery.\n")),
00140                      DDS::RETCODE_ERROR);
00141   }
00142   return DDS::RETCODE_OK;
00143 }
00144 
00145 void
00146 ReplayerImpl::init(
00147   DDS::Topic_ptr                         topic,
00148   TopicImpl *                            topic_servant,
00149   const DDS::DataWriterQos &             qos,
00150   ReplayerListener_rch                   a_listener,
00151   const DDS::StatusMask &                mask,
00152   OpenDDS::DCPS::DomainParticipantImpl * participant_servant,
00153   const DDS::PublisherQos&               publisher_qos)
00154 {
00155   DBG_ENTRY_LVL("ReplayerImpl","init",6);
00156   topic_objref_ = DDS::Topic::_duplicate(topic);
00157   topic_servant_ = topic_servant;
00158   topic_name_    = topic_servant_->get_name();
00159   topic_id_      = topic_servant_->get_id();
00160   type_name_     = topic_servant_->get_type_name();
00161 
00162 #if !defined (DDS_HAS_MINIMUM_BIT)
00163   is_bit_ = topicIsBIT(topic_name_.in(), type_name_.in());
00164 #endif   // !defined (DDS_HAS_MINIMUM_BIT)
00165 
00166   qos_ = qos;
00167 
00168   //Note: OK to _duplicate(nil).
00169   listener_ = a_listener;
00170   listener_mask_ = mask;
00171 
00172   // Only store the participant pointer, since it is our "grand"
00173   // parent, we will exist as long as it does.
00174   participant_servant_ = participant_servant;
00175   domain_id_ = participant_servant_->get_domain_id();
00176 
00177   publisher_qos_ = publisher_qos;
00178 }
00179 
00180 
00181 DDS::ReturnCode_t ReplayerImpl::set_qos (const DDS::PublisherQos &  publisher_qos,
00182                                          const DDS::DataWriterQos & qos)
00183 {
00184 
00185   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(publisher_qos, DDS::RETCODE_UNSUPPORTED);
00186 
00187   if (Qos_Helper::valid(publisher_qos) && Qos_Helper::consistent(publisher_qos)) {
00188     if (publisher_qos_ == publisher_qos)
00189       return DDS::RETCODE_OK;
00190 
00191     // for the not changeable qos, it can be changed before enable
00192     if (!Qos_Helper::changeable(publisher_qos_, publisher_qos) && enabled_ == true) {
00193       return DDS::RETCODE_IMMUTABLE_POLICY;
00194 
00195     } else {
00196       publisher_qos_ = publisher_qos;
00197     }
00198   } else {
00199     return DDS::RETCODE_INCONSISTENT_POLICY;
00200   }
00201 
00202   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00203   OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00204   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00205   OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00206   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00207 
00208   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00209     if (qos_ == qos)
00210       return DDS::RETCODE_OK;
00211 
00212     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00213       return DDS::RETCODE_IMMUTABLE_POLICY;
00214 
00215     } else {
00216       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00217       // DDS::PublisherQos publisherQos;
00218       // this->publisher_servant_->get_qos(publisherQos);
00219       DDS::PublisherQos publisherQos = this->publisher_qos_;
00220       const bool status
00221         = disco->update_publication_qos(this->participant_servant_->get_domain_id(),
00222                                         this->participant_servant_->get_id(),
00223                                         this->publication_id_,
00224                                         qos,
00225                                         publisherQos);
00226 
00227       if (!status) {
00228         ACE_ERROR_RETURN((LM_ERROR,
00229                           ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
00230                           ACE_TEXT("qos not updated. \n")),
00231                          DDS::RETCODE_ERROR);
00232       }
00233     }
00234 
00235     if (!(qos_ == qos)) {
00236       // Reset the deadline timer if the period has changed.
00237       // if (qos_.deadline.period.sec != qos.deadline.period.sec
00238       //     || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
00239       //   if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00240       //       && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00241       //     ACE_auto_ptr_reset(this->watchdog_,
00242       //                        new OfferedDeadlineWatchdog(
00243       //                          this->reactor_,
00244       //                          this->lock_,
00245       //                          qos.deadline,
00246       //                          this,
00247       //                          this,
00248       //                          this->offered_deadline_missed_status_,
00249       //                          this->last_deadline_missed_total_count_));
00250       //
00251       //   } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00252       //              && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00253       //     this->watchdog_->cancel_all();
00254       //     this->watchdog_.reset();
00255       //
00256       //   } else {
00257       //     this->watchdog_->reset_interval(
00258       //       duration_to_time_value(qos.deadline.period));
00259       //   }
00260       // }
00261 
00262       qos_ = qos;
00263     }
00264 
00265     return DDS::RETCODE_OK;
00266 
00267   } else {
00268     return DDS::RETCODE_INCONSISTENT_POLICY;
00269   }
00270 }
00271 
00272 DDS::ReturnCode_t ReplayerImpl::get_qos (DDS::PublisherQos &  publisher_qos,
00273                                          DDS::DataWriterQos & qos)
00274 {
00275   qos = qos_;
00276   publisher_qos = publisher_qos_;
00277   return DDS::RETCODE_OK;
00278 }
00279 
00280 
00281 DDS::ReturnCode_t ReplayerImpl::set_listener (const ReplayerListener_rch & a_listener,
00282                                               DDS::StatusMask              mask)
00283 {
00284   listener_ = a_listener;
00285   listener_mask_ = mask;
00286   return DDS::RETCODE_OK;
00287 }
00288 
00289 ReplayerListener_rch ReplayerImpl::get_listener ()
00290 {
00291   return listener_;
00292 }
00293 
00294 DDS::ReturnCode_t
00295 ReplayerImpl::enable()
00296 {
00297   //According spec:
00298   // - Calling enable on an already enabled Entity returns OK and has no
00299   // effect.
00300   // - Calling enable on an Entity whose factory is not enabled will fail
00301   // and return PRECONDITION_NOT_MET.
00302 
00303   if (this->is_enabled()) {
00304     return DDS::RETCODE_OK;
00305   }
00306 
00307   // if (this->publisher_servant_->is_enabled() == false) {
00308   //   return DDS::RETCODE_PRECONDITION_NOT_MET;
00309   // }
00310   //
00311   const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
00312 
00313   if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
00314     n_chunks_ = qos_.resource_limits.max_samples;
00315   }
00316   // +1 because we might allocate one before releasing another
00317   // TBD - see if this +1 can be removed.
00318   mb_allocator_.reset(new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_));
00319   db_allocator_.reset(new DataBlockAllocator(n_chunks_+1));
00320   header_allocator_.reset(new DataSampleHeaderAllocator(n_chunks_+1));
00321 
00322   sample_list_element_allocator_.reset(new DataSampleElementAllocator(2 * n_chunks_));
00323 
00324 
00325   if (DCPS_debug_level >= 2) {
00326     ACE_DEBUG((LM_DEBUG,
00327                "(%P|%t) ReplayerImpl::enable-mb"
00328                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00329                mb_allocator_.get(),
00330                n_chunks_));
00331 
00332     ACE_DEBUG((LM_DEBUG,
00333                "(%P|%t) ReplayerImpl::enable-db"
00334                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00335                db_allocator_.get(),
00336                n_chunks_));
00337 
00338     ACE_DEBUG((LM_DEBUG,
00339                "(%P|%t) ReplayerImpl::enable-header"
00340                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00341                header_allocator_.get(),
00342                n_chunks_));
00343   }
00344 
00345   this->set_enabled();
00346 
00347   try {
00348     this->enable_transport(reliable,
00349                            this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00350 
00351   } catch (const Transport::Exception&) {
00352     ACE_ERROR((LM_ERROR,
00353                ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ")
00354                ACE_TEXT("Transport Exception.\n")));
00355     return DDS::RETCODE_ERROR;
00356 
00357   }
00358 
00359   const TransportLocatorSeq& trans_conf_info = connection_info();
00360 
00361 
00362   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00363   this->publication_id_ =
00364     disco->add_publication(this->domain_id_,
00365                            this->participant_servant_->get_id(),
00366                            this->topic_servant_->get_id(),
00367                            this,
00368                            this->qos_,
00369                            trans_conf_info,
00370                            this->publisher_qos_);
00371 
00372   if (this->publication_id_ == GUID_UNKNOWN) {
00373     ACE_ERROR((LM_ERROR,
00374                ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ")
00375                ACE_TEXT("add_publication returned invalid id. \n")));
00376     return DDS::RETCODE_ERROR;
00377   }
00378 
00379   return DDS::RETCODE_OK;
00380 }
00381 
00382 
00383 
00384 void
00385 ReplayerImpl::add_association(const RepoId&            yourId,
00386                               const ReaderAssociation& reader,
00387                               bool                     active)
00388 {
00389   DBG_ENTRY_LVL("ReplayerImpl", "add_association", 6);
00390 
00391   if (DCPS_debug_level >= 1) {
00392     GuidConverter writer_converter(yourId);
00393     GuidConverter reader_converter(reader.readerId);
00394     ACE_DEBUG((LM_DEBUG,
00395                ACE_TEXT("(%P|%t) ReplayerImpl::add_association - ")
00396                ACE_TEXT("bit %d local %C remote %C\n"),
00397                is_bit_,
00398                OPENDDS_STRING(writer_converter).c_str(),
00399                OPENDDS_STRING(reader_converter).c_str()));
00400   }
00401 
00402   // if (entity_deleted_ == true) {
00403   //   if (DCPS_debug_level >= 1)
00404   //     ACE_DEBUG((LM_DEBUG,
00405   //                ACE_TEXT("(%P|%t) ReplayerImpl::add_association")
00406   //                ACE_TEXT(" This is a deleted datawriter, ignoring add.\n")));
00407   //
00408   //   return;
00409   // }
00410 
00411   if (GUID_UNKNOWN == publication_id_) {
00412     publication_id_ = yourId;
00413   }
00414 
00415   {
00416     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00417     reader_info_.insert(std::make_pair(reader.readerId,
00418                                        ReaderInfo(TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
00419                                                   reader.exprParams, participant_servant_,
00420                                                   reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS)));
00421   }
00422 
00423   if (DCPS_debug_level > 4) {
00424     GuidConverter converter(publication_id_);
00425     ACE_DEBUG((LM_DEBUG,
00426                ACE_TEXT("(%P|%t) ReplayerImpl::add_association(): ")
00427                ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
00428                OPENDDS_STRING(converter).c_str(),
00429                qos_.transport_priority.value));
00430   }
00431 
00432   AssociationData data;
00433   data.remote_id_ = reader.readerId;
00434   data.remote_data_ = reader.readerTransInfo;
00435   data.remote_reliable_ =
00436     (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00437   data.remote_durable_ =
00438     (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00439 
00440   if (!this->associate(data, active)) {
00441     //FUTURE: inform inforepo and try again as passive peer
00442     if (DCPS_debug_level) {
00443       ACE_ERROR((LM_ERROR,
00444                  ACE_TEXT("(%P|%t) ReplayerImpl::add_association: ")
00445                  ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00446     }
00447     return;
00448   }
00449 
00450   if (active) {
00451     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00452 
00453     // Have we already received an association_complete() callback?
00454     if (assoc_complete_readers_.count(reader.readerId)) {
00455       assoc_complete_readers_.erase(reader.readerId);
00456       association_complete_i(reader.readerId);
00457 
00458       // Add to pending_readers_ -> pending means we are waiting
00459       // for the association_complete() callback.
00460     } else if (OpenDDS::DCPS::insert(pending_readers_, reader.readerId) == -1) {
00461       GuidConverter converter(reader.readerId);
00462       ACE_ERROR((LM_ERROR,
00463                  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::add_association: ")
00464                  ACE_TEXT("failed to mark %C as pending.\n"),
00465                  OPENDDS_STRING(converter).c_str()));
00466 
00467     } else {
00468       if (DCPS_debug_level > 0) {
00469         GuidConverter converter(reader.readerId);
00470         ACE_DEBUG((LM_DEBUG,
00471                    ACE_TEXT("(%P|%t) ReplayerImpl::add_association: ")
00472                    ACE_TEXT("marked %C as pending.\n"),
00473                    OPENDDS_STRING(converter).c_str()));
00474       }
00475     }
00476   } else {
00477     // In the current implementation, DataWriter is always active, so this
00478     // code will not be applicable.
00479     Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00480     disco->association_complete(this->domain_id_,
00481                                 this->participant_servant_->get_id(),
00482                                 this->publication_id_, reader.readerId);
00483   }
00484 }
00485 
00486 
00487 ReplayerImpl::ReaderInfo::ReaderInfo(const char*            filter,
00488                                      const DDS::StringSeq&  params,
00489                                      DomainParticipantImpl* participant,
00490                                      bool                   durable)
00491   : expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00492   , durable_(durable)
00493 {
00494   ACE_UNUSED_ARG(filter);
00495   ACE_UNUSED_ARG(params);
00496   ACE_UNUSED_ARG(participant);
00497 }
00498 
00499 
00500 ReplayerImpl::ReaderInfo::~ReaderInfo()
00501 {
00502 }
00503 
00504 
00505 void
00506 ReplayerImpl::association_complete(const RepoId& remote_id)
00507 {
00508   DBG_ENTRY_LVL("ReplayerImpl", "association_complete", 6);
00509 
00510   if (DCPS_debug_level >= 1) {
00511     GuidConverter writer_converter(this->publication_id_);
00512     GuidConverter reader_converter(remote_id);
00513     ACE_DEBUG((LM_DEBUG,
00514                ACE_TEXT("(%P|%t) ReplayerImpl::association_complete - ")
00515                ACE_TEXT("bit %d local %C remote %C\n"),
00516                is_bit_,
00517                OPENDDS_STRING(writer_converter).c_str(),
00518                OPENDDS_STRING(reader_converter).c_str()));
00519   }
00520 
00521   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00522   if (OpenDDS::DCPS::remove(pending_readers_, remote_id) == -1) {
00523     // Not found in pending_readers_, defer calling association_complete_i()
00524     // until add_association() resumes and sees this ID in assoc_complete_readers_.
00525     assoc_complete_readers_.insert(remote_id);
00526   } else {
00527     association_complete_i(remote_id);
00528   }
00529 }
00530 
00531 void
00532 ReplayerImpl::association_complete_i(const RepoId& remote_id)
00533 {
00534   DBG_ENTRY_LVL("ReplayerImpl", "association_complete_i", 6);
00535   // bool reader_durable = false;
00536   {
00537     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00538     if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) {
00539       GuidConverter converter(remote_id);
00540       ACE_ERROR((LM_ERROR,
00541                  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
00542                  ACE_TEXT("insert %C from pending failed.\n"),
00543                  OPENDDS_STRING(converter).c_str()));
00544     }
00545     // RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
00546     // if (it != reader_info_.end()) {
00547     //   reader_durable = it->second.durable_;
00548     // }
00549   }
00550 
00551   if (!is_bit_) {
00552 
00553     DDS::InstanceHandle_t handle =
00554       this->participant_servant_->id_to_handle(remote_id);
00555 
00556     {
00557       // protect publication_match_status_ and status changed flags.
00558       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00559 
00560       // update the publication_match_status_
00561       ++publication_match_status_.total_count;
00562       ++publication_match_status_.total_count_change;
00563       ++publication_match_status_.current_count;
00564       ++publication_match_status_.current_count_change;
00565 
00566       if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
00567         GuidConverter converter(remote_id);
00568         ACE_DEBUG((LM_WARNING,
00569                    ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
00570                    ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
00571                    OPENDDS_STRING(converter).c_str(),
00572                    handle));
00573         return;
00574 
00575       } else if (DCPS_debug_level > 4) {
00576         GuidConverter converter(remote_id);
00577         ACE_DEBUG((LM_DEBUG,
00578                    ACE_TEXT("(%P|%t) ReplayerImpl::association_complete_i: ")
00579                    ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
00580                    OPENDDS_STRING(converter).c_str(),
00581                    handle));
00582       }
00583 
00584       publication_match_status_.last_subscription_handle = handle;
00585 
00586     }
00587 
00588 
00589     if (listener_.in()) {
00590       listener_->on_replayer_matched(this,
00591                                      publication_match_status_);
00592 
00593       // TBD - why does the spec say to change this but not
00594       // change the ChangeFlagStatus after a listener call?
00595       publication_match_status_.total_count_change = 0;
00596       publication_match_status_.current_count_change = 0;
00597     }
00598 
00599   }
00600 
00601 }
00602 
00603 void
00604 ReplayerImpl::remove_associations(const ReaderIdSeq & readers,
00605                                   CORBA::Boolean      notify_lost)
00606 {
00607   if (DCPS_debug_level >= 1) {
00608     GuidConverter writer_converter(publication_id_);
00609     GuidConverter reader_converter(readers[0]);
00610     ACE_DEBUG((LM_DEBUG,
00611                ACE_TEXT("(%P|%t) ReplayerImpl::remove_associations: ")
00612                ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
00613                is_bit_,
00614                OPENDDS_STRING(writer_converter).c_str(),
00615                OPENDDS_STRING(reader_converter).c_str(),
00616                readers.length()));
00617   }
00618 
00619   this->stop_associating(readers.get_buffer(), readers.length());
00620 
00621   ReaderIdSeq fully_associated_readers;
00622   CORBA::ULong fully_associated_len = 0;
00623   ReaderIdSeq rds;
00624   CORBA::ULong rds_len = 0;
00625   DDS::InstanceHandleSeq handles;
00626 
00627   {
00628     // Ensure the same acquisition order as in wait_for_acknowledgments().
00629     // ACE_GUARD(ACE_SYNCH_MUTEX, wfaGuard, this->wfaLock_);
00630     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00631 
00632     //Remove the readers from fully associated reader list.
00633     //If the supplied reader is not in the cached reader list then it is
00634     //already removed. We just need remove the readers in the list that have
00635     //not been removed.
00636 
00637     CORBA::ULong len = readers.length();
00638 
00639     for (CORBA::ULong i = 0; i < len; ++i) {
00640       //Remove the readers from fully associated reader list. If it's not
00641       //in there, the association_complete() is not called yet and remove it
00642       //from pending list.
00643 
00644       if (OpenDDS::DCPS::remove(readers_, readers[i]) == 0) {
00645         ++fully_associated_len;
00646         fully_associated_readers.length(fully_associated_len);
00647         fully_associated_readers [fully_associated_len - 1] = readers[i];
00648 
00649         // Remove this reader from the ACK sequence map if its there.
00650         // This is where we need to be holding the wfaLock_ obtained
00651         // above.
00652         RepoIdToSequenceMap::iterator where
00653           = this->idToSequence_.find(readers[i]);
00654 
00655         if (where != this->idToSequence_.end()) {
00656           this->idToSequence_.erase(where);
00657 
00658           // It is possible that this subscription was causing the wait
00659           // to continue, so give the opportunity to find out.
00660           // this->wfaCondition_.broadcast();
00661         }
00662 
00663         ++rds_len;
00664         rds.length(rds_len);
00665         rds [rds_len - 1] = readers[i];
00666 
00667       } else if (OpenDDS::DCPS::remove(pending_readers_, readers[i]) == 0) {
00668         ++rds_len;
00669         rds.length(rds_len);
00670         rds [rds_len - 1] = readers[i];
00671 
00672         GuidConverter converter(readers[i]);
00673         ACE_DEBUG((LM_WARNING,
00674                    ACE_TEXT("(%P|%t) WARNING: ReplayerImpl::remove_associations: ")
00675                    ACE_TEXT("removing reader %C before association_complete() call.\n"),
00676                    OPENDDS_STRING(converter).c_str()));
00677       }
00678       reader_info_.erase(readers[i]);
00679       //else reader is already removed which indicates remove_association()
00680       //is called multiple times.
00681     }
00682 
00683     if (fully_associated_len > 0 && !is_bit_) {
00684       // The reader should be in the id_to_handle map at this time
00685       this->lookup_instance_handles(fully_associated_readers, handles);
00686 
00687       for (CORBA::ULong i = 0; i < fully_associated_len; ++i) {
00688         id_to_handle_map_.erase(fully_associated_readers[i]);
00689       }
00690     }
00691 
00692     // wfaGuard.release();
00693 
00694     // Mirror the PUBLICATION_MATCHED_STATUS processing from
00695     // association_complete() here.
00696     if (!this->is_bit_) {
00697 
00698       // Derive the change in the number of subscriptions reading this writer.
00699       int matchedSubscriptions =
00700         static_cast<int>(this->id_to_handle_map_.size());
00701       this->publication_match_status_.current_count_change =
00702         matchedSubscriptions - this->publication_match_status_.current_count;
00703 
00704       // Only process status if the number of subscriptions has changed.
00705       if (this->publication_match_status_.current_count_change != 0) {
00706         this->publication_match_status_.current_count = matchedSubscriptions;
00707 
00708         /// Section 7.1.4.1: total_count will not decrement.
00709 
00710         /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
00711         /// TODO: Should rds_len really be fully_associated_len here??
00712         this->publication_match_status_.last_subscription_handle =
00713           handles[rds_len - 1];
00714 
00715 
00716         if (listener_.in()) {
00717           listener_->on_replayer_matched(
00718             this,
00719             this->publication_match_status_);
00720 
00721           // Listener consumes the change.
00722           this->publication_match_status_.total_count_change = 0;
00723           this->publication_match_status_.current_count_change = 0;
00724         }
00725 
00726       }
00727     }
00728   }
00729 
00730   for (CORBA::ULong i = 0; i < rds.length(); ++i) {
00731     this->disassociate(rds[i]);
00732   }
00733 
00734   // If this remove_association is invoked when the InfoRepo
00735   // detects a lost reader then make a callback to notify
00736   // subscription lost.
00737   if (notify_lost && handles.length() > 0) {
00738     this->notify_publication_lost(handles);
00739   }
00740 }
00741 
00742 void ReplayerImpl::remove_all_associations()
00743 {
00744   this->stop_associating();
00745 
00746   OpenDDS::DCPS::ReaderIdSeq readers;
00747   CORBA::ULong size;
00748   CORBA::ULong num_pending_readers;
00749   {
00750     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00751 
00752     num_pending_readers = static_cast<CORBA::ULong>(pending_readers_.size());
00753     size = static_cast<CORBA::ULong>(readers_.size()) + num_pending_readers;
00754     readers.length(size);
00755 
00756     RepoIdSet::iterator itEnd = readers_.end();
00757     int i = 0;
00758 
00759     for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
00760       readers[i++] = *it;
00761     }
00762 
00763     itEnd = pending_readers_.end();
00764     for (RepoIdSet::iterator it = pending_readers_.begin(); it != itEnd; ++it) {
00765       readers[i++] = *it;
00766     }
00767 
00768     if (num_pending_readers > 0) {
00769       ACE_DEBUG((LM_WARNING,
00770                  ACE_TEXT("(%P|%t) WARNING: ReplayerImpl::remove_all_associations() - ")
00771                  ACE_TEXT("%d subscribers were pending and never fully associated.\n"),
00772                  num_pending_readers));
00773     }
00774   }
00775 
00776   try {
00777     if (0 < size) {
00778       CORBA::Boolean dont_notify_lost = false;
00779       this->remove_associations(readers, dont_notify_lost);
00780     }
00781 
00782   } catch (const CORBA::Exception&) {
00783   }
00784 }
00785 
00786 void
00787 ReplayerImpl::register_for_reader(const RepoId& participant,
00788                                   const RepoId& writerid,
00789                                   const RepoId& readerid,
00790                                   const TransportLocatorSeq& locators,
00791                                   DiscoveryListener* listener)
00792 {
00793   TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
00794 }
00795 
00796 void
00797 ReplayerImpl::unregister_for_reader(const RepoId& participant,
00798                                     const RepoId& writerid,
00799                                     const RepoId& readerid)
00800 {
00801   TransportClient::unregister_for_reader(participant, writerid, readerid);
00802 }
00803 
00804 void
00805 ReplayerImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
00806 {
00807 
00808 
00809   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00810 
00811   // copy status and increment change
00812   offered_incompatible_qos_status_.total_count = status.total_count;
00813   offered_incompatible_qos_status_.total_count_change +=
00814     status.count_since_last_send;
00815   offered_incompatible_qos_status_.last_policy_id = status.last_policy_id;
00816   offered_incompatible_qos_status_.policies = status.policies;
00817 
00818 }
00819 
00820 void
00821 ReplayerImpl::update_subscription_params(const RepoId&         readerId,
00822                                          const DDS::StringSeq& params)
00823 {
00824   ACE_UNUSED_ARG(readerId);
00825   ACE_UNUSED_ARG(params);
00826 }
00827 
00828 void
00829 ReplayerImpl::inconsistent_topic()
00830 {
00831   topic_servant_->inconsistent_topic();
00832 }
00833 
00834 bool
00835 ReplayerImpl::check_transport_qos(const TransportInst&)
00836 {
00837   // DataWriter does not impose any constraints on which transports
00838   // may be used based on QoS.
00839   return true;
00840 }
00841 
00842 const RepoId&
00843 ReplayerImpl::get_repo_id() const
00844 {
00845   return this->publication_id_;
00846 }
00847 
00848 CORBA::Long
00849 ReplayerImpl::get_priority_value(const AssociationData&) const
00850 {
00851   return this->qos_.transport_priority.value;
00852 }
00853 
00854 void
00855 ReplayerImpl::data_delivered(const DataSampleElement* sample)
00856 {
00857   DBG_ENTRY_LVL("ReplayerImpl","data_delivered",6);
00858   if (!(sample->get_pub_id() == this->publication_id_)) {
00859     GuidConverter sample_converter(sample->get_pub_id());
00860     GuidConverter writer_converter(publication_id_);
00861     ACE_ERROR((LM_ERROR,
00862                ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::data_delivered: ")
00863                ACE_TEXT(" The publication id %C from delivered element ")
00864                ACE_TEXT("does not match the datawriter's id %C\n"),
00865                OPENDDS_STRING(sample_converter).c_str(),
00866                OPENDDS_STRING(writer_converter).c_str()));
00867     return;
00868   }
00869   DataSampleElement* elem = const_cast<DataSampleElement*>(sample);
00870   // this->data_container_->data_delivered(sample);
00871   ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement);
00872   ++data_delivered_count_;
00873 
00874   {
00875     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00876     if ((--pending_write_count_) == 0) {
00877       empty_condition_.broadcast();
00878     }
00879   }
00880 }
00881 
00882 void
00883 ReplayerImpl::control_delivered(const Message_Block_Ptr& sample)
00884 {
00885   ACE_UNUSED_ARG(sample);
00886 }
00887 
00888 void
00889 ReplayerImpl::data_dropped(const DataSampleElement* sample,
00890                            bool                         dropped_by_transport)
00891 {
00892   DBG_ENTRY_LVL("ReplayerImpl","data_dropped",6);
00893   // this->data_container_->data_dropped(element, dropped_by_transport);
00894   ACE_UNUSED_ARG(dropped_by_transport);
00895   DataSampleElement* elem = const_cast<DataSampleElement*>(sample);
00896   ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement);
00897   ++data_dropped_count_;
00898   {
00899     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00900     if ((--pending_write_count_) == 0) {
00901       empty_condition_.broadcast();
00902     }
00903   }
00904 }
00905 
00906 void
00907 ReplayerImpl::control_dropped(const Message_Block_Ptr& sample,
00908                               bool /* dropped_by_transport */)
00909 {
00910   ACE_UNUSED_ARG(sample);
00911 }
00912 
00913 void
00914 ReplayerImpl::notify_publication_disconnected(const ReaderIdSeq& subids)
00915 {
00916   ACE_UNUSED_ARG(subids);
00917 }
00918 
00919 void
00920 ReplayerImpl::notify_publication_reconnected(const ReaderIdSeq& subids)
00921 {
00922   ACE_UNUSED_ARG(subids);
00923 }
00924 
00925 void
00926 ReplayerImpl::notify_publication_lost(const ReaderIdSeq& subids)
00927 {
00928   ACE_UNUSED_ARG(subids);
00929 }
00930 
00931 void
00932 ReplayerImpl::notify_publication_lost(const DDS::InstanceHandleSeq& handles)
00933 {
00934   ACE_UNUSED_ARG(handles);
00935 }
00936 
00937 
00938 void
00939 ReplayerImpl::retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const
00940 {
00941   qos_data.pub_qos = this->publisher_qos_;
00942   qos_data.dw_qos = this->qos_;
00943   qos_data.topic_name = this->topic_name_.in();
00944 }
00945 
00946 DDS::ReturnCode_t
00947 ReplayerImpl::write (const RawDataSample*   samples,
00948                      int                    num_samples,
00949                      DDS::InstanceHandle_t* reader_ih_ptr)
00950 {
00951   DBG_ENTRY_LVL("ReplayerImpl","write",6);
00952 
00953   OpenDDS::DCPS::RepoId repo_id;
00954   if (reader_ih_ptr) {
00955     repo_id = this->participant_servant_->get_repoid(*reader_ih_ptr);
00956     if (repo_id == GUID_UNKNOWN) {
00957       ACE_ERROR_RETURN((LM_ERROR,
00958                         ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::write: ")
00959                         ACE_TEXT("Invalid reader instance handle (%d)\n"), *reader_ih_ptr),
00960                        DDS::RETCODE_ERROR);
00961     }
00962   }
00963 
00964   SendStateDataSampleList list;
00965 
00966   for (int i = 0; i < num_samples; ++i) {
00967     DataSampleElement* element = 0;
00968 
00969     ACE_NEW_MALLOC_RETURN(
00970       element,
00971       static_cast<DataSampleElement*>(
00972         sample_list_element_allocator_->malloc(
00973           sizeof(DataSampleElement))),
00974       DataSampleElement(publication_id_,
00975                             this,
00976                             PublicationInstance_rch()),
00977       DDS::RETCODE_ERROR);
00978 
00979     element->get_header().byte_order_ = samples[i].sample_byte_order_;
00980     element->get_header().publication_id_ = this->publication_id_;
00981     list.enqueue_tail(element);
00982     Message_Block_Ptr temp;
00983     Message_Block_Ptr sample(samples[i].sample_->duplicate());
00984     DDS::ReturnCode_t ret = create_sample_data_message(move(sample),
00985                                                        element->get_header(),
00986                                                        temp,
00987                                                        samples[i].source_timestamp_,
00988                                                        false);
00989     element->set_sample(move(temp));
00990     if (reader_ih_ptr) {
00991       element->set_num_subs(1);
00992       element->set_sub_id(0, repo_id);
00993     }
00994 
00995     if (ret != DDS::RETCODE_OK) {
00996       // we need to free the list
00997       while (list.dequeue(element)) {
00998         ACE_DES_FREE(element, sample_list_element_allocator_->free, DataSampleElement);
00999       }
01000 
01001       return ret;
01002     }
01003   }
01004 
01005   {
01006     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, DDS::RETCODE_ERROR);
01007     ++pending_write_count_;
01008   }
01009 
01010   this->send(list);
01011 
01012   for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01013        end = reader_info_.end(); iter != end; ++iter) {
01014     iter->second.expected_sequence_ = sequence_number_;
01015   }
01016 
01017   return DDS::RETCODE_OK;
01018 }
01019 
01020 DDS::ReturnCode_t
01021 ReplayerImpl::write(const RawDataSample& sample)
01022 {
01023   return this->write(&sample, 1, 0);
01024 }
01025 
01026 DDS::ReturnCode_t
01027 ReplayerImpl::create_sample_data_message(Message_Block_Ptr   data,
01028                                          DataSampleHeader&   header_data,
01029                                          Message_Block_Ptr&  message,
01030                                          const DDS::Time_t&  source_timestamp,
01031                                          bool                content_filter)
01032 {
01033   header_data.message_id_ = SAMPLE_DATA;
01034   header_data.coherent_change_ = content_filter;
01035 
01036   header_data.content_filter_ = 0;
01037   header_data.cdr_encapsulation_ = this->cdr_encapsulation();
01038   header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
01039   header_data.sequence_repair_ = need_sequence_repair();
01040   if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
01041     this->sequence_number_ = SequenceNumber();
01042   } else {
01043     ++this->sequence_number_;
01044   }
01045   header_data.sequence_ = this->sequence_number_;
01046   header_data.source_timestamp_sec_ = source_timestamp.sec;
01047   header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
01048 
01049   if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
01050       || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
01051     header_data.lifespan_duration_ = true;
01052     header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec;
01053     header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec;
01054   }
01055 
01056   // header_data.publication_id_ = publication_id_;
01057   // header_data.publisher_id_ = this->publisher_servant_->publisher_id_;
01058   size_t max_marshaled_size = header_data.max_marshaled_size();
01059   ACE_Message_Block* tmp;
01060   ACE_NEW_MALLOC_RETURN(tmp,
01061                         static_cast<ACE_Message_Block*>(
01062                           mb_allocator_->malloc(sizeof(ACE_Message_Block))),
01063                         ACE_Message_Block(max_marshaled_size,
01064                                           ACE_Message_Block::MB_DATA,
01065                                           data.release(),   //cont
01066                                           0,   //data
01067                                           header_allocator_.get(),   //alloc_strategy
01068                                           0,   //locking_strategy
01069                                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
01070                                           ACE_Time_Value::zero,
01071                                           ACE_Time_Value::max_time,
01072                                           db_allocator_.get(),
01073                                           mb_allocator_.get()),
01074                         DDS::RETCODE_ERROR);
01075   message.reset(tmp);
01076   *message << header_data;
01077   return DDS::RETCODE_OK;
01078 }
01079 
01080 void
01081 ReplayerImpl::lookup_instance_handles(const ReaderIdSeq&       ids,
01082                                       DDS::InstanceHandleSeq & hdls)
01083 {
01084   CORBA::ULong const num_rds = ids.length();
01085 
01086   if (DCPS_debug_level > 9) {
01087     OPENDDS_STRING separator;
01088     OPENDDS_STRING buffer;
01089 
01090     for (CORBA::ULong i = 0; i < num_rds; ++i) {
01091       buffer += separator + OPENDDS_STRING(GuidConverter(ids[i]));
01092       separator = ", ";
01093     }
01094 
01095     ACE_DEBUG((LM_DEBUG,
01096                ACE_TEXT("(%P|%t) ReplayerImpl::lookup_instance_handles: ")
01097                ACE_TEXT("searching for handles for reader Ids: %C.\n"),
01098                buffer.c_str()));
01099   }
01100 
01101   hdls.length(num_rds);
01102 
01103   for (CORBA::ULong i = 0; i < num_rds; ++i) {
01104     hdls[i] = this->participant_servant_->id_to_handle(ids[i]);
01105   }
01106 }
01107 
01108 bool
01109 ReplayerImpl::need_sequence_repair() const
01110 {
01111   for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
01112        end = reader_info_.end(); it != end; ++it) {
01113     if (it->second.expected_sequence_ != sequence_number_) {
01114       return true;
01115     }
01116   }
01117   return false;
01118 }
01119 
01120 DDS::InstanceHandle_t
01121 ReplayerImpl::get_instance_handle()
01122 {
01123   return this->participant_servant_->id_to_handle(publication_id_);
01124 }
01125 
01126 DDS::ReturnCode_t
01127 ReplayerImpl::write_to_reader (DDS::InstanceHandle_t subscription,
01128                                const RawDataSample&  sample )
01129 {
01130   return write(&sample, 1, &subscription);
01131 }
01132 
01133 DDS::ReturnCode_t
01134 ReplayerImpl::write_to_reader (DDS::InstanceHandle_t    subscription,
01135                                const RawDataSampleList& samples )
01136 {
01137   if (samples.size())
01138     return write(&samples[0], static_cast<int>(samples.size()), &subscription);
01139   return DDS::RETCODE_ERROR;
01140 }
01141 
01142 } // namespace DCPS
01143 } // namespace
01144 
01145 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1