OpenDDS::DCPS::RecorderImpl Class Reference

Implementation of Recorder functionality.

#include <RecorderImpl.h>

Public Member Functions

 RecorderImpl ()
virtual ~RecorderImpl ()
DDS::ReturnCode_t cleanup ()
void init (TopicDescriptionImpl *a_topic_desc, const DDS::DataReaderQos &qos, RecorderListener_rch a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant, DDS::SubscriberQos subqos)
DDS::ReturnCode_t enable ()
virtual bool check_transport_qos (const TransportInst &inst)
virtual const RepoId & get_repo_id () const
DDS::DomainId_t domain_id () const
virtual CORBA::Long get_priority_value (const AssociationData &data) const
virtual void data_received (const ReceivedDataSample &sample)
virtual void notify_subscription_disconnected (const WriterIdSeq &pubids)
virtual void notify_subscription_reconnected (const WriterIdSeq &pubids)
virtual void notify_subscription_lost (const WriterIdSeq &pubids)
virtual void notify_connection_deleted (const RepoId &)
virtual void add_association (const RepoId &yourId, const WriterAssociation &writer, bool active)
virtual void association_complete (const RepoId &remote_id)
virtual void remove_associations (const WriterIdSeq &writers, CORBA::Boolean callback)
virtual void update_incompatible_qos (const IncompatibleQosStatus &status)
virtual void inconsistent_topic ()
virtual void signal_liveliness (const RepoId &remote_participant)
void remove_all_associations ()
virtual DDS::ReturnCode_t repoid_to_bit_key (const DCPS::RepoId &id, DDS::BuiltinTopicKey_t &key)
DDS::ReturnCode_t set_qos (const ::DDS::SubscriberQos &subscriber_qos, const DDS::DataReaderQos &datareader_qos)
DDS::ReturnCode_t get_qos (DDS::SubscriberQos &subscriber_qos, DDS::DataReaderQos &datareader_qos)
DDS::ReturnCode_t set_listener (const RecorderListener_rch &a_listener, DDS::StatusMask mask)
RecorderListener_rch get_listener ()
DomainParticipantImpl * participant ()
virtual DDS::InstanceHandle_t get_instance_handle ()
virtual void register_for_writer (const RepoId &, const RepoId &, const RepoId &, const TransportLocatorSeq &, DiscoveryListener *)
virtual void unregister_for_writer (const RepoId &, const RepoId &, const RepoId &)

Protected Member Functions

virtual void remove_associations_i (const WriterIdSeq &writers, bool callback)
void remove_or_reschedule (const PublicationId &pub_id)

Private Member Functions

void notify_subscription_lost (const DDS::InstanceHandleSeq &handles)
bool lookup_instance_handles (const WriterIdSeq &ids, DDS::InstanceHandleSeq &hdls)
 Lookup the instance handles by the publication repo ids.
void listener_add_ref ()
void listener_remove_ref ()
void _add_ref ()
void _remove_ref ()
typedef OPENDDS_MAP_CMP (RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap
typedef OPENDDS_MAP_CMP (PublicationId, RcHandle< WriterInfo >, GUID_tKeyLessThan) WriterMapType
 publications writing to this reader.

Private Attributes

DDS::DataReaderQos qos_
ACE_Recursive_Thread_Mutex sample_lock_
 lock protecting sample container as well as statuses.
DomainParticipantImpl * participant_servant_
TopicImpl * topic_servant_
bool is_exclusive_ownership_
OwnershipManager * owner_manager_
DDS::SubscriberQos subqos_
DDS::TopicDescription_var topic_desc_
DDS::StatusMask listener_mask_
RecorderListener_rch listener_
DDS::DomainId_t domain_id_
RemoveAssociationSweeper< RecorderImpl > * remove_association_sweeper_
ACE_Recursive_Thread_Mutex publication_handle_lock_
RepoIdToHandleMap id_to_handle_map_
DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_
DDS::SubscriptionMatchedStatus subscription_match_status_
bool is_bit_
WriterMapType writers_
ACE_RW_Thread_Mutex writers_lock_
 RW lock for reading/writing publications.

Friends

class RemoveAssociationSweeper< RecorderImpl >
class ::DDS_TEST

Detailed Description

Implementation of Recorder functionality.

This class is the implementation of the Recorder. Inheritance is used to limit the application's access to underlying system methods.

Definition at line 39 of file RecorderImpl.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::RecorderImpl::RecorderImpl (  ) 

Definition at line 47 of file RecorderImpl.cpp.

References DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, DDS::HANDLE_NIL, DDS::RequestedIncompatibleQosStatus::last_policy_id, DDS::SubscriptionMatchedStatus::last_publication_handle, DDS::RequestedIncompatibleQosStatus::policies, requested_incompatible_qos_status_, subscription_match_status_, DDS::SubscriptionMatchedStatus::total_count, DDS::RequestedIncompatibleQosStatus::total_count, DDS::SubscriptionMatchedStatus::total_count_change, and DDS::RequestedIncompatibleQosStatus::total_count_change.

00048   : qos_(TheServiceParticipant->initial_DataReaderQos()),
00049   participant_servant_(0),
00050   topic_servant_(0),
00051 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00052   is_exclusive_ownership_ (false),
00053 #endif
00054 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00055   owner_manager_ (0),
00056 #endif
00057   subqos_ (TheServiceParticipant->initial_SubscriberQos()),
00058   topic_desc_(0),
00059   listener_mask_(DEFAULT_STATUS_MASK),
00060   domain_id_(0),
00061   remove_association_sweeper_(
00062     new RemoveAssociationSweeper<RecorderImpl>(TheServiceParticipant->reactor(),
00063                                          TheServiceParticipant->reactor_owner(),
00064                                          this)),
00065   is_bit_(false)
00066 {
00067 
00068   requested_incompatible_qos_status_.total_count = 0;
00069   requested_incompatible_qos_status_.total_count_change = 0;
00070   requested_incompatible_qos_status_.last_policy_id = 0;
00071   requested_incompatible_qos_status_.policies.length(0);
00072 
00073   subscription_match_status_.total_count = 0;
00074   subscription_match_status_.total_count_change = 0;
00075   subscription_match_status_.current_count = 0;
00076   subscription_match_status_.current_count_change = 0;
00077   subscription_match_status_.last_publication_handle =
00078     DDS::HANDLE_NIL;
00079 
00080 }

OpenDDS::DCPS::RecorderImpl::~RecorderImpl (  )  [virtual]

Definition at line 84 of file RecorderImpl.cpp.

References DBG_ENTRY_LVL, remove_association_sweeper_, and writers_.

00085 {
00086   DBG_ENTRY_LVL("RecorderImpl","~RecorderImpl",6);
00087   {
00088     ACE_READ_GUARD(ACE_RW_Thread_Mutex,
00089                    read_guard,
00090                    this->writers_lock_);
00091     // Cancel any uncancelled sweeper timers to decrement reference count.
00092     WriterMapType::iterator writer;
00093     for (writer = writers_.begin(); writer != writers_.end(); ++writer) {
00094       remove_association_sweeper_->cancel_timer(writer->second);
00095     }
00096   }
00097 
00098   remove_association_sweeper_->wait();
00099   remove_association_sweeper_->destroy();
00100 }


Member Function Documentation

void OpenDDS::DCPS::RecorderImpl::_add_ref (  )  [inline, private, virtual]

Implements OpenDDS::DCPS::Recorder.

Definition at line 155 of file RecorderImpl.h.

00155 { EntityImpl::_add_ref(); }

void OpenDDS::DCPS::RecorderImpl::_remove_ref (  )  [inline, private, virtual]

Implements OpenDDS::DCPS::Recorder.

Definition at line 156 of file RecorderImpl.h.

00156 { EntityImpl::_remove_ref(); }

void OpenDDS::DCPS::RecorderImpl::add_association ( const RepoId & yourId,
const WriterAssociation & writer,
bool  active 
) [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 279 of file RecorderImpl.cpp.

References DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, OpenDDS::DCPS::DCPS_debug_level, DDS::DataWriterQos::durability, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), id_to_handle_map_, OpenDDS::DCPS::RcHandle< T >::in(), is_bit_, DDS::SubscriptionMatchedStatus::last_publication_handle, listener_, OPENDDS_STRING, participant_servant_, OpenDDS::DCPS::AssociationData::publication_transport_priority_, DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, subscription_match_status_, TheServiceParticipant, DDS::SubscriptionMatchedStatus::total_count, DDS::SubscriptionMatchedStatus::total_count_change, DDS::DataWriterQos::transport_priority, DDS::VOLATILE_DURABILITY_QOS, OpenDDS::DCPS::WriterAssociation::writerId, OpenDDS::DCPS::WriterAssociation::writerQos, writers_, and OpenDDS::DCPS::WriterAssociation::writerTransInfo.

00282 {
00283     ACE_DEBUG((LM_DEBUG, "RecorderImpl::add_association\n"));
00284   //
00285   // The following block is for diagnostic purposes only.
00286   //
00287   if (DCPS_debug_level >= 1) {
00288     GuidConverter reader_converter(yourId);
00289     GuidConverter writer_converter(writer.writerId);
00290     ACE_DEBUG((LM_DEBUG,
00291                ACE_TEXT("(%P|%t) RecorderImpl::add_association - ")
00292                ACE_TEXT("bit %d local %C remote %C\n"),
00293                is_bit_,
00294                OPENDDS_STRING(reader_converter).c_str(),
00295                OPENDDS_STRING(writer_converter).c_str()));
00296   }
00297 
00298   //
00299   // This block prevents adding associations to deleted readers.
00300   // Presumably this is a "good thing(tm)".
00301   //
00302   // if (entity_deleted_ == true) {
00303   //   if (DCPS_debug_level >= 1)
00304   //     ACE_DEBUG((LM_DEBUG,
00305   //                ACE_TEXT("(%P|%t) RecorderImpl::add_association")
00306   //                ACE_TEXT(" This is a deleted datareader, ignoring add.\n")));
00307   //
00308   //   return;
00309   // }
00310 
00311   //
00312   // We are being called back from the repository before we are done
00313   // processing after our call to the repository that caused this call
00314   // (from the repository) to be made.
00315   //
00316   if (GUID_UNKNOWN == subscription_id_) {
00317     // add_associations was invoked before DCPSInfoRepo::add_subscription() returned.
00318     subscription_id_ = yourId;
00319   }
00320 
00321   //
00322   // We do the following while holding the publication_handle_lock_.
00323   //
00324   {
00325     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00326 
00327     //
00328     // For each writer in the list of writers to associate with, we
00329     // create a WriterInfo and a WriterStats object and store them in
00330     // our internal maps.
00331     //
00332     {
00333       ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00334 
00335       const PublicationId& writer_id = writer.writerId;
00336       RcHandle<WriterInfo> info = new WriterInfo(this, writer_id, writer.writerQos);
00337       /*std::pair<WriterMapType::iterator, bool> bpair =*/
00338       this->writers_.insert(
00339         // This insertion is idempotent.
00340         WriterMapType::value_type(
00341           writer_id,
00342           info));
00343       // this->statistics_.insert(
00344       //   StatsMapType::value_type(
00345       //     writer_id,
00346       //     WriterStats(
00347       //       this->raw_latency_buffer_size_,
00348       //       this->raw_latency_buffer_type_)));
00349 
00350       // if (DCPS_debug_level > 4) {
00351       //   GuidConverter converter(writer_id);
00352       //   ACE_DEBUG((LM_DEBUG,
00353       //              "(%P|%t) RecorderImpl::add_association: "
00354       //              "inserted writer %C.return %d \n",
00355       //              OPENDDS_STRING(converter).c_str(), bpair.second));
00356       //
00357       //   WriterMapType::iterator iter = writers_.find(writer_id);
00358       //   if (iter != writers_.end()) {
00359       //     // This may not be an error since it could happen that the sample
00360       //     // is delivered to the datareader after the write is dis-associated
00361       //     // with this datareader.
00362       //     GuidConverter reader_converter(subscription_id_);
00363       //     GuidConverter writer_converter(writer_id);
00364       //     ACE_DEBUG((LM_DEBUG,
00365       //               ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
00366       //               ACE_TEXT("reader %C is associated with writer %C.\n"),
00367       //               OPENDDS_STRING(reader_converter).c_str(),
00368       //               OPENDDS_STRING(writer_converter).c_str()));
00369       //   }
00370       // }
00371     }
00372 
00373     //
00374     // Propagate the add_associations processing down into the Transport
00375     // layer here.  This will establish the transport support and reserve
00376     // usage of an existing connection or initiate creation of a new
00377     // connection if no suitable connection is available.
00378     //
00379     AssociationData data;
00380     data.remote_id_ = writer.writerId;
00381     data.remote_data_ = writer.writerTransInfo;
00382     data.publication_transport_priority_ =
00383       writer.writerQos.transport_priority.value;
00384     data.remote_reliable_ =
00385       (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00386     data.remote_durable_ =
00387       (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00388 
00389     if (!this->associate(data, active)) {
00390       if (DCPS_debug_level) {
00391         ACE_DEBUG((LM_ERROR,
00392                    ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
00393                    ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00394       }
00395       return;
00396     }
00397 
00398     // Check if any publications have already sent a REQUEST_ACK message.
00399     // {
00400     //   ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00401     //
00402     //   WriterMapType::iterator where = this->writers_.find(writer.writerId);
00403     //
00404     //   if (where != this->writers_.end()) {
00405     //     const ACE_Time_Value now = ACE_OS::gettimeofday();
00406     //
00407     //     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00408     //
00409     //     if (where->second->should_ack(now)) {
00410     //       const SequenceNumber sequence = where->second->ack_sequence();
00411     //       const DDS::Time_t timenow = time_value_to_time(now);
00412     //       if (this->send_sample_ack(writer.writerId, sequence, timenow)) {
00413     //         where->second->clear_acks(sequence);
00414     //       }
00415     //     }
00416     //   }
00417     // }
00418 
00419     //
00420     // LIVELINESS policy timers are managed here.
00421     //
00422     // if (liveliness_lease_duration_ != ACE_Time_Value::zero) {
00423     //   // this call will start the timer if it is not already set
00424     //   const ACE_Time_Value now = ACE_OS::gettimeofday();
00425     //
00426     //   if (DCPS_debug_level >= 5) {
00427     //     GuidConverter converter(subscription_id_);
00428     //     ACE_DEBUG((LM_DEBUG,
00429     //                ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
00430     //                ACE_TEXT("starting/resetting liveliness timer for reader %C\n"),
00431     //                OPENDDS_STRING(converter).c_str()));
00432     //   }
00433     //
00434     //   this->handle_timeout(now, this);
00435     // }
00436 
00437     // else - no timer needed when LIVELINESS.lease_duration is INFINITE
00438 
00439   }
00440   //
00441   // We no longer hold the publication_handle_lock_.
00442   //
00443 
00444   //
00445   // We only do the following processing for readers that are *not*
00446   // readers of Builtin Topics.
00447   //
00448   if (!is_bit_) {
00449 
00450     DDS::InstanceHandle_t handle =
00451       this->participant_servant_->id_to_handle(writer.writerId);
00452 
00453     //
00454     // We acquire the publication_handle_lock_ for the remainder of our
00455     // processing.
00456     //
00457     {
00458       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00459 
00460       // This insertion is idempotent.
00461       this->id_to_handle_map_.insert(
00462         RepoIdToHandleMap::value_type(writer.writerId, handle));
00463 
00464       if (DCPS_debug_level > 4) {
00465         GuidConverter converter(writer.writerId);
00466         ACE_DEBUG((LM_DEBUG,
00467                    ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
00468                    ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"),
00469                    OPENDDS_STRING(converter).c_str(),
00470                    handle));
00471       }
00472 
00473       // We need to adjust these after the insertions have all completed
00474       // since insertions are not guaranteed to increase the number of
00475       // currently matched publications.
00476       int matchedPublications = static_cast<int>(this->id_to_handle_map_.size());
00477       this->subscription_match_status_.current_count_change
00478         = matchedPublications - this->subscription_match_status_.current_count;
00479       this->subscription_match_status_.current_count = matchedPublications;
00480 
00481       ++this->subscription_match_status_.total_count;
00482       ++this->subscription_match_status_.total_count_change;
00483 
00484       this->subscription_match_status_.last_publication_handle = handle;
00485 
00486       // set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
00487 
00488 
00489       if (listener_.in()) {
00490         listener_->on_recorder_matched(
00491           this,
00492           this->subscription_match_status_);
00493 
00494         // TBD - why does the spec say to change this but not change
00495         //       the ChangeFlagStatus after a listener call?
00496 
00497         // Client will look at it so next time it looks the change should be 0
00498         this->subscription_match_status_.total_count_change = 0;
00499         this->subscription_match_status_.current_count_change = 0;
00500       }
00501 
00502       // notify_status_condition();
00503     }
00504 
00505     {
00506       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00507 
00508       this->writers_[writer.writerId]->handle_ = handle;
00509     }
00510   }
00511 
00512   if (!active) {
00513     Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00514     disco->association_complete(this->domain_id_,
00515                                 this->participant_servant_->get_id(),
00516                                 this->subscription_id_, writer.writerId);
00517   }
00518 
00519   // if (this->monitor_) {
00520   //   this->monitor_->report();
00521   // }
00522 }
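
The match-status bookkeeping near the end of the listing above can be modelled in isolation. The following is a minimal sketch, not OpenDDS code: `MatchedStatus`, `HandleMap`, `record_match` and `example_record_match` are hypothetical stand-ins for `DDS::SubscriptionMatchedStatus`, `id_to_handle_map_` and the counting steps, and all locking is left out.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-in for DDS::SubscriptionMatchedStatus.
struct MatchedStatus {
  int total_count = 0;
  int total_count_change = 0;
  int current_count = 0;
  int current_count_change = 0;
  long last_publication_handle = 0;
};

// Hypothetical stand-in for id_to_handle_map_ (writer id -> instance handle).
using HandleMap = std::map<std::string, long>;

// Records one association and returns the status a listener would be shown.
// The change counters are cleared afterwards only when a listener exists.
MatchedStatus record_match(HandleMap& handles, MatchedStatus& status,
                           const std::string& writer_id, long handle,
                           bool has_listener)
{
  // insert() keeps an existing entry, so a repeated id does not grow the map.
  handles.insert(HandleMap::value_type(writer_id, handle));

  const int matched = static_cast<int>(handles.size());
  status.current_count_change = matched - status.current_count;
  status.current_count = matched;
  ++status.total_count;
  ++status.total_count_change;
  status.last_publication_handle = handle;

  const MatchedStatus shown = status;
  if (has_listener) {
    status.total_count_change = 0;
    status.current_count_change = 0;
  }
  return shown;
}

// Usage: the same writer reported twice.
void example_record_match()
{
  HandleMap handles;
  MatchedStatus status;
  const MatchedStatus first = record_match(handles, status, "writer-a", 11, true);
  assert(first.current_count == 1 && first.current_count_change == 1);
  const MatchedStatus second = record_match(handles, status, "writer-a", 11, true);
  assert(second.current_count == 1 && second.current_count_change == 0);
  assert(second.total_count == 2);  // total_count grows on every call
}
```

In this sketch, as in the listing, `current_count` is derived from the map size while `total_count` is incremented unconditionally, so a repeated writer id raises only the latter.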

void OpenDDS::DCPS::RecorderImpl::association_complete ( const RepoId & remote_id  )  [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 525 of file RecorderImpl.cpp.

00526 {
00527   // For the current DCPSInfoRepo implementation, the DataReader side will
00528   // always be passive, so association_complete() will not be called.
00529 }

bool OpenDDS::DCPS::RecorderImpl::check_transport_qos ( const TransportInst & inst  )  [virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 199 of file RecorderImpl.cpp.

References OpenDDS::DCPS::TransportInst::is_reliable(), and DDS::RELIABLE_RELIABILITY_QOS.

00200 {
00201   if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
00202     return ti.is_reliable();
00203   }
00204   return true;
00205 }
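
The rule in the listing above is that a reader requesting reliable delivery accepts only a reliable transport, while any other reader accepts every transport. A minimal sketch of that decision follows; `Reliability`, `TransportSketch` and `transport_satisfies` are hypothetical names introduced for illustration and are not part of OpenDDS.

```cpp
#include <cassert>

// Hypothetical stand-in for the reader's reliability QoS kind.
enum class Reliability { BestEffort, Reliable };

// Hypothetical stand-in for TransportInst; only is_reliable() is modelled.
struct TransportSketch {
  bool reliable;
  bool is_reliable() const { return reliable; }
};

// A reliable reader accepts only reliable transports;
// a best-effort reader accepts any transport.
bool transport_satisfies(Reliability requested, const TransportSketch& ti)
{
  if (requested == Reliability::Reliable) {
    return ti.is_reliable();
  }
  return true;
}

// Usage: one reliable and one unreliable transport instance.
void example_transport_check()
{
  const TransportSketch reliable_inst{true};
  const TransportSketch unreliable_inst{false};
  assert(transport_satisfies(Reliability::Reliable, reliable_inst));
  assert(!transport_satisfies(Reliability::Reliable, unreliable_inst));
  assert(transport_satisfies(Reliability::BestEffort, unreliable_inst));
}
```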

DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::cleanup (  ) 

Clean up the Recorder.

Definition at line 104 of file RecorderImpl.cpp.

References OpenDDS::DCPS::DomainParticipantImpl::get_id(), participant_servant_, remove_all_associations(), remove_association_sweeper_, OpenDDS::DCPS::TopicImpl::remove_entity_ref(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, topic_servant_, and writers_.

Referenced by OpenDDS::DCPS::Service_Participant::delete_recorder().

00105 {
00106 
00107   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00108   if (!disco->remove_subscription(this->domain_id_,
00109                                   participant_servant_->get_id(),
00110                                   this->subscription_id_)) {
00111     ACE_ERROR_RETURN((LM_ERROR,
00112                       ACE_TEXT("(%P|%t) ERROR: ")
00113                       ACE_TEXT("RecorderImpl::cleanup: ")
00114                       ACE_TEXT(" could not remove subscription from discovery.\n")),
00115                      DDS::RETCODE_ERROR);
00116   }
00117 
00118   // Call remove association before unregistering the datareader from the transport,
00119   // otherwise some callbacks resulting from remove_association may be lost.
00120 
00121   this->remove_all_associations();
00122 
00123   if (topic_servant_) {
00124     topic_servant_->remove_entity_ref();
00125     topic_servant_->_remove_ref();
00126   }
00127   {
00128     ACE_READ_GUARD_RETURN(ACE_RW_Thread_Mutex,
00129                    read_guard,
00130                    this->writers_lock_,
00131                    0);
00132     // Cancel any uncancelled sweeper timers
00133     WriterMapType::iterator writer;
00134     for (writer = writers_.begin(); writer != writers_.end(); ++writer) {
00135       remove_association_sweeper_->cancel_timer(writer->second);
00136     }
00137   }
00138 
00139   remove_association_sweeper_->wait();
00140   return DDS::RETCODE_OK;
00141 }

void OpenDDS::DCPS::RecorderImpl::data_received ( const ReceivedDataSample & sample  )  [virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 218 of file RecorderImpl.cpp.

References OpenDDS::DCPS::DataSampleHeader::byte_order_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::RcHandle< T >::in(), listener_, OpenDDS::DCPS::DataSampleHeader::message_id_, OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::to_string().

00219 {
00220   DBG_ENTRY_LVL("RecorderImpl","data_received",6);
00221 
00222   // ensure some other thread is not changing the sample container
00223   // or statuses related to samples.
00224   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00225 
00226   if (DCPS_debug_level > 9) {
00227     GuidConverter converter(subscription_id_);
00228     ACE_DEBUG((LM_DEBUG,
00229                ACE_TEXT("(%P|%t) RecorderImpl::data_received: ")
00230                ACE_TEXT("%C received sample: %C.\n"),
00231                OPENDDS_STRING(converter).c_str(),
00232                to_string(sample.header_).c_str()));
00233   }
00234 
00235   // we only support SAMPLE_DATA messages
00236   if (sample.header_.message_id_ != SAMPLE_DATA)
00237     return;
00238 
00239   RawDataSample rawSample(static_cast<MessageId> (sample.header_.message_id_),
00240                           sample.header_.source_timestamp_sec_,
00241                           sample.header_.source_timestamp_nanosec_,
00242                           sample.header_.publication_id_,
00243                           sample.header_.byte_order_,
00244                           sample.sample_);
00245 
00246   if (listener_.in()) {
00247     listener_->on_sample_data_received(this, rawSample);
00248   }
00249 
00250 }
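
The flow in the listing above is: drop anything that is not a data sample, then hand the raw sample to the listener if one is set. The sketch below models only that flow. `MessageKind`, `SampleSketch` and `RecorderSketch` are hypothetical types, the listener is reduced to a `std::function`, and the `sample_lock_` guard is omitted.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical message kinds; only SampleData is forwarded.
enum class MessageKind { SampleData, InstanceRegistration, DisposeInstance };

// Hypothetical stand-in for the fields copied into a RawDataSample.
struct SampleSketch {
  MessageKind kind;
  std::int32_t source_sec;
  std::uint32_t source_nanosec;
  std::vector<unsigned char> payload;
};

class RecorderSketch {
public:
  using Callback = std::function<void(const SampleSketch&)>;

  void set_listener(Callback cb) { listener_ = std::move(cb); }

  // Returns true when the sample was handed to the listener.
  bool data_received(const SampleSketch& sample)
  {
    if (sample.kind != MessageKind::SampleData) {
      return false;  // every other message kind is ignored
    }
    if (!listener_) {
      return false;  // no listener set: the sample is dropped
    }
    listener_(sample);
    return true;
  }

private:
  Callback listener_;
};

// Usage: count how many samples reach the listener.
void example_data_received()
{
  RecorderSketch recorder;
  int seen = 0;
  recorder.set_listener([&seen](const SampleSketch&) { ++seen; });
  const SampleSketch data{MessageKind::SampleData, 1, 2, {0x01}};
  const SampleSketch dispose{MessageKind::DisposeInstance, 1, 2, {}};
  assert(recorder.data_received(data));
  assert(!recorder.data_received(dispose));
  assert(seen == 1);
}
```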

DDS::DomainId_t OpenDDS::DCPS::RecorderImpl::domain_id (  )  const [inline, virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 73 of file RecorderImpl.h.

00073 { return this->domain_id_; }

DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::enable (  ) 

Implements DDS::Entity.

Definition at line 968 of file RecorderImpl.cpp.

References OpenDDS::DCPS::TransportClient::connection_info(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportClient::enable_transport(), OpenDDS::DCPS::TopicImpl::get_id(), OpenDDS::DCPS::TopicDescriptionImpl::get_name(), OpenDDS::DCPS::GUID_UNKNOWN, DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_enabled(), subqos_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, topic_servant_, and DDS::VOLATILE_DURABILITY_QOS.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_recorder().

00969 {
00970   if (DCPS_debug_level >= 1) {
00971 
00972     ACE_DEBUG((LM_DEBUG,
00973                ACE_TEXT("(%P|%t) RecorderImpl::enable\n")));
00974   }
00975   // According to the spec:
00976   // - Calling enable on an already enabled Entity returns OK and has no
00977   // effect.
00978   // - Calling enable on an Entity whose factory is not enabled will fail
00979   // and return PRECONDITION_NOT_MET.
00980 
00981   if (this->is_enabled()) {
00982     return DDS::RETCODE_OK;
00983   }
00984 
00985   this->set_enabled();
00986 
00987   // if (topic_servant_ && !transport_disabled_) {
00988   if (topic_servant_) {
00989 
00990     ACE_DEBUG((LM_DEBUG,
00991                ACE_TEXT("(%P|%t) RecorderImpl::enable_transport\n")));
00992 
00993     try {
00994       this->enable_transport(this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS,
00995                              this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00996     } catch (const Transport::Exception&) {
00997       ACE_ERROR((LM_ERROR,
00998                  ACE_TEXT("(%P|%t) ERROR: RecorderImpl::enable, ")
00999                  ACE_TEXT("Transport Exception.\n")));
01000       return DDS::RETCODE_ERROR;
01001 
01002     }
01003 
01004     const TransportLocatorSeq& trans_conf_info = this->connection_info();
01005 
01006     CORBA::String_var filterClassName = "";
01007     CORBA::String_var filterExpression = "";
01008     DDS::StringSeq exprParams;
01009 
01010 
01011     Discovery_rch disco =
01012       TheServiceParticipant->get_discovery(this->domain_id_);
01013 
01014     ACE_DEBUG((LM_DEBUG,
01015                ACE_TEXT("(%P|%t) RecorderImpl::add_subscription\n")));
01016 
01017     this->subscription_id_ =
01018       disco->add_subscription(this->domain_id_,
01019                               this->participant_servant_->get_id(),
01020                               this->topic_servant_->get_id(),
01021                               this,
01022                               this->qos_,
01023                               trans_conf_info,
01024                               this->subqos_,
01025                               filterClassName,
01026                               filterExpression,
01027                               exprParams);
01028 
01029     if (this->subscription_id_ == OpenDDS::DCPS::GUID_UNKNOWN) {
01030       ACE_ERROR((LM_ERROR,
01031                  ACE_TEXT("(%P|%t) ERROR: RecorderImpl::enable, ")
01032                  ACE_TEXT("add_subscription returned invalid id.\n")));
01033       return DDS::RETCODE_ERROR;
01034     }
01035   }
01036 
01037   if (topic_servant_) {
01038     const CORBA::String_var name = topic_servant_->get_name();
01039     DDS::ReturnCode_t return_value = DDS::RETCODE_OK;
01040     //   this->participant_servant_->recorder_enabled(name.in(), this);
01041 
01042     return return_value;
01043   } else {
01044     return DDS::RETCODE_OK;
01045   }
01046 }
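
The control flow of `enable()` above can be reduced to three steps: return early if already enabled, mark the entity enabled, then register with discovery and fail if no valid id comes back. The following sketch illustrates those steps under stated assumptions: `EnableSketch`, `ReturnCode` and `kUnknownId` are hypothetical, the discovery call is replaced by a `std::function`, and the transport setup is left out.

```cpp
#include <cassert>
#include <functional>
#include <utility>

enum class ReturnCode { Ok, Error };

// Hypothetical: 0 plays the role of GUID_UNKNOWN.
constexpr int kUnknownId = 0;

class EnableSketch {
public:
  // register_subscription stands in for the discovery add_subscription call.
  explicit EnableSketch(std::function<int()> register_subscription)
    : register_(std::move(register_subscription)) {}

  ReturnCode enable()
  {
    if (enabled_) {
      return ReturnCode::Ok;  // enabling twice has no effect
    }
    enabled_ = true;  // set before registration, mirroring the listing's order
    subscription_id_ = register_();
    return subscription_id_ == kUnknownId ? ReturnCode::Error : ReturnCode::Ok;
  }

  int subscription_id() const { return subscription_id_; }

private:
  std::function<int()> register_;
  bool enabled_ = false;
  int subscription_id_ = kUnknownId;
};

// Usage: a second enable() does not register again.
void example_enable()
{
  int calls = 0;
  EnableSketch recorder([&calls]() { ++calls; return 42; });
  assert(recorder.enable() == ReturnCode::Ok);
  assert(recorder.enable() == ReturnCode::Ok);
  assert(calls == 1);
  assert(recorder.subscription_id() == 42);
}
```

Because the sketch sets its flag before registering, a failed first call still leaves it marked as enabled and a second call returns `Ok`. Whether `RecorderImpl` behaves the same way depends on `set_enabled()` and `is_enabled()`, which are not shown on this page.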

DDS::InstanceHandle_t OpenDDS::DCPS::RecorderImpl::get_instance_handle (  )  [virtual]

Implements OpenDDS::DCPS::EntityImpl.

Definition at line 1049 of file RecorderImpl.cpp.

References OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), participant_servant_, and OpenDDS::DCPS::WriterInfoListener::subscription_id_.

01050 {
01051   return this->participant_servant_->id_to_handle(subscription_id_);
01052 }

RecorderListener_rch OpenDDS::DCPS::RecorderImpl::get_listener (  )  [virtual]

Get the listener for this Recorder.

Implements OpenDDS::DCPS::Recorder.

Definition at line 932 of file RecorderImpl.cpp.

References listener_.

00933 {
00934   return listener_;
00935 }

CORBA::Long OpenDDS::DCPS::RecorderImpl::get_priority_value ( const AssociationData & data  )  const [virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 212 of file RecorderImpl.cpp.

References OpenDDS::DCPS::AssociationData::publication_transport_priority_.

00213 {
00214   return data.publication_transport_priority_;
00215 }

DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::get_qos ( DDS::SubscriberQos & subscriber_qos,
DDS::DataReaderQos & datareader_qos 
) [virtual]

Get the Quality of Service settings for the Recorder.

Implements OpenDDS::DCPS::Recorder.

Definition at line 912 of file RecorderImpl.cpp.

References DDS::RETCODE_OK, and subqos_.

00915 {
00916   qos = qos_;
00917   subscriber_qos = subqos_;
00918   return DDS::RETCODE_OK;
00919 }

const RepoId & OpenDDS::DCPS::RecorderImpl::get_repo_id (  )  const [virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 207 of file RecorderImpl.cpp.

References OpenDDS::DCPS::WriterInfoListener::subscription_id_.

00208 {
00209   return this->subscription_id_;
00210 }

void OpenDDS::DCPS::RecorderImpl::inconsistent_topic (  )  [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 817 of file RecorderImpl.cpp.

References OpenDDS::DCPS::TopicImpl::inconsistent_topic(), and topic_servant_.

00818 {
00819   topic_servant_->inconsistent_topic();
00820 }

void OpenDDS::DCPS::RecorderImpl::init ( TopicDescriptionImpl * a_topic_desc,
const DDS::DataReaderQos & qos,
RecorderListener_rch  a_listener,
const DDS::StatusMask & mask,
DomainParticipantImpl * participant,
DDS::SubscriberQos  subqos 
)

Definition at line 143 of file RecorderImpl.cpp.

References OpenDDS::DCPS::TopicImpl::add_entity_ref(), OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC, OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, OpenDDS::DCPS::BUILT_IN_TOPIC_TOPIC, OpenDDS::DCPS::DCPS_debug_level, domain_id_, DDS::EXCLUSIVE_OWNERSHIP_QOS, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), is_bit_, is_exclusive_ownership_, listener_, listener_mask_, owner_manager_, OpenDDS::DCPS::DomainParticipantImpl::ownership_manager(), participant(), participant_servant_, qos_, subqos_, topic_desc_, and topic_servant_.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_recorder().

00150 {
00151   //
00152   if (DCPS_debug_level >= 1) {
00153 
00154     ACE_DEBUG((LM_DEBUG,
00155                ACE_TEXT("(%P|%t) RecorderImpl::init \n")));
00156   }
00157 
00158 
00159   topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc);
00160   if (TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) {
00161     topic_servant_ = a_topic;
00162     topic_servant_->_add_ref();
00163 
00164     topic_servant_->add_entity_ref();
00165   }
00166 
00167   CORBA::String_var topic_name = a_topic_desc->get_name();
00168 
00169 #if !defined (DDS_HAS_MINIMUM_BIT)
00170   is_bit_ = ACE_OS::strcmp(topic_name.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0
00171             || ACE_OS::strcmp(topic_name.in(), BUILT_IN_TOPIC_TOPIC) == 0
00172             || ACE_OS::strcmp(topic_name.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0
00173             || ACE_OS::strcmp(topic_name.in(), BUILT_IN_PUBLICATION_TOPIC) == 0;
00174 #endif   // !defined (DDS_HAS_MINIMUM_BIT)
00175 
00176   qos_ = qos;
00177 
00178 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00179   is_exclusive_ownership_ = this->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS;
00180 #endif
00181 
00182   listener_ = a_listener;
00183   listener_mask_ = mask;
00184 
00185   // Only store the participant pointer, since it is our "grand"
00186   // parent, we will exist as long as it does
00187   participant_servant_ = participant;
00188 
00189 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00190   if (is_exclusive_ownership_) {
00191     owner_manager_ = participant_servant_->ownership_manager ();
00192   }
00193 #endif
00194 
00195   domain_id_ = participant_servant_->get_domain_id();
00196   subqos_ = subqos;
00197 }
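
The built-in topic test in init() compares the topic name against four constants. A minimal sketch of the same check follows. The helper name is_builtin_topic and the string literals are assumptions made for illustration (the literals are the built-in topic names from the DDS specification); the real code uses the BUILT_IN_*_TOPIC constants and ACE_OS::strcmp.

```cpp
#include <cstring>

// Assumed values: stand-ins for the BUILT_IN_*_TOPIC constants.
static const char* const kBuiltInTopicNames[] = {
  "DCPSParticipant", "DCPSTopic", "DCPSSubscription", "DCPSPublication"
};

// Hypothetical helper: true when topic_name equals one of the built-in names.
bool is_builtin_topic(const char* topic_name)
{
  for (const char* name : kBuiltInTopicNames) {
    if (std::strcmp(topic_name, name) == 0) {
      return true;
    }
  }
  return false;
}
```

A table-driven loop keeps the comparison in one place if more names are ever added, at the cost of hiding the individual constants that the original spells out.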

void OpenDDS::DCPS::RecorderImpl::listener_add_ref (  )  [inline, private, virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 153 of file RecorderImpl.h.

00153 { EntityImpl::_add_ref(); }

void OpenDDS::DCPS::RecorderImpl::listener_remove_ref (  )  [inline, private, virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 154 of file RecorderImpl.h.

00154 { EntityImpl::_remove_ref(); }

bool OpenDDS::DCPS::RecorderImpl::lookup_instance_handles ( const WriterIdSeq & ids,
DDS::InstanceHandleSeq & hdls 
) [private]

Look up the instance handles for the given publication repo ids.

Definition at line 938 of file RecorderImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level, and OPENDDS_STRING.

00940 {
00941   if (DCPS_debug_level > 9) {
00942     CORBA::ULong const size = ids.length();
00943     OPENDDS_STRING separator = "";
00944     OPENDDS_STRING buffer;
00945 
00946     for (unsigned long i = 0; i < size; ++i) {
00947       buffer += separator + OPENDDS_STRING(GuidConverter(ids[i]));
00948       separator = ", ";
00949     }
00950 
00951     ACE_DEBUG((LM_DEBUG,
00952                ACE_TEXT("(%P|%t) RecorderImpl::lookup_instance_handles: ")
00953                ACE_TEXT("searching for handles for writer Ids: %C.\n"),
00954                buffer.c_str()));
00955   }
00956 
00957   CORBA::ULong const num_wrts = ids.length();
00958   hdls.length(num_wrts);
00959 
00960   for (CORBA::ULong i = 0; i < num_wrts; ++i) {
00961     hdls[i] = this->participant_servant_->id_to_handle(ids[i]);
00962   }
00963 
00964   return true;
00965 }
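
The debug branch above builds a comma-separated list by starting with an empty separator and switching it to ", " after the first element. A minimal sketch of that idiom with standard strings follows; join_ids is a hypothetical name, and std::string stands in for OPENDDS_STRING and the GuidConverter output.

```cpp
#include <string>
#include <vector>

// Hypothetical helper: joins already-formatted ids with ", ".
std::string join_ids(const std::vector<std::string>& ids)
{
  std::string separator;  // empty before the first element
  std::string buffer;
  for (const std::string& id : ids) {
    buffer += separator + id;
    separator = ", ";     // every later element is preceded by ", "
  }
  return buffer;
}
```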

void OpenDDS::DCPS::RecorderImpl::notify_connection_deleted ( const RepoId &  )  [virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 271 of file RecorderImpl.cpp.

00272 {
00273 
00274 }

void OpenDDS::DCPS::RecorderImpl::notify_subscription_disconnected ( const WriterIdSeq & pubids  )  [virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 252 of file RecorderImpl.cpp.

00253 {
00254 }

void OpenDDS::DCPS::RecorderImpl::notify_subscription_lost ( const DDS::InstanceHandleSeq & handles  )  [private]

Definition at line 262 of file RecorderImpl.cpp.

00263 {
00264 }

void OpenDDS::DCPS::RecorderImpl::notify_subscription_lost ( const WriterIdSeq & pubids  )  [virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 266 of file RecorderImpl.cpp.

Referenced by remove_associations_i().

00267 {
00268 
00269 }

void OpenDDS::DCPS::RecorderImpl::notify_subscription_reconnected ( const WriterIdSeq & pubids  )  [virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 256 of file RecorderImpl.cpp.

00257 {
00258 
00259 }

typedef OpenDDS::DCPS::RecorderImpl::OPENDDS_MAP_CMP ( PublicationId  ,
RcHandle< WriterInfo >  ,
GUID_tKeyLessThan   
) [private]

Publications writing to this reader.

typedef OpenDDS::DCPS::RecorderImpl::OPENDDS_MAP_CMP ( RepoId  ,
DDS::InstanceHandle_t  ,
GUID_tKeyLessThan   
) [private]

DomainParticipantImpl* OpenDDS::DCPS::RecorderImpl::participant (  )  [inline]

Definition at line 125 of file RecorderImpl.h.

Referenced by OpenDDS::DCPS::Service_Participant::delete_recorder(), init(), register_for_writer(), and unregister_for_writer().

00125                                                 {
00126     return participant_servant_;
00127   }

void OpenDDS::DCPS::RecorderImpl::register_for_writer ( const RepoId & ,
const RepoId & ,
const RepoId & ,
const TransportLocatorSeq & ,
DiscoveryListener *  
) [virtual]

Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 1055 of file RecorderImpl.cpp.

References participant(), and OpenDDS::DCPS::TransportClient::register_for_writer().

01060 {
01061   TransportClient::register_for_writer(participant, readerid, writerid, locators, listener);
01062 }

void OpenDDS::DCPS::RecorderImpl::remove_all_associations (  ) 

Definition at line 739 of file RecorderImpl.cpp.

References DBG_ENTRY_LVL, remove_associations(), and writers_.

Referenced by cleanup().

00740 {
00741   DBG_ENTRY_LVL("RecorderImpl","remove_all_associations",6);
00742 
00743   OpenDDS::DCPS::WriterIdSeq writers;
00744   int size;
00745 
00746   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00747 
00748   {
00749     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00750 
00751     size = static_cast<int>(writers_.size());
00752     writers.length(size);
00753 
00754     WriterMapType::iterator curr_writer = writers_.begin();
00755     WriterMapType::iterator end_writer = writers_.end();
00756 
00757     int i = 0;
00758 
00759     while (curr_writer != end_writer) {
00760       writers[i++] = curr_writer->first;
00761       ++curr_writer;
00762     }
00763   }
00764 
00765   try {
00766     CORBA::Boolean dont_notify_lost = 0;
00767 
00768     if (0 < size) {
00769       remove_associations(writers, dont_notify_lost);
00770     }
00771 
00772   } catch (const CORBA::Exception&) {
00773   }
00774 }
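
remove_all_associations() copies the writer ids while holding the writers lock and only then calls remove_associations(). A minimal sketch of that snapshot pattern follows, assuming a hypothetical WriterTable type; std::mutex stands in for ACE_RW_Thread_Mutex, and int and std::string stand in for the id and writer-info types.

```cpp
#include <map>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-in for the recorder's writer map and its lock.
struct WriterTable {
  std::mutex lock;
  std::map<int, std::string> writers;  // writer id -> writer info

  // Copy the keys under the lock; the guard is released on return, so the
  // caller can process the ids without holding the lock.
  std::vector<int> snapshot_ids()
  {
    std::lock_guard<std::mutex> guard(lock);
    std::vector<int> ids;
    ids.reserve(writers.size());
    for (const auto& entry : writers) {
      ids.push_back(entry.first);
    }
    return ids;
  }
};
```

Working from a copy means later removal code can take the same lock for writing without deadlocking against the iteration.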

virtual void OpenDDS::DCPS::RecorderImpl::remove_associations ( const WriterIdSeq & writers,
CORBA::Boolean  callback 
) [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Referenced by remove_all_associations(), and remove_or_reschedule().

void OpenDDS::DCPS::RecorderImpl::remove_associations_i ( const WriterIdSeq & writers,
bool  callback 
) [protected, virtual]

Definition at line 601 of file RecorderImpl.cpp.

References DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, id_to_handle_map_, OpenDDS::DCPS::RcHandle< T >::in(), is_bit_, DDS::SubscriptionMatchedStatus::last_publication_handle, listener_, notify_subscription_lost(), OPENDDS_STRING, OpenDDS::DCPS::push_back(), remove_association_sweeper_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, subscription_match_status_, DDS::SubscriptionMatchedStatus::total_count_change, and writers_.

Referenced by remove_or_reschedule().

00603 {
00604   DBG_ENTRY_LVL("RecorderImpl", "remove_associations_i", 6);
00605 
00606   if (writers.length() == 0) {
00607     return;
00608   }
00609 
00610   if (DCPS_debug_level >= 1) {
00611     GuidConverter reader_converter(subscription_id_);
00612     GuidConverter writer_converter(writers[0]);
00613     ACE_DEBUG((LM_DEBUG,
00614                ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
00615                ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00616                is_bit_,
00617                OPENDDS_STRING(reader_converter).c_str(),
00618                OPENDDS_STRING(writer_converter).c_str(),
00619                writers.length()));
00620   }
00621   DDS::InstanceHandleSeq handles;
00622 
00623   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00624 
00625   // This is used to hold the list of writers which were actually
00626   // removed, which is a subset (possibly all) of the writers which
00627   // were requested to be removed.
00628   WriterIdSeq updated_writers;
00629 
00630   CORBA::ULong wr_len;
00631 
00632   // Remove the writers from the writer list. If a supplied writer
00633   // is not in the cached writers list then it was already removed.
00634   // We only need to remove the writers in the list that have not
00635   // been removed yet.
00636   {
00637     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00638 
00639     wr_len = writers.length();
00640 
00641     for (CORBA::ULong i = 0; i < wr_len; i++) {
00642       PublicationId writer_id = writers[i];
00643 
00644       WriterMapType::iterator it = this->writers_.find(writer_id);
00645 
00646       if (it != this->writers_.end()) {
00647         it->second->removed();
00648         remove_association_sweeper_->cancel_timer(it->second);
00649       }
00650 
00651       if (this->writers_.erase(writer_id) == 0) {
00652         if (DCPS_debug_level >= 1) {
00653           GuidConverter converter(writer_id);
00654           ACE_DEBUG((LM_DEBUG,
00655                      ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
00656                      ACE_TEXT("the writer local %C was already removed.\n"),
00657                      OPENDDS_STRING(converter).c_str()));
00658         }
00659 
00660       } else {
00661         push_back(updated_writers, writer_id);
00662       }
00663     }
00664   }
00665 
00666   wr_len = updated_writers.length();
00667 
00668   // Return now if the supplied writers have been removed already.
00669   if (wr_len == 0) {
00670     return;
00671   }
00672 
00673   if (!is_bit_) {
00674     // The writer should be in the id_to_handle map at this time.  Note
00675     // it if it is not there.
00676     if (this->lookup_instance_handles(updated_writers, handles) == false) {
00677       if (DCPS_debug_level > 4) {
00678         ACE_DEBUG((LM_DEBUG,
00679                    ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
00680                    ACE_TEXT("lookup_instance_handles failed.\n")));
00681       }
00682     }
00683     for (CORBA::ULong i = 0; i < wr_len; ++i) {
00684       id_to_handle_map_.erase(updated_writers[i]);
00685     }
00686   }
00687   for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
00688     this->disassociate(updated_writers[i]);
00689   }
00690 
00691   // Mirror the add_associations SUBSCRIPTION_MATCHED_STATUS processing.
00692   if (!this->is_bit_) {
00693     // Derive the change in the number of publications writing to this reader.
00694     int matchedPublications = static_cast<int>(this->id_to_handle_map_.size());
00695     this->subscription_match_status_.current_count_change
00696       = matchedPublications - this->subscription_match_status_.current_count;
00697 
00698     // Only process status if the number of publications has changed.
00699     if (this->subscription_match_status_.current_count_change != 0) {
00700       this->subscription_match_status_.current_count = matchedPublications;
00701       /// Section 7.1.4.1: total_count will not decrement.
00702 
00703       /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
00704       this->subscription_match_status_.last_publication_handle
00705         = handles[ wr_len - 1];
00706 
00707       // set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
00708 
00709       // DDS::DataReaderListener_var listener
00710       // = listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00711 
00712       if (listener_.in()) {
00713         listener_->on_recorder_matched(
00714           this,
00715           this->subscription_match_status_);
00716 
00717         // Client will look at it so next time it looks the change should be 0
00718         this->subscription_match_status_.total_count_change = 0;
00719         this->subscription_match_status_.current_count_change = 0;
00720       }
00721 
00722       // notify_status_condition();
00723     }
00724   }
00725 
00726   // If this remove_association is invoked when the InfoRepo
00727   // detects a lost writer then make a callback to notify
00728   // subscription lost.
00729   if (notify_lost) {
00730     this->notify_subscription_lost(handles);
00731   }
00732 
00733   // if (this->monitor_) {
00734   //   this->monitor_->report();
00735   // }
00736 }
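
Two steps of remove_associations_i() lend themselves to a small illustration: using the return value of erase() to find which requested writers were still present, and deriving current_count_change from the new number of matched publications. The sketch below uses hypothetical names (MatchStatus, erase_present, update_match_status) and plain int keys in place of PublicationId; it omits the locking, the sweeper, and the listener callback.

```cpp
#include <map>
#include <vector>

// Hypothetical, simplified stand-in for DDS::SubscriptionMatchedStatus.
struct MatchStatus {
  int current_count = 0;
  int current_count_change = 0;
};

// Erase each requested id and return only the ids that were present.
std::vector<int> erase_present(std::map<int, int>& writers,
                               const std::vector<int>& requested)
{
  std::vector<int> removed;
  for (int id : requested) {
    if (writers.erase(id) != 0) {  // erase returns the number of elements removed
      removed.push_back(id);
    }
  }
  return removed;
}

// Derive the change from the new number of matched publications and
// store the new count only when it differs from the old one.
void update_match_status(MatchStatus& status, int matched_publications)
{
  status.current_count_change = matched_publications - status.current_count;
  if (status.current_count_change != 0) {
    status.current_count = matched_publications;
  }
}
```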

void OpenDDS::DCPS::RecorderImpl::remove_or_reschedule ( const PublicationId & pub_id  )  [protected]

Definition at line 581 of file RecorderImpl.cpp.

References OpenDDS::DCPS::push_back(), remove_associations(), remove_associations_i(), and writers_.

00582 {
00583   ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00584   WriterMapType::iterator where = writers_.find(pub_id);
00585   if (writers_.end() != where) {
00586     WriterInfo& info = *where->second;
00587     WriterIdSeq writers;
00588     push_back(writers, pub_id);
00589     bool notify = info.notify_lost_;
00590     if (info.removal_deadline_ < ACE_OS::gettimeofday()) {
00591       write_guard.release();
00592       remove_associations_i(writers, notify);
00593     } else {
00594       write_guard.release();
00595       remove_associations(writers, notify);
00596     }
00597   }
00598 }
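
The branch in remove_or_reschedule() depends only on whether the writer's removal deadline has already passed. A minimal sketch of that decision follows, using std::chrono in place of ACE_Time_Value. The enum, the helper name, and the reading of the else branch as "defer" are assumptions drawn from the function name, not from the listing itself.

```cpp
#include <chrono>

// Hypothetical labels for the two branches.
enum class RemovalAction {
  RemoveNow,  // deadline passed: the listing calls remove_associations_i()
  Defer       // deadline not reached: the listing calls remove_associations()
};

using Clock = std::chrono::steady_clock;

// Hypothetical helper: pick the branch from the deadline and the current time.
RemovalAction choose_removal(Clock::time_point removal_deadline,
                             Clock::time_point now)
{
  return removal_deadline < now ? RemovalAction::RemoveNow
                                : RemovalAction::Defer;
}
```

Passing the current time as a parameter keeps the decision testable without waiting on a real clock.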

DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::repoid_to_bit_key ( const DCPS::RepoId & id,
DDS::BuiltinTopicKey_t & key 
) [virtual]

Find the built-in topic (BIT) key for a given repo id.

Implements OpenDDS::DCPS::Recorder.

Definition at line 1074 of file RecorderImpl.cpp.

References OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), participant_servant_, DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

01076 {
01077   DDS::InstanceHandle_t publication_handle = this->participant_servant_->id_to_handle(id);
01078 
01079   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01080                    guard,
01081                    this->publication_handle_lock_,
01082                    DDS::RETCODE_ERROR);
01083 
01084   BIT_Helper_1 < DDS::PublicationBuiltinTopicDataDataReader,
01085                  DDS::PublicationBuiltinTopicDataDataReader_var,
01086                  DDS::PublicationBuiltinTopicDataSeq > hh;
01087 
01088   DDS::PublicationBuiltinTopicDataSeq data;
01089 
01090   DDS::ReturnCode_t ret
01091     = hh.instance_handle_to_bit_data(participant_servant_,
01092                                      BUILT_IN_PUBLICATION_TOPIC,
01093                                      publication_handle,
01094                                      data);
01095 
01096   if (ret == DDS::RETCODE_OK) {
01097     key = data[0].key;
01098   }
01099 
01100   return ret;
01101 }

DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::set_listener ( const RecorderListener_rch & a_listener,
DDS::StatusMask  mask 
) [virtual]

Change the listener for this Recorder.

Implements OpenDDS::DCPS::Recorder.

Definition at line 922 of file RecorderImpl.cpp.

References listener_, listener_mask_, and DDS::RETCODE_OK.

00924 {
00925   listener_mask_ = mask;
00926   //note: OK to duplicate  a nil object ref
00927   listener_ = a_listener;
00928   return DDS::RETCODE_OK;
00929 }

DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::set_qos ( const ::DDS::SubscriberQos & subscriber_qos,
const DDS::DataReaderQos & datareader_qos 
) [virtual]

Set the Quality of Service settings for the Recorder.

Implements OpenDDS::DCPS::Recorder.

void OpenDDS::DCPS::RecorderImpl::signal_liveliness ( const RepoId & remote_participant  )  [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 823 of file RecorderImpl.cpp.

References OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::TransportClient::OPENDDS_VECTOR(), and writers_.

00824 {
00825   RepoId prefix = remote_participant;
00826   prefix.entityId = EntityId_t();
00827 
00828   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00829 
00830   typedef std::pair<RepoId, RcHandle<WriterInfo> > WriterSetElement;
00831   typedef OPENDDS_VECTOR(WriterSetElement) WriterSet;
00832   WriterSet writers;
00833 
00834   {
00835     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00836     for (WriterMapType::iterator pos = writers_.lower_bound(prefix),
00837            limit = writers_.end();
00838          pos != limit && GuidPrefixEqual() (pos->first.guidPrefix, prefix.guidPrefix);
00839          ++pos) {
00840       writers.push_back(std::make_pair(pos->first, pos->second));
00841     }
00842   }
00843 
00844   ACE_Time_Value when = ACE_OS::gettimeofday();
00845   for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
00846        pos != limit;
00847        ++pos) {
00848     pos->second->received_activity(when);
00849   }
00850 }
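
signal_liveliness() finds every writer that belongs to one participant by zeroing the entity id, calling lower_bound() on the ordered map, and walking forward while the GUID prefix still matches. A minimal sketch of that prefix scan follows. The Guid typedef is a hypothetical two-field stand-in for RepoId, and the sketch assumes that the map ordering compares the prefix first and that a zero entity id sorts before all others.

```cpp
#include <map>
#include <utility>
#include <vector>

// Hypothetical simplified GUID: (prefix, entity id). std::pair orders by
// prefix first, so keys with equal prefixes are adjacent in a std::map.
typedef std::pair<unsigned, unsigned> Guid;

// Collect the values of all entries whose key carries the given prefix.
std::vector<int> values_with_prefix(const std::map<Guid, int>& writers,
                                    unsigned prefix)
{
  std::vector<int> out;
  // Start at the smallest possible key with this prefix (entity id zeroed).
  std::map<Guid, int>::const_iterator pos = writers.lower_bound(Guid(prefix, 0));
  for (; pos != writers.end() && pos->first.first == prefix; ++pos) {
    out.push_back(pos->second);
  }
  return out;
}
```

The scan costs one logarithmic lookup plus the number of matching entries, rather than a pass over the whole map.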

void OpenDDS::DCPS::RecorderImpl::unregister_for_writer ( const RepoId & ,
const RepoId & ,
const RepoId &  
) [virtual]

Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 1065 of file RecorderImpl.cpp.

References participant(), and OpenDDS::DCPS::TransportClient::unregister_for_writer().

01068 {
01069   TransportClient::unregister_for_writer(participant, readerid, writerid);
01070 }

void OpenDDS::DCPS::RecorderImpl::update_incompatible_qos ( const IncompatibleQosStatus & status  )  [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 777 of file RecorderImpl.cpp.

References OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, OpenDDS::DCPS::IncompatibleQosStatus::last_policy_id, DDS::RequestedIncompatibleQosStatus::last_policy_id, OpenDDS::DCPS::IncompatibleQosStatus::policies, DDS::RequestedIncompatibleQosStatus::policies, requested_incompatible_qos_status_, DDS::RequestedIncompatibleQosStatus::total_count, OpenDDS::DCPS::IncompatibleQosStatus::total_count, and DDS::RequestedIncompatibleQosStatus::total_count_change.

00778 {
00779 
00780 
00781   ACE_GUARD(ACE_Recursive_Thread_Mutex,
00782             guard,
00783             this->publication_handle_lock_);
00784 
00785   if (this->requested_incompatible_qos_status_.total_count == status.total_count) {
00786     // This test should make the method idempotent.
00787     return;
00788   }
00789 
00790   // set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
00791   //                         true);
00792 
00793   // copy status and increment change
00794   requested_incompatible_qos_status_.total_count = status.total_count;
00795   requested_incompatible_qos_status_.total_count_change +=
00796     status.count_since_last_send;
00797   requested_incompatible_qos_status_.last_policy_id =
00798     status.last_policy_id;
00799   requested_incompatible_qos_status_.policies = status.policies;
00800 
00801   // if (!CORBA::is_nil(listener.in())) {
00802   //   listener->on_requested_incompatible_qos(dr_local_objref_.in(),
00803   //                                           requested_incompatible_qos_status_);
00804   //
00805   //   // TBD - why does the spec say to change total_count_change but not
00806   //   // change the ChangeFlagStatus after a listener call?
00807   //
00808   //   // client just looked at it so next time it looks the
00809   //   // change should be 0
00810   //   requested_incompatible_qos_status_.total_count_change = 0;
00811   // }
00812   //
00813   // notify_status_condition();
00814 }
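
The early return in update_incompatible_qos() makes a repeated notification with the same total_count a no-op. A minimal sketch of that guard follows, using hypothetical simplified structures (IncomingStatus, RequestedStatus) and a hypothetical helper name; the policies sequence and the locking are left out.

```cpp
// Hypothetical, simplified stand-in for IncompatibleQosStatus.
struct IncomingStatus {
  int total_count;
  int count_since_last_send;
  int last_policy_id;
};

// Hypothetical, simplified stand-in for DDS::RequestedIncompatibleQosStatus.
struct RequestedStatus {
  int total_count = 0;
  int total_count_change = 0;
  int last_policy_id = 0;
};

// Returns false, changing nothing, when this total was already applied.
bool apply_incompatible_qos(RequestedStatus& local, const IncomingStatus& incoming)
{
  if (local.total_count == incoming.total_count) {
    return false;  // duplicate notification
  }
  local.total_count = incoming.total_count;
  local.total_count_change += incoming.count_since_last_send;
  local.last_policy_id = incoming.last_policy_id;
  return true;
}
```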


Friends And Related Function Documentation

friend class ::DDS_TEST [friend]

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 176 of file RecorderImpl.h.

friend class RemoveAssociationSweeper< RecorderImpl > [friend]

Definition at line 174 of file RecorderImpl.h.


Member Data Documentation

DDS::DomainId_t OpenDDS::DCPS::RecorderImpl::domain_id_ [private]

Definition at line 181 of file RecorderImpl.h.

Referenced by init().

RepoIdToHandleMap OpenDDS::DCPS::RecorderImpl::id_to_handle_map_ [private]

Definition at line 187 of file RecorderImpl.h.

Referenced by add_association(), and remove_associations_i().

bool OpenDDS::DCPS::RecorderImpl::is_bit_ [private]

Flag indicating that this data reader is a built-in topic data reader.

Definition at line 194 of file RecorderImpl.h.

Referenced by add_association(), init(), and remove_associations_i().

bool OpenDDS::DCPS::RecorderImpl::is_exclusive_ownership_ [private]

Definition at line 167 of file RecorderImpl.h.

Referenced by init().

RecorderListener_rch OpenDDS::DCPS::RecorderImpl::listener_ [private]

Definition at line 180 of file RecorderImpl.h.

Referenced by add_association(), data_received(), get_listener(), init(), remove_associations_i(), and set_listener().

DDS::StatusMask OpenDDS::DCPS::RecorderImpl::listener_mask_ [private]

Definition at line 179 of file RecorderImpl.h.

Referenced by init(), and set_listener().

OwnershipManager* OpenDDS::DCPS::RecorderImpl::owner_manager_ [private]

Definition at line 169 of file RecorderImpl.h.

Referenced by init().

DomainParticipantImpl* OpenDDS::DCPS::RecorderImpl::participant_servant_ [private]

Definition at line 163 of file RecorderImpl.h.

Referenced by add_association(), cleanup(), get_instance_handle(), init(), and repoid_to_bit_key().

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::RecorderImpl::publication_handle_lock_ [private]

Definition at line 184 of file RecorderImpl.h.

DDS::DataReaderQos OpenDDS::DCPS::RecorderImpl::qos_ [private]

Definition at line 158 of file RecorderImpl.h.

Referenced by init().

RemoveAssociationSweeper<RecorderImpl>* OpenDDS::DCPS::RecorderImpl::remove_association_sweeper_ [private]

Definition at line 182 of file RecorderImpl.h.

Referenced by cleanup(), remove_associations_i(), and ~RecorderImpl().

DDS::RequestedIncompatibleQosStatus OpenDDS::DCPS::RecorderImpl::requested_incompatible_qos_status_ [private]

Definition at line 189 of file RecorderImpl.h.

Referenced by RecorderImpl(), and update_incompatible_qos().

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::RecorderImpl::sample_lock_ [private]

Lock protecting the sample container as well as the statuses.

Definition at line 161 of file RecorderImpl.h.

DDS::SubscriberQos OpenDDS::DCPS::RecorderImpl::subqos_ [private]

Definition at line 172 of file RecorderImpl.h.

Referenced by enable(), get_qos(), and init().

DDS::SubscriptionMatchedStatus OpenDDS::DCPS::RecorderImpl::subscription_match_status_ [private]

Definition at line 190 of file RecorderImpl.h.

Referenced by add_association(), RecorderImpl(), and remove_associations_i().

DDS::TopicDescription_var OpenDDS::DCPS::RecorderImpl::topic_desc_ [private]

Definition at line 178 of file RecorderImpl.h.

Referenced by init().

TopicImpl* OpenDDS::DCPS::RecorderImpl::topic_servant_ [private]

Definition at line 164 of file RecorderImpl.h.

Referenced by cleanup(), enable(), inconsistent_topic(), and init().

WriterMapType OpenDDS::DCPS::RecorderImpl::writers_ [private]

Definition at line 199 of file RecorderImpl.h.

Referenced by add_association(), cleanup(), remove_all_associations(), remove_associations_i(), remove_or_reschedule(), signal_liveliness(), and ~RecorderImpl().

ACE_RW_Thread_Mutex OpenDDS::DCPS::RecorderImpl::writers_lock_ [private]

RW lock for reading/writing publications.

Definition at line 202 of file RecorderImpl.h.


The documentation for this class was generated from the following files:
RecorderImpl.h
RecorderImpl.cpp
Generated on Fri Feb 12 20:06:23 2016 for OpenDDS by  doxygen 1.4.7