RecorderImpl.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "tao/ORB_Core.h"
00010 #include "SubscriptionInstance.h"
00011 #include "ReceivedDataElementList.h"
00012 #include "DomainParticipantImpl.h"
00013 #include "Service_Participant.h"
00014 #include "Qos_Helper.h"
00015 #include "FeatureDisabledQosCheck.h"
00016 #include "GuidConverter.h"
00017 #include "TopicImpl.h"
00018 #include "Serializer.h"
00019 #include "SubscriberImpl.h"
00020 #include "Transient_Kludge.h"
00021 #include "Util.h"
00022 #include "RequestedDeadlineWatchdog.h"
00023 #include "QueryConditionImpl.h"
00024 #include "ReadConditionImpl.h"
00025 #include "MonitorFactory.h"
00026 #include "dds/DCPS/transport/framework/EntryExit.h"
00027 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00028 #include "dds/DdsDcpsCoreC.h"
00029 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00030 #include "dds/DCPS/SafetyProfileStreams.h"
00031 #if !defined (DDS_HAS_MINIMUM_BIT)
00032 #include "BuiltInTopicUtils.h"
00033 #include "dds/DdsDcpsCoreTypeSupportC.h"
00034 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00035 #include "RecorderImpl.h"
00036 #include "PoolAllocator.h"
00037 
00038 #include "ace/Reactor.h"
00039 #include "ace/Auto_Ptr.h"
00040 #include "ace/Condition_Recursive_Thread_Mutex.h"
00041 
00042 #include <stdexcept>
00043 
00044 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00045 
00046 namespace OpenDDS {
00047 namespace DCPS {
00048 
00049 RecorderImpl::RecorderImpl()
00050   : qos_(TheServiceParticipant->initial_DataReaderQos()),
00051   participant_servant_(0),
00052   topic_servant_(0),
00053 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00054   is_exclusive_ownership_ (false),
00055 #endif
00056 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00057   owner_manager_ (0),
00058 #endif
00059   subqos_ (TheServiceParticipant->initial_SubscriberQos()),
00060   topic_desc_(0),
00061   listener_mask_(DEFAULT_STATUS_MASK),
00062   domain_id_(0),
00063   remove_association_sweeper_(
00064     make_rch<RemoveAssociationSweeper<RecorderImpl> >(TheServiceParticipant->reactor(),
00065                                          TheServiceParticipant->reactor_owner(),
00066                                          this)),
00067   is_bit_(false)
00068 {
00069 
00070   requested_incompatible_qos_status_.total_count = 0;
00071   requested_incompatible_qos_status_.total_count_change = 0;
00072   requested_incompatible_qos_status_.last_policy_id = 0;
00073   requested_incompatible_qos_status_.policies.length(0);
00074 
00075   subscription_match_status_.total_count = 0;
00076   subscription_match_status_.total_count_change = 0;
00077   subscription_match_status_.current_count = 0;
00078   subscription_match_status_.current_count_change = 0;
00079   subscription_match_status_.last_publication_handle =
00080     DDS::HANDLE_NIL;
00081 
00082 }
00083 
00084 // This method is called when there are no longer any reference to the
00085 // the servant.
00086 RecorderImpl::~RecorderImpl()
00087 {
00088   DBG_ENTRY_LVL("RecorderImpl","~RecorderImpl",6);
00089   {
00090     ACE_READ_GUARD(ACE_RW_Thread_Mutex,
00091                    read_guard,
00092                    this->writers_lock_);
00093     // Cancel any uncancelled sweeper timers to decrement reference count.
00094     WriterMapType::iterator writer;
00095     for (writer = writers_.begin(); writer != writers_.end(); ++writer) {
00096       remove_association_sweeper_->cancel_timer(writer->second);
00097     }
00098   }
00099 
00100   remove_association_sweeper_->wait();
00101 }
00102 
00103 
00104 DDS::ReturnCode_t
00105 RecorderImpl::cleanup()
00106 {
00107 
00108   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00109   if (!disco->remove_subscription(this->domain_id_,
00110                                   participant_servant_->get_id(),
00111                                   this->subscription_id_)) {
00112     ACE_ERROR_RETURN((LM_ERROR,
00113                       ACE_TEXT("(%P|%t) ERROR: ")
00114                       ACE_TEXT("RecorderImpl::cleanup: ")
00115                       ACE_TEXT(" could not remove subscription from discovery.\n")),
00116                      DDS::RETCODE_ERROR);
00117   }
00118 
00119   // Call remove association before unregistering the datareader from the transport,
00120   // otherwise some callbacks resulted from remove_association may lost.
00121 
00122   this->remove_all_associations();
00123 
00124   {
00125     ACE_READ_GUARD_RETURN(ACE_RW_Thread_Mutex,
00126                    read_guard,
00127                    this->writers_lock_,
00128                    0);
00129     // Cancel any uncancelled sweeper timers
00130     WriterMapType::iterator writer;
00131     for (writer = writers_.begin(); writer != writers_.end(); ++writer) {
00132       remove_association_sweeper_->cancel_timer(writer->second);
00133     }
00134   }
00135 
00136   remove_association_sweeper_->wait();
00137   return DDS::RETCODE_OK;
00138 }
00139 
00140 void RecorderImpl::init(
00141   TopicDescriptionImpl*      a_topic_desc,
00142   const DDS::DataReaderQos & qos,
00143   RecorderListener_rch       a_listener,
00144   const DDS::StatusMask &    mask,
00145   DomainParticipantImpl*     participant,
00146   DDS::SubscriberQos         subqos)
00147 {
00148   //
00149   if (DCPS_debug_level >= 1) {
00150 
00151     ACE_DEBUG((LM_DEBUG,
00152                ACE_TEXT("(%P|%t) RecorderImpl::init \n")));
00153   }
00154 
00155 
00156   topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc);
00157   if (TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) {
00158     topic_servant_ = a_topic;
00159   }
00160 
00161   CORBA::String_var topic_name = a_topic_desc->get_name();
00162 
00163 #if !defined (DDS_HAS_MINIMUM_BIT)
00164   is_bit_ = topicIsBIT(topic_name.in(), a_topic_desc->get_type_name());
00165 #endif   // !defined (DDS_HAS_MINIMUM_BIT)
00166 
00167   qos_ = qos;
00168 
00169 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00170   is_exclusive_ownership_ = this->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS;
00171 #endif
00172 
00173   listener_ = a_listener;
00174   listener_mask_ = mask;
00175 
00176   // Only store the participant pointer, since it is our "grand"
00177   // parent, we will exist as long as it does
00178   participant_servant_ = participant;
00179 
00180 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00181   if (is_exclusive_ownership_) {
00182     owner_manager_ = participant_servant_->ownership_manager ();
00183   }
00184 #endif
00185 
00186   domain_id_ = participant_servant_->get_domain_id();
00187   subqos_ = subqos;
00188 }
00189 
00190 bool RecorderImpl::check_transport_qos(const TransportInst& ti)
00191 {
00192   if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
00193     return ti.is_reliable();
00194   }
00195   return true;
00196 }
00197 
00198 const RepoId& RecorderImpl::get_repo_id() const
00199 {
00200   return this->subscription_id_;
00201 }
00202 
00203 CORBA::Long RecorderImpl::get_priority_value(const AssociationData& data) const
00204 {
00205   return data.publication_transport_priority_;
00206 }
00207 
00208 
00209 void RecorderImpl::data_received(const ReceivedDataSample& sample)
00210 {
00211   DBG_ENTRY_LVL("RecorderImpl","data_received",6);
00212 
00213   // ensure some other thread is not changing the sample container
00214   // or statuses related to samples.
00215   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00216 
00217   if (DCPS_debug_level > 9) {
00218     GuidConverter converter(subscription_id_);
00219     ACE_DEBUG((LM_DEBUG,
00220                ACE_TEXT("(%P|%t) RecorderImpl::data_received: ")
00221                ACE_TEXT("%C received sample: %C.\n"),
00222                OPENDDS_STRING(converter).c_str(),
00223                to_string(sample.header_).c_str()));
00224   }
00225 
00226   // we only support SAMPLE_DATA messages
00227   if (sample.header_.message_id_ != SAMPLE_DATA)
00228     return;
00229 
00230   RawDataSample rawSample(static_cast<MessageId> (sample.header_.message_id_),
00231                           sample.header_.source_timestamp_sec_,
00232                           sample.header_.source_timestamp_nanosec_,
00233                           sample.header_.publication_id_,
00234                           sample.header_.byte_order_,
00235                           sample.sample_.get());
00236 
00237   if (listener_.in()) {
00238     listener_->on_sample_data_received(this, rawSample);
00239   }
00240 
00241 }
00242 
00243 void RecorderImpl::notify_subscription_disconnected(const WriterIdSeq&)
00244 {
00245 }
00246 
00247 void RecorderImpl::notify_subscription_reconnected(const WriterIdSeq&)
00248 {
00249 
00250 }
00251 
00252 void
00253 RecorderImpl::notify_subscription_lost(const DDS::InstanceHandleSeq&)
00254 {
00255 }
00256 
00257 void RecorderImpl::notify_subscription_lost(const WriterIdSeq&)
00258 {
00259 
00260 }
00261 
00262 
00263 void
00264 RecorderImpl::add_association(const RepoId&            yourId,
00265                               const WriterAssociation& writer,
00266                               bool                     active)
00267 {
00268     ACE_DEBUG((LM_DEBUG, "RecorderImpl::add_association\n"));
00269   //
00270   // The following block is for diagnostic purposes only.
00271   //
00272   if (DCPS_debug_level >= 1) {
00273     GuidConverter reader_converter(yourId);
00274     GuidConverter writer_converter(writer.writerId);
00275     ACE_DEBUG((LM_DEBUG,
00276                ACE_TEXT("(%P|%t) RecorderImpl::add_association - ")
00277                ACE_TEXT("bit %d local %C remote %C\n"),
00278                is_bit_,
00279                OPENDDS_STRING(reader_converter).c_str(),
00280                OPENDDS_STRING(writer_converter).c_str()));
00281   }
00282 
00283   //
00284   // This block prevents adding associations to deleted readers.
00285   // Presumably this is a "good thing(tm)".
00286   //
00287   // if (entity_deleted_ == true) {
00288   //   if (DCPS_debug_level >= 1)
00289   //     ACE_DEBUG((LM_DEBUG,
00290   //                ACE_TEXT("(%P|%t) RecorderImpl::add_association")
00291   //                ACE_TEXT(" This is a deleted datareader, ignoring add.\n")));
00292   //
00293   //   return;
00294   // }
00295 
00296   //
00297   // We are being called back from the repository before we are done
00298   // processing after our call to the repository that caused this call
00299   // (from the repository) to be made.
00300   //
00301   if (GUID_UNKNOWN == subscription_id_) {
00302     // add_associations was invoked before DCSPInfoRepo::add_subscription() returned.
00303     subscription_id_ = yourId;
00304   }
00305 
00306   //
00307   // We do the following while holding the publication_handle_lock_.
00308   //
00309   {
00310     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00311 
00312     //
00313     // For each writer in the list of writers to associate with, we
00314     // create a WriterInfo and a WriterStats object and store them in
00315     // our internal maps.
00316     //
00317     {
00318       ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00319 
00320       const PublicationId& writer_id = writer.writerId;
00321       RcHandle<WriterInfo> info ( make_rch<WriterInfo>(static_cast<WriterInfoListener*>(this), writer_id, writer.writerQos));
00322       /*std::pair<WriterMapType::iterator, bool> bpair =*/
00323       this->writers_.insert(
00324         // This insertion is idempotent.
00325         WriterMapType::value_type(
00326           writer_id,
00327           info));
00328       // this->statistics_.insert(
00329       //   StatsMapType::value_type(
00330       //     writer_id,
00331       //     WriterStats(
00332       //       this->raw_latency_buffer_size_,
00333       //       this->raw_latency_buffer_type_)));
00334 
00335       // if (DCPS_debug_level > 4) {
00336       //   GuidConverter converter(writer_id);
00337       //   ACE_DEBUG((LM_DEBUG,
00338       //              "(%P|%t) RecorderImpl::add_association: "
00339       //              "inserted writer %C.return %d \n",
00340       //              OPENDDS_STRING(converter).c_str(), bpair.second));
00341       //
00342       //   WriterMapType::iterator iter = writers_.find(writer_id);
00343       //   if (iter != writers_.end()) {
00344       //     // This may not be an error since it could happen that the sample
00345       //     // is delivered to the datareader after the write is dis-associated
00346       //     // with this datareader.
00347       //     GuidConverter reader_converter(subscription_id_);
00348       //     GuidConverter writer_converter(writer_id);
00349       //     ACE_DEBUG((LM_DEBUG,
00350       //               ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
00351       //               ACE_TEXT("reader %C is associated with writer %C.\n"),
00352       //               OPENDDS_STRING(reader_converter).c_str(),
00353       //               OPENDDS_STRING(writer_converter).c_str()));
00354       //   }
00355       // }
00356     }
00357 
00358     //
00359     // Propagate the add_associations processing down into the Transport
00360     // layer here.  This will establish the transport support and reserve
00361     // usage of an existing connection or initiate creation of a new
00362     // connection if no suitable connection is available.
00363     //
00364     AssociationData data;
00365     data.remote_id_ = writer.writerId;
00366     data.remote_data_ = writer.writerTransInfo;
00367     data.publication_transport_priority_ =
00368       writer.writerQos.transport_priority.value;
00369     data.remote_reliable_ =
00370       (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00371     data.remote_durable_ =
00372       (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00373 
00374     if (!this->associate(data, active)) {
00375       if (DCPS_debug_level) {
00376         ACE_ERROR((LM_ERROR,
00377                    ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
00378                    ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00379       }
00380       return;
00381     }
00382 
00383     // Check if any publications have already sent a REQUEST_ACK message.
00384     // {
00385     //   ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00386     //
00387     //   WriterMapType::iterator where = this->writers_.find(writer.writerId);
00388     //
00389     //   if (where != this->writers_.end()) {
00390     //     const ACE_Time_Value now = ACE_OS::gettimeofday();
00391     //
00392     //     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00393     //
00394     //     if (where->second->should_ack(now)) {
00395     //       const SequenceNumber sequence = where->second->ack_sequence();
00396     //       const DDS::Time_t timenow = time_value_to_time(now);
00397     //       if (this->send_sample_ack(writer.writerId, sequence, timenow)) {
00398     //         where->second->clear_acks(sequence);
00399     //       }
00400     //     }
00401     //   }
00402     // }
00403 
00404     //
00405     // LIVELINESS policy timers are managed here.
00406     //
00407     // if (liveliness_lease_duration_ != ACE_Time_Value::zero) {
00408     //   // this call will start the timer if it is not already set
00409     //   const ACE_Time_Value now = ACE_OS::gettimeofday();
00410     //
00411     //   if (DCPS_debug_level >= 5) {
00412     //     GuidConverter converter(subscription_id_);
00413     //     ACE_DEBUG((LM_DEBUG,
00414     //                ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
00415     //                ACE_TEXT("starting/resetting liveliness timer for reader %C\n"),
00416     //                OPENDDS_STRING(converter).c_str()));
00417     //   }
00418     //
00419     //   this->handle_timeout(now, this);
00420     // }
00421 
00422     // else - no timer needed when LIVELINESS.lease_duration is INFINITE
00423 
00424   }
00425   //
00426   // We no longer hold the publication_handle_lock_.
00427   //
00428 
00429   //
00430   // We only do the following processing for readers that are *not*
00431   // readers of Builtin Topics.
00432   //
00433   if (!is_bit_) {
00434 
00435     DDS::InstanceHandle_t handle =
00436       this->participant_servant_->id_to_handle(writer.writerId);
00437 
00438     //
00439     // We acquire the publication_handle_lock_ for the remainder of our
00440     // processing.
00441     //
00442     {
00443       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00444 
00445       // This insertion is idempotent.
00446       this->id_to_handle_map_.insert(
00447         RepoIdToHandleMap::value_type(writer.writerId, handle));
00448 
00449       if (DCPS_debug_level > 4) {
00450         GuidConverter converter(writer.writerId);
00451         ACE_DEBUG((LM_DEBUG,
00452                    ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
00453                    ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"),
00454                    OPENDDS_STRING(converter).c_str(),
00455                    handle));
00456       }
00457 
00458       // We need to adjust these after the insertions have all completed
00459       // since insertions are not guaranteed to increase the number of
00460       // currently matched publications.
00461       int matchedPublications = static_cast<int>(this->id_to_handle_map_.size());
00462       this->subscription_match_status_.current_count_change
00463         = matchedPublications - this->subscription_match_status_.current_count;
00464       this->subscription_match_status_.current_count = matchedPublications;
00465 
00466       ++this->subscription_match_status_.total_count;
00467       ++this->subscription_match_status_.total_count_change;
00468 
00469       this->subscription_match_status_.last_publication_handle = handle;
00470 
00471       // set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
00472 
00473 
00474       if (listener_.in()) {
00475         listener_->on_recorder_matched(
00476           this,
00477           this->subscription_match_status_);
00478 
00479         // TBD - why does the spec say to change this but not change
00480         //       the ChangeFlagStatus after a listener call?
00481 
00482         // Client will look at it so next time it looks the change should be 0
00483         this->subscription_match_status_.total_count_change = 0;
00484         this->subscription_match_status_.current_count_change = 0;
00485       }
00486 
00487       // notify_status_condition();
00488     }
00489 
00490     {
00491       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00492 
00493       this->writers_[writer.writerId]->handle_ = handle;
00494     }
00495   }
00496 
00497   if (!active) {
00498     Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00499     disco->association_complete(this->domain_id_,
00500                                 this->participant_servant_->get_id(),
00501                                 this->subscription_id_, writer.writerId);
00502   }
00503 
00504   // if (this->monitor_) {
00505   //   this->monitor_->report();
00506   // }
00507 }
00508 
00509 void
00510 RecorderImpl::association_complete(const RepoId& /*remote_id*/)
00511 {
00512   // For the current DCPSInfoRepo implementation, the DataReader side will
00513   // always be passive, so association_complete() will not be called.
00514 }
00515 
00516 void
00517 RecorderImpl::remove_associations(const WriterIdSeq& writers,
00518                                   bool               notify_lost)
00519 {
00520   DBG_ENTRY_LVL("RecorderImpl", "remove_associations", 6);
00521   if (writers.length() == 0) {
00522     return;
00523   }
00524 
00525   if (DCPS_debug_level >= 1) {
00526     GuidConverter reader_converter(subscription_id_);
00527     GuidConverter writer_converter(writers[0]);
00528     ACE_DEBUG((LM_DEBUG,
00529                ACE_TEXT("(%P|%t) RecorderImpl::remove_associations: ")
00530                ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00531                is_bit_,
00532                OPENDDS_STRING(reader_converter).c_str(),
00533                OPENDDS_STRING(writer_converter).c_str(),
00534                writers.length()));
00535   }
00536   if (!this->entity_deleted_.value()) {
00537     // stop pending associations for these writer ids
00538     this->stop_associating(writers.get_buffer(), writers.length());
00539 
00540     // writers which are considered non-active and can
00541     // be removed immediately
00542     WriterIdSeq non_active_writers;
00543     {
00544       CORBA::ULong wr_len = writers.length();
00545       ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00546 
00547       for (CORBA::ULong i = 0; i < wr_len; i++) {
00548         PublicationId writer_id = writers[i];
00549 
00550         WriterMapType::iterator it = this->writers_.find(writer_id);
00551         if (it != this->writers_.end() &&
00552             it->second->active()) {
00553           remove_association_sweeper_->schedule_timer(it->second, notify_lost);
00554         } else {
00555           push_back(non_active_writers, writer_id);
00556         }
00557       }
00558     }
00559     remove_associations_i(non_active_writers, notify_lost);
00560   } else {
00561     remove_associations_i(writers, notify_lost);
00562   }
00563 }
00564 
00565 void
00566 RecorderImpl::remove_publication(const PublicationId& pub_id)
00567 {
00568   ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00569   WriterMapType::iterator where = writers_.find(pub_id);
00570   if (writers_.end() != where) {
00571     WriterInfo& info = *where->second;
00572     WriterIdSeq writers;
00573     push_back(writers, pub_id);
00574     bool notify = info.notify_lost_;
00575     write_guard.release();
00576     remove_associations_i(writers, notify);
00577   }
00578 }
00579 
00580 void
00581 RecorderImpl::remove_associations_i(const WriterIdSeq& writers,
00582     bool notify_lost)
00583 {
00584   DBG_ENTRY_LVL("RecorderImpl", "remove_associations_i", 6);
00585 
00586   if (writers.length() == 0) {
00587     return;
00588   }
00589 
00590   if (DCPS_debug_level >= 1) {
00591     GuidConverter reader_converter(subscription_id_);
00592     GuidConverter writer_converter(writers[0]);
00593     ACE_DEBUG((LM_DEBUG,
00594                ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
00595                ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00596                is_bit_,
00597                OPENDDS_STRING(reader_converter).c_str(),
00598                OPENDDS_STRING(writer_converter).c_str(),
00599                writers.length()));
00600   }
00601   DDS::InstanceHandleSeq handles;
00602 
00603   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00604 
00605   // This is used to hold the list of writers which were actually
00606   // removed, which is a proper subset of the writers which were
00607   // requested to be removed.
00608   WriterIdSeq updated_writers;
00609 
00610   CORBA::ULong wr_len;
00611 
00612   //Remove the writers from writer list. If the supplied writer
00613   //is not in the cached writers list then it is already removed.
00614   //We just need remove the writers in the list that have not been
00615   //removed.
00616   {
00617     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00618 
00619     wr_len = writers.length();
00620 
00621     for (CORBA::ULong i = 0; i < wr_len; i++) {
00622       PublicationId writer_id = writers[i];
00623 
00624       WriterMapType::iterator it = this->writers_.find(writer_id);
00625 
00626       if (it != this->writers_.end()) {
00627         it->second->removed();
00628         remove_association_sweeper_->cancel_timer(it->second);
00629       }
00630 
00631       if (this->writers_.erase(writer_id) == 0) {
00632         if (DCPS_debug_level >= 1) {
00633           GuidConverter converter(writer_id);
00634           ACE_DEBUG((LM_DEBUG,
00635                      ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
00636                      ACE_TEXT("the writer local %C was already removed.\n"),
00637                      OPENDDS_STRING(converter).c_str()));
00638         }
00639 
00640       } else {
00641         push_back(updated_writers, writer_id);
00642       }
00643     }
00644   }
00645 
00646   wr_len = updated_writers.length();
00647 
00648   // Return now if the supplied writers have been removed already.
00649   if (wr_len == 0) {
00650     return;
00651   }
00652 
00653   if (!is_bit_) {
00654     // The writer should be in the id_to_handle map at this time.  Note
00655     // it if it not there.
00656     this->lookup_instance_handles(updated_writers, handles);
00657 
00658     for (CORBA::ULong i = 0; i < wr_len; ++i) {
00659       id_to_handle_map_.erase(updated_writers[i]);
00660     }
00661   }
00662   for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
00663     this->disassociate(updated_writers[i]);
00664   }
00665 
00666   // Mirror the add_associations SUBSCRIPTION_MATCHED_STATUS processing.
00667   if (!this->is_bit_) {
00668     // Derive the change in the number of publications writing to this reader.
00669     int matchedPublications = static_cast<int>(this->id_to_handle_map_.size());
00670     this->subscription_match_status_.current_count_change
00671       = matchedPublications - this->subscription_match_status_.current_count;
00672 
00673     // Only process status if the number of publications has changed.
00674     if (this->subscription_match_status_.current_count_change != 0) {
00675       this->subscription_match_status_.current_count = matchedPublications;
00676       /// Section 7.1.4.1: total_count will not decrement.
00677 
00678       /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
00679       this->subscription_match_status_.last_publication_handle
00680         = handles[ wr_len - 1];
00681 
00682       // set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
00683 
00684       // DDS::DataReaderListener_var listener
00685       // = listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00686 
00687       if (listener_.in()) {
00688         listener_->on_recorder_matched(
00689           this,
00690           this->subscription_match_status_);
00691 
00692         // Client will look at it so next time it looks the change should be 0
00693         this->subscription_match_status_.total_count_change = 0;
00694         this->subscription_match_status_.current_count_change = 0;
00695       }
00696 
00697       // notify_status_condition();
00698     }
00699   }
00700 
00701   // If this remove_association is invoked when the InfoRepo
00702   // detects a lost writer then make a callback to notify
00703   // subscription lost.
00704   if (notify_lost) {
00705     this->notify_subscription_lost(handles);
00706   }
00707 
00708   // if (this->monitor_) {
00709   //   this->monitor_->report();
00710   // }
00711 }
00712 
00713 void
00714 RecorderImpl::remove_all_associations()
00715 {
00716   DBG_ENTRY_LVL("RecorderImpl","remove_all_associations",6);
00717 
00718   OpenDDS::DCPS::WriterIdSeq writers;
00719   int size;
00720 
00721   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00722 
00723   {
00724     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00725 
00726     size = static_cast<int>(writers_.size());
00727     writers.length(size);
00728 
00729     WriterMapType::iterator curr_writer = writers_.begin();
00730     WriterMapType::iterator end_writer = writers_.end();
00731 
00732     int i = 0;
00733 
00734     while (curr_writer != end_writer) {
00735       writers[i++] = curr_writer->first;
00736       ++curr_writer;
00737     }
00738   }
00739 
00740   try {
00741     CORBA::Boolean dont_notify_lost = 0;
00742 
00743     if (0 < size) {
00744       remove_associations(writers, dont_notify_lost);
00745     }
00746 
00747   } catch (const CORBA::Exception&) {
00748   }
00749 }
00750 
00751 void
00752 RecorderImpl::update_incompatible_qos(const IncompatibleQosStatus& status)
00753 {
00754 
00755 
00756   ACE_GUARD(ACE_Recursive_Thread_Mutex,
00757             guard,
00758             this->publication_handle_lock_);
00759 
00760   if (this->requested_incompatible_qos_status_.total_count == status.total_count) {
00761     // This test should make the method idempotent.
00762     return;
00763   }
00764 
00765   // set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
00766   //                         true);
00767 
00768   // copy status and increment change
00769   requested_incompatible_qos_status_.total_count = status.total_count;
00770   requested_incompatible_qos_status_.total_count_change +=
00771     status.count_since_last_send;
00772   requested_incompatible_qos_status_.last_policy_id =
00773     status.last_policy_id;
00774   requested_incompatible_qos_status_.policies = status.policies;
00775 
00776   // if (!CORBA::is_nil(listener.in())) {
00777   //   listener->on_requested_incompatible_qos(this,
00778   //                                           requested_incompatible_qos_status_);
00779   //
00780   //   // TBD - why does the spec say to change total_count_change but not
00781   //   // change the ChangeFlagStatus after a listener call?
00782   //
00783   //   // client just looked at it so next time it looks the
00784   //   // change should be 0
00785   //   requested_incompatible_qos_status_.total_count_change = 0;
00786   // }
00787   //
00788   // notify_status_condition();
00789 }
00790 
00791 void
00792 RecorderImpl::inconsistent_topic()
00793 {
00794   topic_servant_->inconsistent_topic();
00795 }
00796 
00797 void
00798 RecorderImpl::signal_liveliness(const RepoId& remote_participant)
00799 {
00800   RepoId prefix = remote_participant;
00801   prefix.entityId = EntityId_t();
00802 
00803   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00804 
00805   typedef std::pair<RepoId, RcHandle<WriterInfo> > WriterSetElement;
00806   typedef OPENDDS_VECTOR(WriterSetElement) WriterSet;
00807   WriterSet writers;
00808 
00809   {
00810     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00811     for (WriterMapType::iterator pos = writers_.lower_bound(prefix),
00812            limit = writers_.end();
00813          pos != limit && GuidPrefixEqual() (pos->first.guidPrefix, prefix.guidPrefix);
00814          ++pos) {
00815       writers.push_back(std::make_pair(pos->first, pos->second));
00816     }
00817   }
00818 
00819   ACE_Time_Value when = ACE_OS::gettimeofday();
00820   for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
00821        pos != limit;
00822        ++pos) {
00823     pos->second->received_activity(when);
00824   }
00825 }
00826 
00827 DDS::ReturnCode_t RecorderImpl::set_qos(
00828   const DDS::SubscriberQos & subscriber_qos,
00829   const DDS::DataReaderQos & qos)
00830 {
00831 
00832   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(subscriber_qos, DDS::RETCODE_UNSUPPORTED);
00833 
00834   if (Qos_Helper::valid(subscriber_qos) && Qos_Helper::consistent(subscriber_qos)) {
00835     if (subqos_ != subscriber_qos) {
00836       // for the not changeable qos, it can be changed before enable
00837       if (!Qos_Helper::changeable(subqos_, subscriber_qos) && enabled_ == true) {
00838         return DDS::RETCODE_IMMUTABLE_POLICY;
00839 
00840       } else {
00841         subqos_ = subscriber_qos;
00842       }
00843     }
00844   } else {
00845     return DDS::RETCODE_INCONSISTENT_POLICY;
00846   }
00847 
00848   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00849   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00850   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00851 
00852   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00853     if (qos_ == qos)
00854       return DDS::RETCODE_OK;
00855 
00856     if (!Qos_Helper::changeable(qos_, qos) && this->is_enabled()) {
00857       return DDS::RETCODE_IMMUTABLE_POLICY;
00858 
00859     } else {
00860       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00861       const bool status =
00862         disco->update_subscription_qos(
00863           this->participant_servant_->get_domain_id(),
00864           this->participant_servant_->get_id(),
00865           this->subscription_id_,
00866           qos,
00867           subscriber_qos);
00868       if (!status) {
00869         ACE_ERROR_RETURN((LM_ERROR,
00870                           ACE_TEXT("(%P|%t) RecorderImpl::set_qos, ")
00871                           ACE_TEXT("qos not updated. \n")),
00872                          DDS::RETCODE_ERROR);
00873       }
00874     }
00875 
00876     qos_ = qos;
00877     subqos_ = subscriber_qos;
00878 
00879     return DDS::RETCODE_OK;
00880 
00881   } else {
00882     return DDS::RETCODE_INCONSISTENT_POLICY;
00883   }
00884 }
00885 
00886 DDS::ReturnCode_t
00887 RecorderImpl::get_qos(
00888   DDS::SubscriberQos & subscriber_qos,
00889   DDS::DataReaderQos & qos)
00890 {
00891   qos = qos_;
00892   subscriber_qos = subqos_;
00893   return DDS::RETCODE_OK;
00894 }
00895 
00896 DDS::ReturnCode_t
00897 RecorderImpl::set_listener(const RecorderListener_rch& a_listener,
00898                            DDS::StatusMask             mask)
00899 {
00900   listener_mask_ = mask;
00901   //note: OK to duplicate  a nil object ref
00902   listener_ = a_listener;
00903   return DDS::RETCODE_OK;
00904 }
00905 
00906 RecorderListener_rch
00907 RecorderImpl::get_listener()
00908 {
00909   return listener_;
00910 }
00911 
00912 void
00913 RecorderImpl::lookup_instance_handles(const WriterIdSeq&       ids,
00914                                       DDS::InstanceHandleSeq & hdls)
00915 {
00916   CORBA::ULong const num_wrts = ids.length();
00917 
00918   if (DCPS_debug_level > 9) {
00919     OPENDDS_STRING separator = "";
00920     OPENDDS_STRING buffer;
00921 
00922     for (CORBA::ULong i = 0; i < num_wrts; ++i) {
00923       buffer += separator + OPENDDS_STRING(GuidConverter(ids[i]));
00924       separator = ", ";
00925     }
00926 
00927     ACE_DEBUG((LM_DEBUG,
00928                ACE_TEXT("(%P|%t) RecorderImpl::lookup_instance_handles: ")
00929                ACE_TEXT("searching for handles for writer Ids: %C.\n"),
00930                buffer.c_str()));
00931   }
00932 
00933   hdls.length(num_wrts);
00934 
00935   for (CORBA::ULong i = 0; i < num_wrts; ++i) {
00936     hdls[i] = this->participant_servant_->id_to_handle(ids[i]);
00937   }
00938 }
00939 
00940 DDS::ReturnCode_t
00941 RecorderImpl::enable()
00942 {
00943   if (DCPS_debug_level >= 1) {
00944 
00945     ACE_DEBUG((LM_DEBUG,
00946                ACE_TEXT("(%P|%t) RecorderImpl::enable\n")));
00947   }
00948   //According spec:
00949   // - Calling enable on an already enabled Entity returns OK and has no
00950   // effect.
00951   // - Calling enable on an Entity whose factory is not enabled will fail
00952   // and return PRECONDITION_NOT_MET.
00953 
00954   if (this->is_enabled()) {
00955     return DDS::RETCODE_OK;
00956   }
00957 
00958   this->set_enabled();
00959 
00960   // if (topic_servant_ && !transport_disabled_) {
00961   if (topic_servant_) {
00962 
00963     ACE_DEBUG((LM_DEBUG,
00964                ACE_TEXT("(%P|%t) RecorderImpl::enable_transport\n")));
00965 
00966     try {
00967       this->enable_transport(this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS,
00968                              this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00969     } catch (const Transport::Exception&) {
00970       ACE_ERROR((LM_ERROR,
00971                  ACE_TEXT("(%P|%t) ERROR: RecorderImpl::enable, ")
00972                  ACE_TEXT("Transport Exception.\n")));
00973       return DDS::RETCODE_ERROR;
00974 
00975     }
00976 
00977     const TransportLocatorSeq& trans_conf_info = this->connection_info();
00978 
00979     CORBA::String_var filterClassName = "";
00980     CORBA::String_var filterExpression = "";
00981     DDS::StringSeq exprParams;
00982 
00983 
00984     Discovery_rch disco =
00985       TheServiceParticipant->get_discovery(this->domain_id_);
00986 
00987     ACE_DEBUG((LM_DEBUG,
00988                ACE_TEXT("(%P|%t) RecorderImpl::add_subscription\n")));
00989 
00990     this->subscription_id_ =
00991       disco->add_subscription(this->domain_id_,
00992                               this->participant_servant_->get_id(),
00993                               this->topic_servant_->get_id(),
00994                               this,
00995                               this->qos_,
00996                               trans_conf_info,
00997                               this->subqos_,
00998                               filterClassName,
00999                               filterExpression,
01000                               exprParams);
01001 
01002     if (this->subscription_id_ == OpenDDS::DCPS::GUID_UNKNOWN) {
01003       ACE_ERROR((LM_ERROR,
01004                  ACE_TEXT("(%P|%t) ERROR: RecorderImpl::enable, ")
01005                  ACE_TEXT("add_subscription returned invalid id.\n")));
01006       return DDS::RETCODE_ERROR;
01007     }
01008   }
01009 
01010   if (topic_servant_) {
01011     const CORBA::String_var name = topic_servant_->get_name();
01012     DDS::ReturnCode_t return_value = DDS::RETCODE_OK;
01013     //   this->participant_servant_->recorder_enabled(name.in(), this);
01014 
01015     return return_value;
01016   } else {
01017     return DDS::RETCODE_OK;
01018   }
01019 }
01020 
01021 DDS::InstanceHandle_t
01022 RecorderImpl::get_instance_handle()
01023 {
01024   return this->participant_servant_->id_to_handle(subscription_id_);
01025 }
01026 
01027 void
01028 RecorderImpl::register_for_writer(const RepoId& participant,
01029                                   const RepoId& readerid,
01030                                   const RepoId& writerid,
01031                                   const TransportLocatorSeq& locators,
01032                                   DiscoveryListener* listener)
01033 {
01034   TransportClient::register_for_writer(participant, readerid, writerid, locators, listener);
01035 }
01036 
01037 void
01038 RecorderImpl::unregister_for_writer(const RepoId& participant,
01039                                     const RepoId& readerid,
01040                                     const RepoId& writerid)
01041 {
01042   TransportClient::unregister_for_writer(participant, readerid, writerid);
01043 }
01044 
01045 #if !defined (DDS_HAS_MINIMUM_BIT)
01046 DDS::ReturnCode_t
01047 RecorderImpl::repoid_to_bit_key(const DCPS::RepoId&     id,
01048                                 DDS::BuiltinTopicKey_t& key)
01049 {
01050   DDS::InstanceHandle_t publication_handle = this->participant_servant_->id_to_handle(id);
01051 
01052   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01053                    guard,
01054                    this->publication_handle_lock_,
01055                    DDS::RETCODE_ERROR);
01056 
01057   DDS::PublicationBuiltinTopicDataSeq data;
01058 
01059   DDS::ReturnCode_t ret = instance_handle_to_bit_data<DDS::PublicationBuiltinTopicDataDataReader_var>(
01060                             participant_servant_,
01061                             BUILT_IN_PUBLICATION_TOPIC,
01062                             publication_handle,
01063                             data);
01064 
01065   if (ret == DDS::RETCODE_OK) {
01066     key = data[0].key;
01067   }
01068 
01069   return ret;
01070 }
01071 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01072 
01073 } // namespace DCPS
01074 } // namespace
01075 
01076 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1