RecorderImpl.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "tao/ORB_Core.h"
00010 #include "SubscriptionInstance.h"
00011 #include "ReceivedDataElementList.h"
00012 #include "DomainParticipantImpl.h"
00013 #include "Service_Participant.h"
00014 #include "Qos_Helper.h"
00015 #include "FeatureDisabledQosCheck.h"
00016 #include "GuidConverter.h"
00017 #include "TopicImpl.h"
00018 #include "Serializer.h"
00019 #include "SubscriberImpl.h"
00020 #include "Transient_Kludge.h"
00021 #include "Util.h"
00022 #include "RequestedDeadlineWatchdog.h"
00023 #include "QueryConditionImpl.h"
00024 #include "ReadConditionImpl.h"
00025 #include "MonitorFactory.h"
00026 #include "dds/DCPS/transport/framework/EntryExit.h"
00027 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00028 #include "dds/DdsDcpsCoreC.h"
00029 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00030 #include "dds/DCPS/SafetyProfileStreams.h"
00031 #if !defined (DDS_HAS_MINIMUM_BIT)
00032 #include "BuiltInTopicUtils.h"
00033 #include "dds/DdsDcpsCoreTypeSupportC.h"
00034 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00035 #include "RecorderImpl.h"
00036 #include "PoolAllocator.h"
00037 
00038 #include "ace/Reactor.h"
00039 #include "ace/Auto_Ptr.h"
00040 
00041 #include <stdexcept>
00042 
00043 
00044 namespace OpenDDS {
00045 namespace DCPS {
00046 
00047 RecorderImpl::RecorderImpl()
00048   : qos_(TheServiceParticipant->initial_DataReaderQos()),
00049   participant_servant_(0),
00050   topic_servant_(0),
00051 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00052   is_exclusive_ownership_ (false),
00053 #endif
00054 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00055   owner_manager_ (0),
00056 #endif
00057   subqos_ (TheServiceParticipant->initial_SubscriberQos()),
00058   topic_desc_(0),
00059   listener_mask_(DEFAULT_STATUS_MASK),
00060   domain_id_(0),
00061   remove_association_sweeper_(
00062     new RemoveAssociationSweeper<RecorderImpl>(TheServiceParticipant->reactor(),
00063                                          TheServiceParticipant->reactor_owner(),
00064                                          this)),
00065   is_bit_(false)
00066 {
00067 
00068   requested_incompatible_qos_status_.total_count = 0;
00069   requested_incompatible_qos_status_.total_count_change = 0;
00070   requested_incompatible_qos_status_.last_policy_id = 0;
00071   requested_incompatible_qos_status_.policies.length(0);
00072 
00073   subscription_match_status_.total_count = 0;
00074   subscription_match_status_.total_count_change = 0;
00075   subscription_match_status_.current_count = 0;
00076   subscription_match_status_.current_count_change = 0;
00077   subscription_match_status_.last_publication_handle =
00078     DDS::HANDLE_NIL;
00079 
00080 }
00081 
00082 // This method is called when there are no longer any reference to the
00083 // the servant.
00084 RecorderImpl::~RecorderImpl()
00085 {
00086   DBG_ENTRY_LVL("RecorderImpl","~RecorderImpl",6);
00087   {
00088     ACE_READ_GUARD(ACE_RW_Thread_Mutex,
00089                    read_guard,
00090                    this->writers_lock_);
00091     // Cancel any uncancelled sweeper timers to decrement reference count.
00092     WriterMapType::iterator writer;
00093     for (writer = writers_.begin(); writer != writers_.end(); ++writer) {
00094       remove_association_sweeper_->cancel_timer(writer->second);
00095     }
00096   }
00097 
00098   remove_association_sweeper_->wait();
00099   remove_association_sweeper_->destroy();
00100 }
00101 
00102 
00103 DDS::ReturnCode_t
00104 RecorderImpl::cleanup()
00105 {
00106 
00107   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00108   if (!disco->remove_subscription(this->domain_id_,
00109                                   participant_servant_->get_id(),
00110                                   this->subscription_id_)) {
00111     ACE_ERROR_RETURN((LM_ERROR,
00112                       ACE_TEXT("(%P|%t) ERROR: ")
00113                       ACE_TEXT("RecorderImpl::cleanup: ")
00114                       ACE_TEXT(" could not remove subscription from discovery.\n")),
00115                      DDS::RETCODE_ERROR);
00116   }
00117 
00118   // Call remove association before unregistering the datareader from the transport,
00119   // otherwise some callbacks resulted from remove_association may lost.
00120 
00121   this->remove_all_associations();
00122 
00123   if (topic_servant_) {
00124     topic_servant_->remove_entity_ref();
00125     topic_servant_->_remove_ref();
00126   }
00127   {
00128     ACE_READ_GUARD_RETURN(ACE_RW_Thread_Mutex,
00129                    read_guard,
00130                    this->writers_lock_,
00131                    0);
00132     // Cancel any uncancelled sweeper timers
00133     WriterMapType::iterator writer;
00134     for (writer = writers_.begin(); writer != writers_.end(); ++writer) {
00135       remove_association_sweeper_->cancel_timer(writer->second);
00136     }
00137   }
00138 
00139   remove_association_sweeper_->wait();
00140   return DDS::RETCODE_OK;
00141 }
00142 
00143 void RecorderImpl::init(
00144   TopicDescriptionImpl*      a_topic_desc,
00145   const DDS::DataReaderQos & qos,
00146   RecorderListener_rch       a_listener,
00147   const DDS::StatusMask &    mask,
00148   DomainParticipantImpl*     participant,
00149   DDS::SubscriberQos         subqos)
00150 {
00151   //
00152   if (DCPS_debug_level >= 1) {
00153 
00154     ACE_DEBUG((LM_DEBUG,
00155                ACE_TEXT("(%P|%t) RecorderImpl::init \n")));
00156   }
00157 
00158 
00159   topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc);
00160   if (TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) {
00161     topic_servant_ = a_topic;
00162     topic_servant_->_add_ref();
00163 
00164     topic_servant_->add_entity_ref();
00165   }
00166 
00167   CORBA::String_var topic_name = a_topic_desc->get_name();
00168 
00169 #if !defined (DDS_HAS_MINIMUM_BIT)
00170   is_bit_ = ACE_OS::strcmp(topic_name.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0
00171             || ACE_OS::strcmp(topic_name.in(), BUILT_IN_TOPIC_TOPIC) == 0
00172             || ACE_OS::strcmp(topic_name.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0
00173             || ACE_OS::strcmp(topic_name.in(), BUILT_IN_PUBLICATION_TOPIC) == 0;
00174 #endif   // !defined (DDS_HAS_MINIMUM_BIT)
00175 
00176   qos_ = qos;
00177 
00178 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00179   is_exclusive_ownership_ = this->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS;
00180 #endif
00181 
00182   listener_ = a_listener;
00183   listener_mask_ = mask;
00184 
00185   // Only store the participant pointer, since it is our "grand"
00186   // parent, we will exist as long as it does
00187   participant_servant_ = participant;
00188 
00189 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00190   if (is_exclusive_ownership_) {
00191     owner_manager_ = participant_servant_->ownership_manager ();
00192   }
00193 #endif
00194 
00195   domain_id_ = participant_servant_->get_domain_id();
00196   subqos_ = subqos;
00197 }
00198 
00199 bool RecorderImpl::check_transport_qos(const TransportInst& ti)
00200 {
00201   if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
00202     return ti.is_reliable();
00203   }
00204   return true;
00205 }
00206 
00207 const RepoId& RecorderImpl::get_repo_id() const
00208 {
00209   return this->subscription_id_;
00210 }
00211 
00212 CORBA::Long RecorderImpl::get_priority_value(const AssociationData& data) const
00213 {
00214   return data.publication_transport_priority_;
00215 }
00216 
00217 
00218 void RecorderImpl::data_received(const ReceivedDataSample& sample)
00219 {
00220   DBG_ENTRY_LVL("RecorderImpl","data_received",6);
00221 
00222   // ensure some other thread is not changing the sample container
00223   // or statuses related to samples.
00224   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00225 
00226   if (DCPS_debug_level > 9) {
00227     GuidConverter converter(subscription_id_);
00228     ACE_DEBUG((LM_DEBUG,
00229                ACE_TEXT("(%P|%t) RecorderImpl::data_received: ")
00230                ACE_TEXT("%C received sample: %C.\n"),
00231                OPENDDS_STRING(converter).c_str(),
00232                to_string(sample.header_).c_str()));
00233   }
00234 
00235   // we only support SAMPLE_DATA messages
00236   if (sample.header_.message_id_ != SAMPLE_DATA)
00237     return;
00238 
00239   RawDataSample rawSample(static_cast<MessageId> (sample.header_.message_id_),
00240                           sample.header_.source_timestamp_sec_,
00241                           sample.header_.source_timestamp_nanosec_,
00242                           sample.header_.publication_id_,
00243                           sample.header_.byte_order_,
00244                           sample.sample_);
00245 
00246   if (listener_.in()) {
00247     listener_->on_sample_data_received(this, rawSample);
00248   }
00249 
00250 }
00251 
00252 void RecorderImpl::notify_subscription_disconnected(const WriterIdSeq&)
00253 {
00254 }
00255 
00256 void RecorderImpl::notify_subscription_reconnected(const WriterIdSeq&)
00257 {
00258 
00259 }
00260 
00261 void
00262 RecorderImpl::notify_subscription_lost(const DDS::InstanceHandleSeq&)
00263 {
00264 }
00265 
00266 void RecorderImpl::notify_subscription_lost(const WriterIdSeq&)
00267 {
00268 
00269 }
00270 
00271 void RecorderImpl::notify_connection_deleted(const RepoId&)
00272 {
00273 
00274 }
00275 
00276 
00277 
00278 void
00279 RecorderImpl::add_association(const RepoId&            yourId,
00280                               const WriterAssociation& writer,
00281                               bool                     active)
00282 {
00283     ACE_DEBUG((LM_DEBUG, "RecorderImpl::add_association\n"));
00284   //
00285   // The following block is for diagnostic purposes only.
00286   //
00287   if (DCPS_debug_level >= 1) {
00288     GuidConverter reader_converter(yourId);
00289     GuidConverter writer_converter(writer.writerId);
00290     ACE_DEBUG((LM_DEBUG,
00291                ACE_TEXT("(%P|%t) RecorderImpl::add_association - ")
00292                ACE_TEXT("bit %d local %C remote %C\n"),
00293                is_bit_,
00294                OPENDDS_STRING(reader_converter).c_str(),
00295                OPENDDS_STRING(writer_converter).c_str()));
00296   }
00297 
00298   //
00299   // This block prevents adding associations to deleted readers.
00300   // Presumably this is a "good thing(tm)".
00301   //
00302   // if (entity_deleted_ == true) {
00303   //   if (DCPS_debug_level >= 1)
00304   //     ACE_DEBUG((LM_DEBUG,
00305   //                ACE_TEXT("(%P|%t) RecorderImpl::add_association")
00306   //                ACE_TEXT(" This is a deleted datareader, ignoring add.\n")));
00307   //
00308   //   return;
00309   // }
00310 
00311   //
00312   // We are being called back from the repository before we are done
00313   // processing after our call to the repository that caused this call
00314   // (from the repository) to be made.
00315   //
00316   if (GUID_UNKNOWN == subscription_id_) {
00317     // add_associations was invoked before DCSPInfoRepo::add_subscription() returned.
00318     subscription_id_ = yourId;
00319   }
00320 
00321   //
00322   // We do the following while holding the publication_handle_lock_.
00323   //
00324   {
00325     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00326 
00327     //
00328     // For each writer in the list of writers to associate with, we
00329     // create a WriterInfo and a WriterStats object and store them in
00330     // our internal maps.
00331     //
00332     {
00333       ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00334 
00335       const PublicationId& writer_id = writer.writerId;
00336       RcHandle<WriterInfo> info = new WriterInfo(this, writer_id, writer.writerQos);
00337       /*std::pair<WriterMapType::iterator, bool> bpair =*/
00338       this->writers_.insert(
00339         // This insertion is idempotent.
00340         WriterMapType::value_type(
00341           writer_id,
00342           info));
00343       // this->statistics_.insert(
00344       //   StatsMapType::value_type(
00345       //     writer_id,
00346       //     WriterStats(
00347       //       this->raw_latency_buffer_size_,
00348       //       this->raw_latency_buffer_type_)));
00349 
00350       // if (DCPS_debug_level > 4) {
00351       //   GuidConverter converter(writer_id);
00352       //   ACE_DEBUG((LM_DEBUG,
00353       //              "(%P|%t) RecorderImpl::add_association: "
00354       //              "inserted writer %C.return %d \n",
00355       //              OPENDDS_STRING(converter).c_str(), bpair.second));
00356       //
00357       //   WriterMapType::iterator iter = writers_.find(writer_id);
00358       //   if (iter != writers_.end()) {
00359       //     // This may not be an error since it could happen that the sample
00360       //     // is delivered to the datareader after the write is dis-associated
00361       //     // with this datareader.
00362       //     GuidConverter reader_converter(subscription_id_);
00363       //     GuidConverter writer_converter(writer_id);
00364       //     ACE_DEBUG((LM_DEBUG,
00365       //               ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
00366       //               ACE_TEXT("reader %C is associated with writer %C.\n"),
00367       //               OPENDDS_STRING(reader_converter).c_str(),
00368       //               OPENDDS_STRING(writer_converter).c_str()));
00369       //   }
00370       // }
00371     }
00372 
00373     //
00374     // Propagate the add_associations processing down into the Transport
00375     // layer here.  This will establish the transport support and reserve
00376     // usage of an existing connection or initiate creation of a new
00377     // connection if no suitable connection is available.
00378     //
00379     AssociationData data;
00380     data.remote_id_ = writer.writerId;
00381     data.remote_data_ = writer.writerTransInfo;
00382     data.publication_transport_priority_ =
00383       writer.writerQos.transport_priority.value;
00384     data.remote_reliable_ =
00385       (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00386     data.remote_durable_ =
00387       (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00388 
00389     if (!this->associate(data, active)) {
00390       if (DCPS_debug_level) {
00391         ACE_DEBUG((LM_ERROR,
00392                    ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
00393                    ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00394       }
00395       return;
00396     }
00397 
00398     // Check if any publications have already sent a REQUEST_ACK message.
00399     // {
00400     //   ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00401     //
00402     //   WriterMapType::iterator where = this->writers_.find(writer.writerId);
00403     //
00404     //   if (where != this->writers_.end()) {
00405     //     const ACE_Time_Value now = ACE_OS::gettimeofday();
00406     //
00407     //     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00408     //
00409     //     if (where->second->should_ack(now)) {
00410     //       const SequenceNumber sequence = where->second->ack_sequence();
00411     //       const DDS::Time_t timenow = time_value_to_time(now);
00412     //       if (this->send_sample_ack(writer.writerId, sequence, timenow)) {
00413     //         where->second->clear_acks(sequence);
00414     //       }
00415     //     }
00416     //   }
00417     // }
00418 
00419     //
00420     // LIVELINESS policy timers are managed here.
00421     //
00422     // if (liveliness_lease_duration_ != ACE_Time_Value::zero) {
00423     //   // this call will start the timer if it is not already set
00424     //   const ACE_Time_Value now = ACE_OS::gettimeofday();
00425     //
00426     //   if (DCPS_debug_level >= 5) {
00427     //     GuidConverter converter(subscription_id_);
00428     //     ACE_DEBUG((LM_DEBUG,
00429     //                ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
00430     //                ACE_TEXT("starting/resetting liveliness timer for reader %C\n"),
00431     //                OPENDDS_STRING(converter).c_str()));
00432     //   }
00433     //
00434     //   this->handle_timeout(now, this);
00435     // }
00436 
00437     // else - no timer needed when LIVELINESS.lease_duration is INFINITE
00438 
00439   }
00440   //
00441   // We no longer hold the publication_handle_lock_.
00442   //
00443 
00444   //
00445   // We only do the following processing for readers that are *not*
00446   // readers of Builtin Topics.
00447   //
00448   if (!is_bit_) {
00449 
00450     DDS::InstanceHandle_t handle =
00451       this->participant_servant_->id_to_handle(writer.writerId);
00452 
00453     //
00454     // We acquire the publication_handle_lock_ for the remainder of our
00455     // processing.
00456     //
00457     {
00458       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00459 
00460       // This insertion is idempotent.
00461       this->id_to_handle_map_.insert(
00462         RepoIdToHandleMap::value_type(writer.writerId, handle));
00463 
00464       if (DCPS_debug_level > 4) {
00465         GuidConverter converter(writer.writerId);
00466         ACE_DEBUG((LM_DEBUG,
00467                    ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
00468                    ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"),
00469                    OPENDDS_STRING(converter).c_str(),
00470                    handle));
00471       }
00472 
00473       // We need to adjust these after the insertions have all completed
00474       // since insertions are not guaranteed to increase the number of
00475       // currently matched publications.
00476       int matchedPublications = static_cast<int>(this->id_to_handle_map_.size());
00477       this->subscription_match_status_.current_count_change
00478         = matchedPublications - this->subscription_match_status_.current_count;
00479       this->subscription_match_status_.current_count = matchedPublications;
00480 
00481       ++this->subscription_match_status_.total_count;
00482       ++this->subscription_match_status_.total_count_change;
00483 
00484       this->subscription_match_status_.last_publication_handle = handle;
00485 
00486       // set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
00487 
00488 
00489       if (listener_.in()) {
00490         listener_->on_recorder_matched(
00491           this,
00492           this->subscription_match_status_);
00493 
00494         // TBD - why does the spec say to change this but not change
00495         //       the ChangeFlagStatus after a listener call?
00496 
00497         // Client will look at it so next time it looks the change should be 0
00498         this->subscription_match_status_.total_count_change = 0;
00499         this->subscription_match_status_.current_count_change = 0;
00500       }
00501 
00502       // notify_status_condition();
00503     }
00504 
00505     {
00506       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00507 
00508       this->writers_[writer.writerId]->handle_ = handle;
00509     }
00510   }
00511 
00512   if (!active) {
00513     Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00514     disco->association_complete(this->domain_id_,
00515                                 this->participant_servant_->get_id(),
00516                                 this->subscription_id_, writer.writerId);
00517   }
00518 
00519   // if (this->monitor_) {
00520   //   this->monitor_->report();
00521   // }
00522 }
00523 
00524 void
00525 RecorderImpl::association_complete(const RepoId& /*remote_id*/)
00526 {
00527   // For the current DCPSInfoRepo implementation, the DataReader side will
00528   // always be passive, so association_complete() will not be called.
00529 }
00530 
00531 void
00532 RecorderImpl::remove_associations(const WriterIdSeq& writers,
00533                                   bool               notify_lost)
00534 {
00535   DBG_ENTRY_LVL("RecorderImpl", "remove_associations", 6);
00536   if (writers.length() == 0) {
00537     return;
00538   }
00539 
00540   if (DCPS_debug_level >= 1) {
00541     GuidConverter reader_converter(subscription_id_);
00542     GuidConverter writer_converter(writers[0]);
00543     ACE_DEBUG((LM_DEBUG,
00544                ACE_TEXT("(%P|%t) RecorderImpl::remove_associations: ")
00545                ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00546                is_bit_,
00547                OPENDDS_STRING(reader_converter).c_str(),
00548                OPENDDS_STRING(writer_converter).c_str(),
00549                writers.length()));
00550   }
00551   if (!this->entity_deleted_.value()) {
00552     // stop pending associations for these writer ids
00553     this->stop_associating(writers.get_buffer(), writers.length());
00554 
00555     // writers which are considered non-active and can
00556     // be removed immediately
00557     WriterIdSeq non_active_writers;
00558     {
00559       CORBA::ULong wr_len = writers.length();
00560       ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00561 
00562       for (CORBA::ULong i = 0; i < wr_len; i++) {
00563         PublicationId writer_id = writers[i];
00564 
00565         WriterMapType::iterator it = this->writers_.find(writer_id);
00566         if (it != this->writers_.end() &&
00567             it->second->active(TheServiceParticipant->pending_timeout())) {
00568           remove_association_sweeper_->schedule_timer(it->second, notify_lost);
00569         } else {
00570           push_back(non_active_writers, writer_id);
00571         }
00572       }
00573     }
00574     remove_associations_i(non_active_writers, notify_lost);
00575   } else {
00576     remove_associations_i(writers, notify_lost);
00577   }
00578 }
00579 
00580 void
00581 RecorderImpl::remove_or_reschedule(const PublicationId& pub_id)
00582 {
00583   ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00584   WriterMapType::iterator where = writers_.find(pub_id);
00585   if (writers_.end() != where) {
00586     WriterInfo& info = *where->second;
00587     WriterIdSeq writers;
00588     push_back(writers, pub_id);
00589     bool notify = info.notify_lost_;
00590     if (info.removal_deadline_ < ACE_OS::gettimeofday()) {
00591       write_guard.release();
00592       remove_associations_i(writers, notify);
00593     } else {
00594       write_guard.release();
00595       remove_associations(writers, notify);
00596     }
00597   }
00598 }
00599 
00600 void
00601 RecorderImpl::remove_associations_i(const WriterIdSeq& writers,
00602     bool notify_lost)
00603 {
00604   DBG_ENTRY_LVL("RecorderImpl", "remove_associations_i", 6);
00605 
00606   if (writers.length() == 0) {
00607     return;
00608   }
00609 
00610   if (DCPS_debug_level >= 1) {
00611     GuidConverter reader_converter(subscription_id_);
00612     GuidConverter writer_converter(writers[0]);
00613     ACE_DEBUG((LM_DEBUG,
00614                ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
00615                ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00616                is_bit_,
00617                OPENDDS_STRING(reader_converter).c_str(),
00618                OPENDDS_STRING(writer_converter).c_str(),
00619                writers.length()));
00620   }
00621   DDS::InstanceHandleSeq handles;
00622 
00623   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00624 
00625   // This is used to hold the list of writers which were actually
00626   // removed, which is a proper subset of the writers which were
00627   // requested to be removed.
00628   WriterIdSeq updated_writers;
00629 
00630   CORBA::ULong wr_len;
00631 
00632   //Remove the writers from writer list. If the supplied writer
00633   //is not in the cached writers list then it is already removed.
00634   //We just need remove the writers in the list that have not been
00635   //removed.
00636   {
00637     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00638 
00639     wr_len = writers.length();
00640 
00641     for (CORBA::ULong i = 0; i < wr_len; i++) {
00642       PublicationId writer_id = writers[i];
00643 
00644       WriterMapType::iterator it = this->writers_.find(writer_id);
00645 
00646       if (it != this->writers_.end()) {
00647         it->second->removed();
00648         remove_association_sweeper_->cancel_timer(it->second);
00649       }
00650 
00651       if (this->writers_.erase(writer_id) == 0) {
00652         if (DCPS_debug_level >= 1) {
00653           GuidConverter converter(writer_id);
00654           ACE_DEBUG((LM_DEBUG,
00655                      ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
00656                      ACE_TEXT("the writer local %C was already removed.\n"),
00657                      OPENDDS_STRING(converter).c_str()));
00658         }
00659 
00660       } else {
00661         push_back(updated_writers, writer_id);
00662       }
00663     }
00664   }
00665 
00666   wr_len = updated_writers.length();
00667 
00668   // Return now if the supplied writers have been removed already.
00669   if (wr_len == 0) {
00670     return;
00671   }
00672 
00673   if (!is_bit_) {
00674     // The writer should be in the id_to_handle map at this time.  Note
00675     // it if it not there.
00676     if (this->lookup_instance_handles(updated_writers, handles) == false) {
00677       if (DCPS_debug_level > 4) {
00678         ACE_DEBUG((LM_DEBUG,
00679                    ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
00680                    ACE_TEXT("lookup_instance_handles failed.\n")));
00681       }
00682     }
00683     for (CORBA::ULong i = 0; i < wr_len; ++i) {
00684       id_to_handle_map_.erase(updated_writers[i]);
00685     }
00686   }
00687   for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
00688     this->disassociate(updated_writers[i]);
00689   }
00690 
00691   // Mirror the add_associations SUBSCRIPTION_MATCHED_STATUS processing.
00692   if (!this->is_bit_) {
00693     // Derive the change in the number of publications writing to this reader.
00694     int matchedPublications = static_cast<int>(this->id_to_handle_map_.size());
00695     this->subscription_match_status_.current_count_change
00696       = matchedPublications - this->subscription_match_status_.current_count;
00697 
00698     // Only process status if the number of publications has changed.
00699     if (this->subscription_match_status_.current_count_change != 0) {
00700       this->subscription_match_status_.current_count = matchedPublications;
00701       /// Section 7.1.4.1: total_count will not decrement.
00702 
00703       /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
00704       this->subscription_match_status_.last_publication_handle
00705         = handles[ wr_len - 1];
00706 
00707       // set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
00708 
00709       // DDS::DataReaderListener_var listener
00710       // = listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00711 
00712       if (listener_.in()) {
00713         listener_->on_recorder_matched(
00714           this,
00715           this->subscription_match_status_);
00716 
00717         // Client will look at it so next time it looks the change should be 0
00718         this->subscription_match_status_.total_count_change = 0;
00719         this->subscription_match_status_.current_count_change = 0;
00720       }
00721 
00722       // notify_status_condition();
00723     }
00724   }
00725 
00726   // If this remove_association is invoked when the InfoRepo
00727   // detects a lost writer then make a callback to notify
00728   // subscription lost.
00729   if (notify_lost) {
00730     this->notify_subscription_lost(handles);
00731   }
00732 
00733   // if (this->monitor_) {
00734   //   this->monitor_->report();
00735   // }
00736 }
00737 
00738 void
00739 RecorderImpl::remove_all_associations()
00740 {
00741   DBG_ENTRY_LVL("RecorderImpl","remove_all_associations",6);
00742 
00743   OpenDDS::DCPS::WriterIdSeq writers;
00744   int size;
00745 
00746   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00747 
00748   {
00749     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00750 
00751     size = static_cast<int>(writers_.size());
00752     writers.length(size);
00753 
00754     WriterMapType::iterator curr_writer = writers_.begin();
00755     WriterMapType::iterator end_writer = writers_.end();
00756 
00757     int i = 0;
00758 
00759     while (curr_writer != end_writer) {
00760       writers[i++] = curr_writer->first;
00761       ++curr_writer;
00762     }
00763   }
00764 
00765   try {
00766     CORBA::Boolean dont_notify_lost = 0;
00767 
00768     if (0 < size) {
00769       remove_associations(writers, dont_notify_lost);
00770     }
00771 
00772   } catch (const CORBA::Exception&) {
00773   }
00774 }
00775 
00776 void
00777 RecorderImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
00778 {
00779 
00780 
00781   ACE_GUARD(ACE_Recursive_Thread_Mutex,
00782             guard,
00783             this->publication_handle_lock_);
00784 
00785   if (this->requested_incompatible_qos_status_.total_count == status.total_count) {
00786     // This test should make the method idempotent.
00787     return;
00788   }
00789 
00790   // set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
00791   //                         true);
00792 
00793   // copy status and increment change
00794   requested_incompatible_qos_status_.total_count = status.total_count;
00795   requested_incompatible_qos_status_.total_count_change +=
00796     status.count_since_last_send;
00797   requested_incompatible_qos_status_.last_policy_id =
00798     status.last_policy_id;
00799   requested_incompatible_qos_status_.policies = status.policies;
00800 
00801   // if (!CORBA::is_nil(listener.in())) {
00802   //   listener->on_requested_incompatible_qos(dr_local_objref_.in(),
00803   //                                           requested_incompatible_qos_status_);
00804   //
00805   //   // TBD - why does the spec say to change total_count_change but not
00806   //   // change the ChangeFlagStatus after a listener call?
00807   //
00808   //   // client just looked at it so next time it looks the
00809   //   // change should be 0
00810   //   requested_incompatible_qos_status_.total_count_change = 0;
00811   // }
00812   //
00813   // notify_status_condition();
00814 }
00815 
00816 void
00817 RecorderImpl::inconsistent_topic()
00818 {
00819   topic_servant_->inconsistent_topic();
00820 }
00821 
00822 void
00823 RecorderImpl::signal_liveliness(const RepoId& remote_participant)
00824 {
00825   RepoId prefix = remote_participant;
00826   prefix.entityId = EntityId_t();
00827 
00828   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00829 
00830   typedef std::pair<RepoId, RcHandle<WriterInfo> > WriterSetElement;
00831   typedef OPENDDS_VECTOR(WriterSetElement) WriterSet;
00832   WriterSet writers;
00833 
00834   {
00835     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00836     for (WriterMapType::iterator pos = writers_.lower_bound(prefix),
00837            limit = writers_.end();
00838          pos != limit && GuidPrefixEqual() (pos->first.guidPrefix, prefix.guidPrefix);
00839          ++pos) {
00840       writers.push_back(std::make_pair(pos->first, pos->second));
00841     }
00842   }
00843 
00844   ACE_Time_Value when = ACE_OS::gettimeofday();
00845   for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
00846        pos != limit;
00847        ++pos) {
00848     pos->second->received_activity(when);
00849   }
00850 }
00851 
00852 DDS::ReturnCode_t RecorderImpl::set_qos(
00853   const DDS::SubscriberQos & subscriber_qos,
00854   const DDS::DataReaderQos & qos)
00855 {
00856 
00857   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(subscriber_qos, DDS::RETCODE_UNSUPPORTED);
00858 
00859   if (Qos_Helper::valid(subscriber_qos) && Qos_Helper::consistent(subscriber_qos)) {
00860     if (subqos_ != subscriber_qos) {
00861       // for the not changeable qos, it can be changed before enable
00862       if (!Qos_Helper::changeable(subqos_, subscriber_qos) && enabled_ == true) {
00863         return DDS::RETCODE_IMMUTABLE_POLICY;
00864 
00865       } else {
00866         subqos_ = subscriber_qos;
00867       }
00868     }
00869   } else {
00870     return DDS::RETCODE_INCONSISTENT_POLICY;
00871   }
00872 
00873   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00874   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00875   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00876 
00877   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00878     if (qos_ == qos)
00879       return DDS::RETCODE_OK;
00880 
00881     if (!Qos_Helper::changeable(qos_, qos) && this->is_enabled()) {
00882       return DDS::RETCODE_IMMUTABLE_POLICY;
00883 
00884     } else {
00885       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00886       const bool status =
00887         disco->update_subscription_qos(
00888           this->participant_servant_->get_domain_id(),
00889           this->participant_servant_->get_id(),
00890           this->subscription_id_,
00891           qos,
00892           subscriber_qos);
00893       if (!status) {
00894         ACE_ERROR_RETURN((LM_ERROR,
00895                           ACE_TEXT("(%P|%t) RecorderImpl::set_qos, ")
00896                           ACE_TEXT("qos not updated. \n")),
00897                          DDS::RETCODE_ERROR);
00898       }
00899     }
00900 
00901     qos_ = qos;
00902     subqos_ = subscriber_qos;
00903 
00904     return DDS::RETCODE_OK;
00905 
00906   } else {
00907     return DDS::RETCODE_INCONSISTENT_POLICY;
00908   }
00909 }
00910 
00911 DDS::ReturnCode_t
00912 RecorderImpl::get_qos(
00913   DDS::SubscriberQos & subscriber_qos,
00914   DDS::DataReaderQos & qos)
00915 {
00916   qos = qos_;
00917   subscriber_qos = subqos_;
00918   return DDS::RETCODE_OK;
00919 }
00920 
00921 DDS::ReturnCode_t
00922 RecorderImpl::set_listener(const RecorderListener_rch& a_listener,
00923                            DDS::StatusMask             mask)
00924 {
00925   listener_mask_ = mask;
00926   //note: OK to duplicate  a nil object ref
00927   listener_ = a_listener;
00928   return DDS::RETCODE_OK;
00929 }
00930 
00931 RecorderListener_rch
00932 RecorderImpl::get_listener()
00933 {
00934   return listener_;
00935 }
00936 
00937 bool
00938 RecorderImpl::lookup_instance_handles(const WriterIdSeq&       ids,
00939                                       DDS::InstanceHandleSeq & hdls)
00940 {
00941   if (DCPS_debug_level > 9) {
00942     CORBA::ULong const size = ids.length();
00943     OPENDDS_STRING separator = "";
00944     OPENDDS_STRING buffer;
00945 
00946     for (unsigned long i = 0; i < size; ++i) {
00947       buffer += separator + OPENDDS_STRING(GuidConverter(ids[i]));
00948       separator = ", ";
00949     }
00950 
00951     ACE_DEBUG((LM_DEBUG,
00952                ACE_TEXT("(%P|%t) RecorderImpl::lookup_instance_handles: ")
00953                ACE_TEXT("searching for handles for writer Ids: %C.\n"),
00954                buffer.c_str()));
00955   }
00956 
00957   CORBA::ULong const num_wrts = ids.length();
00958   hdls.length(num_wrts);
00959 
00960   for (CORBA::ULong i = 0; i < num_wrts; ++i) {
00961     hdls[i] = this->participant_servant_->id_to_handle(ids[i]);
00962   }
00963 
00964   return true;
00965 }
00966 
00967 DDS::ReturnCode_t
00968 RecorderImpl::enable()
00969 {
00970   if (DCPS_debug_level >= 1) {
00971 
00972     ACE_DEBUG((LM_DEBUG,
00973                ACE_TEXT("(%P|%t) RecorderImpl::enable\n")));
00974   }
00975   //According spec:
00976   // - Calling enable on an already enabled Entity returns OK and has no
00977   // effect.
00978   // - Calling enable on an Entity whose factory is not enabled will fail
00979   // and return PRECONDITION_NOT_MET.
00980 
00981   if (this->is_enabled()) {
00982     return DDS::RETCODE_OK;
00983   }
00984 
00985   this->set_enabled();
00986 
00987   // if (topic_servant_ && !transport_disabled_) {
00988   if (topic_servant_) {
00989 
00990     ACE_DEBUG((LM_DEBUG,
00991                ACE_TEXT("(%P|%t) RecorderImpl::enable_transport\n")));
00992 
00993     try {
00994       this->enable_transport(this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS,
00995                              this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00996     } catch (const Transport::Exception&) {
00997       ACE_ERROR((LM_ERROR,
00998                  ACE_TEXT("(%P|%t) ERROR: RecorderImpl::enable, ")
00999                  ACE_TEXT("Transport Exception.\n")));
01000       return DDS::RETCODE_ERROR;
01001 
01002     }
01003 
01004     const TransportLocatorSeq& trans_conf_info = this->connection_info();
01005 
01006     CORBA::String_var filterClassName = "";
01007     CORBA::String_var filterExpression = "";
01008     DDS::StringSeq exprParams;
01009 
01010 
01011     Discovery_rch disco =
01012       TheServiceParticipant->get_discovery(this->domain_id_);
01013 
01014     ACE_DEBUG((LM_DEBUG,
01015                ACE_TEXT("(%P|%t) RecorderImpl::add_subscription\n")));
01016 
01017     this->subscription_id_ =
01018       disco->add_subscription(this->domain_id_,
01019                               this->participant_servant_->get_id(),
01020                               this->topic_servant_->get_id(),
01021                               this,
01022                               this->qos_,
01023                               trans_conf_info,
01024                               this->subqos_,
01025                               filterClassName,
01026                               filterExpression,
01027                               exprParams);
01028 
01029     if (this->subscription_id_ == OpenDDS::DCPS::GUID_UNKNOWN) {
01030       ACE_ERROR((LM_ERROR,
01031                  ACE_TEXT("(%P|%t) ERROR: RecorderImpl::enable, ")
01032                  ACE_TEXT("add_subscription returned invalid id.\n")));
01033       return DDS::RETCODE_ERROR;
01034     }
01035   }
01036 
01037   if (topic_servant_) {
01038     const CORBA::String_var name = topic_servant_->get_name();
01039     DDS::ReturnCode_t return_value = DDS::RETCODE_OK;
01040     //   this->participant_servant_->recorder_enabled(name.in(), this);
01041 
01042     return return_value;
01043   } else {
01044     return DDS::RETCODE_OK;
01045   }
01046 }
01047 
01048 DDS::InstanceHandle_t
01049 RecorderImpl::get_instance_handle()
01050 {
01051   return this->participant_servant_->id_to_handle(subscription_id_);
01052 }
01053 
01054 void
01055 RecorderImpl::register_for_writer(const RepoId& participant,
01056                                   const RepoId& readerid,
01057                                   const RepoId& writerid,
01058                                   const TransportLocatorSeq& locators,
01059                                   DiscoveryListener* listener)
01060 {
01061   TransportClient::register_for_writer(participant, readerid, writerid, locators, listener);
01062 }
01063 
01064 void
01065 RecorderImpl::unregister_for_writer(const RepoId& participant,
01066                                     const RepoId& readerid,
01067                                     const RepoId& writerid)
01068 {
01069   TransportClient::unregister_for_writer(participant, readerid, writerid);
01070 }
01071 
01072 #if !defined (DDS_HAS_MINIMUM_BIT)
01073 DDS::ReturnCode_t
01074 RecorderImpl::repoid_to_bit_key(const DCPS::RepoId&     id,
01075                                 DDS::BuiltinTopicKey_t& key)
01076 {
01077   DDS::InstanceHandle_t publication_handle = this->participant_servant_->id_to_handle(id);
01078 
01079   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01080                    guard,
01081                    this->publication_handle_lock_,
01082                    DDS::RETCODE_ERROR);
01083 
01084   BIT_Helper_1 < DDS::PublicationBuiltinTopicDataDataReader,
01085                  DDS::PublicationBuiltinTopicDataDataReader_var,
01086                  DDS::PublicationBuiltinTopicDataSeq > hh;
01087 
01088   DDS::PublicationBuiltinTopicDataSeq data;
01089 
01090   DDS::ReturnCode_t ret
01091     = hh.instance_handle_to_bit_data(participant_servant_,
01092                                      BUILT_IN_PUBLICATION_TOPIC,
01093                                      publication_handle,
01094                                      data);
01095 
01096   if (ret == DDS::RETCODE_OK) {
01097     key = data[0].key;
01098   }
01099 
01100   return ret;
01101 }
01102 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01103 
01104 } // namespace DCPS
01105 } // namespace

Generated on Fri Feb 12 20:05:25 2016 for OpenDDS by  doxygen 1.4.7