OpenDDS::DCPS::RecorderImpl Class Reference

Implementation of Recorder functionality. More...

#include <RecorderImpl.h>

Inheritance diagram for OpenDDS::DCPS::RecorderImpl:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::RecorderImpl:
Collaboration graph
[legend]

List of all members.

Public Member Functions

 RecorderImpl ()
virtual ~RecorderImpl ()
DDS::ReturnCode_t cleanup ()
void init (TopicDescriptionImpl *a_topic_desc, const DDS::DataReaderQos &qos, RecorderListener_rch a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant, DDS::SubscriberQos subqos)
DDS::ReturnCode_t enable ()
virtual bool check_transport_qos (const TransportInst &inst)
virtual const RepoIdget_repo_id () const
DDS::DomainId_t domain_id () const
virtual CORBA::Long get_priority_value (const AssociationData &data) const
virtual void data_received (const ReceivedDataSample &sample)
virtual void notify_subscription_disconnected (const WriterIdSeq &pubids)
virtual void notify_subscription_reconnected (const WriterIdSeq &pubids)
virtual void notify_subscription_lost (const WriterIdSeq &pubids)
virtual void add_association (const RepoId &yourId, const WriterAssociation &writer, bool active)
virtual void association_complete (const RepoId &remote_id)
virtual void remove_associations (const WriterIdSeq &writers, CORBA::Boolean callback)
virtual void update_incompatible_qos (const IncompatibleQosStatus &status)
virtual void inconsistent_topic ()
virtual void signal_liveliness (const RepoId &remote_participant)
void remove_all_associations ()
virtual DDS::ReturnCode_t repoid_to_bit_key (const DCPS::RepoId &id, DDS::BuiltinTopicKey_t &key)
DDS::ReturnCode_t set_qos (const DDS::SubscriberQos &subscriber_qos, const DDS::DataReaderQos &datareader_qos)
DDS::ReturnCode_t get_qos (DDS::SubscriberQos &subscriber_qos, DDS::DataReaderQos &datareader_qos)
DDS::ReturnCode_t set_listener (const RecorderListener_rch &a_listener, DDS::StatusMask mask)
RecorderListener_rch get_listener ()
DomainParticipantImplparticipant ()
virtual DDS::InstanceHandle_t get_instance_handle ()
virtual void register_for_writer (const RepoId &, const RepoId &, const RepoId &, const TransportLocatorSeq &, DiscoveryListener *)
virtual void unregister_for_writer (const RepoId &, const RepoId &, const RepoId &)

Protected Member Functions

virtual void remove_associations_i (const WriterIdSeq &writers, bool callback)
void remove_publication (const PublicationId &pub_id)

Private Member Functions

void notify_subscription_lost (const DDS::InstanceHandleSeq &handles)
void lookup_instance_handles (const WriterIdSeq &ids, DDS::InstanceHandleSeq &hdls)
 Lookup the instance handles by the publication repo ids.
typedef OPENDDS_MAP_CMP (RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap
typedef OPENDDS_MAP_CMP (PublicationId, RcHandle< WriterInfo >, GUID_tKeyLessThan) WriterMapType
 publications writing to this reader.

Private Attributes

DDS::DataReaderQos qos_
ACE_Recursive_Thread_Mutex sample_lock_
 lock protecting sample container as well as statuses.
DomainParticipantImplparticipant_servant_
TopicDescriptionPtr< TopicImpltopic_servant_
bool is_exclusive_ownership_
OwnershipManagerowner_manager_
DDS::SubscriberQos subqos_
DDS::TopicDescription_var topic_desc_
DDS::StatusMask listener_mask_
RecorderListener_rch listener_
DDS::DomainId_t domain_id_
RcHandle
< RemoveAssociationSweeper
< RecorderImpl > > 
remove_association_sweeper_
ACE_Recursive_Thread_Mutex publication_handle_lock_
RepoIdToHandleMap id_to_handle_map_
DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_
DDS::SubscriptionMatchedStatus subscription_match_status_
bool is_bit_
WriterMapType writers_
ACE_RW_Thread_Mutex writers_lock_
 RW lock for reading/writing publications.

Friends

class RemoveAssociationSweeper< RecorderImpl >
class ::DDS_TEST

Detailed Description

Implementation of Recorder functionality.

This class is the implementation of the Recorder. Inheritance is used to limit the applications access to underlying system methods.

Definition at line 40 of file RecorderImpl.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::RecorderImpl::RecorderImpl (  ) 

Definition at line 49 of file RecorderImpl.cpp.

References DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, DDS::HANDLE_NIL, DDS::RequestedIncompatibleQosStatus::last_policy_id, DDS::SubscriptionMatchedStatus::last_publication_handle, DDS::RequestedIncompatibleQosStatus::policies, requested_incompatible_qos_status_, subscription_match_status_, DDS::SubscriptionMatchedStatus::total_count, DDS::RequestedIncompatibleQosStatus::total_count, DDS::SubscriptionMatchedStatus::total_count_change, and DDS::RequestedIncompatibleQosStatus::total_count_change.

00050   : qos_(TheServiceParticipant->initial_DataReaderQos()),
00051   participant_servant_(0),
00052   topic_servant_(0),
00053 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00054   is_exclusive_ownership_ (false),
00055 #endif
00056 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00057   owner_manager_ (0),
00058 #endif
00059   subqos_ (TheServiceParticipant->initial_SubscriberQos()),
00060   topic_desc_(0),
00061   listener_mask_(DEFAULT_STATUS_MASK),
00062   domain_id_(0),
00063   remove_association_sweeper_(
00064     make_rch<RemoveAssociationSweeper<RecorderImpl> >(TheServiceParticipant->reactor(),
00065                                          TheServiceParticipant->reactor_owner(),
00066                                          this)),
00067   is_bit_(false)
00068 {
00069 
00070   requested_incompatible_qos_status_.total_count = 0;
00071   requested_incompatible_qos_status_.total_count_change = 0;
00072   requested_incompatible_qos_status_.last_policy_id = 0;
00073   requested_incompatible_qos_status_.policies.length(0);
00074 
00075   subscription_match_status_.total_count = 0;
00076   subscription_match_status_.total_count_change = 0;
00077   subscription_match_status_.current_count = 0;
00078   subscription_match_status_.current_count_change = 0;
00079   subscription_match_status_.last_publication_handle =
00080     DDS::HANDLE_NIL;
00081 
00082 }

OpenDDS::DCPS::RecorderImpl::~RecorderImpl (  )  [virtual]

Definition at line 86 of file RecorderImpl.cpp.

References DBG_ENTRY_LVL, remove_association_sweeper_, writers_, and writers_lock_.

00087 {
00088   DBG_ENTRY_LVL("RecorderImpl","~RecorderImpl",6);
00089   {
00090     ACE_READ_GUARD(ACE_RW_Thread_Mutex,
00091                    read_guard,
00092                    this->writers_lock_);
00093     // Cancel any uncancelled sweeper timers to decrement reference count.
00094     WriterMapType::iterator writer;
00095     for (writer = writers_.begin(); writer != writers_.end(); ++writer) {
00096       remove_association_sweeper_->cancel_timer(writer->second);
00097     }
00098   }
00099 
00100   remove_association_sweeper_->wait();
00101 }


Member Function Documentation

void OpenDDS::DCPS::RecorderImpl::add_association ( const RepoId yourId,
const WriterAssociation writer,
bool  active 
) [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 264 of file RecorderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::TransportClient::associate(), DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, OpenDDS::DCPS::DCPS_debug_level, domain_id_, DDS::DataWriterQos::durability, OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), id_to_handle_map_, OpenDDS::DCPS::RcHandle< T >::in(), is_bit_, DDS::SubscriptionMatchedStatus::last_publication_handle, listener_, LM_DEBUG, LM_ERROR, OPENDDS_STRING, participant_servant_, publication_handle_lock_, OpenDDS::DCPS::AssociationData::publication_transport_priority_, DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, sample_lock_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, subscription_match_status_, TheServiceParticipant, DDS::SubscriptionMatchedStatus::total_count, DDS::SubscriptionMatchedStatus::total_count_change, DDS::DataWriterQos::transport_priority, DDS::VOLATILE_DURABILITY_QOS, OpenDDS::DCPS::WriterAssociation::writerId, OpenDDS::DCPS::WriterAssociation::writerQos, writers_, writers_lock_, and OpenDDS::DCPS::WriterAssociation::writerTransInfo.

00267 {
00268     ACE_DEBUG((LM_DEBUG, "RecorderImpl::add_association\n"));
00269   //
00270   // The following block is for diagnostic purposes only.
00271   //
00272   if (DCPS_debug_level >= 1) {
00273     GuidConverter reader_converter(yourId);
00274     GuidConverter writer_converter(writer.writerId);
00275     ACE_DEBUG((LM_DEBUG,
00276                ACE_TEXT("(%P|%t) RecorderImpl::add_association - ")
00277                ACE_TEXT("bit %d local %C remote %C\n"),
00278                is_bit_,
00279                OPENDDS_STRING(reader_converter).c_str(),
00280                OPENDDS_STRING(writer_converter).c_str()));
00281   }
00282 
00283   //
00284   // This block prevents adding associations to deleted readers.
00285   // Presumably this is a "good thing(tm)".
00286   //
00287   // if (entity_deleted_ == true) {
00288   //   if (DCPS_debug_level >= 1)
00289   //     ACE_DEBUG((LM_DEBUG,
00290   //                ACE_TEXT("(%P|%t) RecorderImpl::add_association")
00291   //                ACE_TEXT(" This is a deleted datareader, ignoring add.\n")));
00292   //
00293   //   return;
00294   // }
00295 
00296   //
00297   // We are being called back from the repository before we are done
00298   // processing after our call to the repository that caused this call
00299   // (from the repository) to be made.
00300   //
00301   if (GUID_UNKNOWN == subscription_id_) {
00302     // add_associations was invoked before DCSPInfoRepo::add_subscription() returned.
00303     subscription_id_ = yourId;
00304   }
00305 
00306   //
00307   // We do the following while holding the publication_handle_lock_.
00308   //
00309   {
00310     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00311 
00312     //
00313     // For each writer in the list of writers to associate with, we
00314     // create a WriterInfo and a WriterStats object and store them in
00315     // our internal maps.
00316     //
00317     {
00318       ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00319 
00320       const PublicationId& writer_id = writer.writerId;
00321       RcHandle<WriterInfo> info ( make_rch<WriterInfo>(static_cast<WriterInfoListener*>(this), writer_id, writer.writerQos));
00322       /*std::pair<WriterMapType::iterator, bool> bpair =*/
00323       this->writers_.insert(
00324         // This insertion is idempotent.
00325         WriterMapType::value_type(
00326           writer_id,
00327           info));
00328       // this->statistics_.insert(
00329       //   StatsMapType::value_type(
00330       //     writer_id,
00331       //     WriterStats(
00332       //       this->raw_latency_buffer_size_,
00333       //       this->raw_latency_buffer_type_)));
00334 
00335       // if (DCPS_debug_level > 4) {
00336       //   GuidConverter converter(writer_id);
00337       //   ACE_DEBUG((LM_DEBUG,
00338       //              "(%P|%t) RecorderImpl::add_association: "
00339       //              "inserted writer %C.return %d \n",
00340       //              OPENDDS_STRING(converter).c_str(), bpair.second));
00341       //
00342       //   WriterMapType::iterator iter = writers_.find(writer_id);
00343       //   if (iter != writers_.end()) {
00344       //     // This may not be an error since it could happen that the sample
00345       //     // is delivered to the datareader after the write is dis-associated
00346       //     // with this datareader.
00347       //     GuidConverter reader_converter(subscription_id_);
00348       //     GuidConverter writer_converter(writer_id);
00349       //     ACE_DEBUG((LM_DEBUG,
00350       //               ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
00351       //               ACE_TEXT("reader %C is associated with writer %C.\n"),
00352       //               OPENDDS_STRING(reader_converter).c_str(),
00353       //               OPENDDS_STRING(writer_converter).c_str()));
00354       //   }
00355       // }
00356     }
00357 
00358     //
00359     // Propagate the add_associations processing down into the Transport
00360     // layer here.  This will establish the transport support and reserve
00361     // usage of an existing connection or initiate creation of a new
00362     // connection if no suitable connection is available.
00363     //
00364     AssociationData data;
00365     data.remote_id_ = writer.writerId;
00366     data.remote_data_ = writer.writerTransInfo;
00367     data.publication_transport_priority_ =
00368       writer.writerQos.transport_priority.value;
00369     data.remote_reliable_ =
00370       (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00371     data.remote_durable_ =
00372       (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00373 
00374     if (!this->associate(data, active)) {
00375       if (DCPS_debug_level) {
00376         ACE_ERROR((LM_ERROR,
00377                    ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
00378                    ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00379       }
00380       return;
00381     }
00382 
00383     // Check if any publications have already sent a REQUEST_ACK message.
00384     // {
00385     //   ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00386     //
00387     //   WriterMapType::iterator where = this->writers_.find(writer.writerId);
00388     //
00389     //   if (where != this->writers_.end()) {
00390     //     const ACE_Time_Value now = ACE_OS::gettimeofday();
00391     //
00392     //     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00393     //
00394     //     if (where->second->should_ack(now)) {
00395     //       const SequenceNumber sequence = where->second->ack_sequence();
00396     //       const DDS::Time_t timenow = time_value_to_time(now);
00397     //       if (this->send_sample_ack(writer.writerId, sequence, timenow)) {
00398     //         where->second->clear_acks(sequence);
00399     //       }
00400     //     }
00401     //   }
00402     // }
00403 
00404     //
00405     // LIVELINESS policy timers are managed here.
00406     //
00407     // if (liveliness_lease_duration_ != ACE_Time_Value::zero) {
00408     //   // this call will start the timer if it is not already set
00409     //   const ACE_Time_Value now = ACE_OS::gettimeofday();
00410     //
00411     //   if (DCPS_debug_level >= 5) {
00412     //     GuidConverter converter(subscription_id_);
00413     //     ACE_DEBUG((LM_DEBUG,
00414     //                ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
00415     //                ACE_TEXT("starting/resetting liveliness timer for reader %C\n"),
00416     //                OPENDDS_STRING(converter).c_str()));
00417     //   }
00418     //
00419     //   this->handle_timeout(now, this);
00420     // }
00421 
00422     // else - no timer needed when LIVELINESS.lease_duration is INFINITE
00423 
00424   }
00425   //
00426   // We no longer hold the publication_handle_lock_.
00427   //
00428 
00429   //
00430   // We only do the following processing for readers that are *not*
00431   // readers of Builtin Topics.
00432   //
00433   if (!is_bit_) {
00434 
00435     DDS::InstanceHandle_t handle =
00436       this->participant_servant_->id_to_handle(writer.writerId);
00437 
00438     //
00439     // We acquire the publication_handle_lock_ for the remainder of our
00440     // processing.
00441     //
00442     {
00443       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00444 
00445       // This insertion is idempotent.
00446       this->id_to_handle_map_.insert(
00447         RepoIdToHandleMap::value_type(writer.writerId, handle));
00448 
00449       if (DCPS_debug_level > 4) {
00450         GuidConverter converter(writer.writerId);
00451         ACE_DEBUG((LM_DEBUG,
00452                    ACE_TEXT("(%P|%t) RecorderImpl::add_association: ")
00453                    ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"),
00454                    OPENDDS_STRING(converter).c_str(),
00455                    handle));
00456       }
00457 
00458       // We need to adjust these after the insertions have all completed
00459       // since insertions are not guaranteed to increase the number of
00460       // currently matched publications.
00461       int matchedPublications = static_cast<int>(this->id_to_handle_map_.size());
00462       this->subscription_match_status_.current_count_change
00463         = matchedPublications - this->subscription_match_status_.current_count;
00464       this->subscription_match_status_.current_count = matchedPublications;
00465 
00466       ++this->subscription_match_status_.total_count;
00467       ++this->subscription_match_status_.total_count_change;
00468 
00469       this->subscription_match_status_.last_publication_handle = handle;
00470 
00471       // set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
00472 
00473 
00474       if (listener_.in()) {
00475         listener_->on_recorder_matched(
00476           this,
00477           this->subscription_match_status_);
00478 
00479         // TBD - why does the spec say to change this but not change
00480         //       the ChangeFlagStatus after a listener call?
00481 
00482         // Client will look at it so next time it looks the change should be 0
00483         this->subscription_match_status_.total_count_change = 0;
00484         this->subscription_match_status_.current_count_change = 0;
00485       }
00486 
00487       // notify_status_condition();
00488     }
00489 
00490     {
00491       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00492 
00493       this->writers_[writer.writerId]->handle_ = handle;
00494     }
00495   }
00496 
00497   if (!active) {
00498     Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00499     disco->association_complete(this->domain_id_,
00500                                 this->participant_servant_->get_id(),
00501                                 this->subscription_id_, writer.writerId);
00502   }
00503 
00504   // if (this->monitor_) {
00505   //   this->monitor_->report();
00506   // }
00507 }

Here is the call graph for this function:

void OpenDDS::DCPS::RecorderImpl::association_complete ( const RepoId remote_id  )  [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 510 of file RecorderImpl.cpp.

00511 {
00512   // For the current DCPSInfoRepo implementation, the DataReader side will
00513   // always be passive, so association_complete() will not be called.
00514 }

bool OpenDDS::DCPS::RecorderImpl::check_transport_qos ( const TransportInst inst  )  [virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 190 of file RecorderImpl.cpp.

References OpenDDS::DCPS::TransportInst::is_reliable(), qos_, DDS::DataReaderQos::reliability, and DDS::RELIABLE_RELIABILITY_QOS.

00191 {
00192   if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
00193     return ti.is_reliable();
00194   }
00195   return true;
00196 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::cleanup ( void   ) 

cleanup the DataWriter.

Definition at line 105 of file RecorderImpl.cpp.

References ACE_TEXT(), domain_id_, OpenDDS::DCPS::DomainParticipantImpl::get_id(), LM_ERROR, participant_servant_, remove_all_associations(), remove_association_sweeper_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, writers_, and writers_lock_.

Referenced by OpenDDS::DCPS::Service_Participant::delete_recorder(), and OpenDDS::DCPS::DomainParticipantImpl::handle_exception().

00106 {
00107 
00108   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00109   if (!disco->remove_subscription(this->domain_id_,
00110                                   participant_servant_->get_id(),
00111                                   this->subscription_id_)) {
00112     ACE_ERROR_RETURN((LM_ERROR,
00113                       ACE_TEXT("(%P|%t) ERROR: ")
00114                       ACE_TEXT("RecorderImpl::cleanup: ")
00115                       ACE_TEXT(" could not remove subscription from discovery.\n")),
00116                      DDS::RETCODE_ERROR);
00117   }
00118 
00119   // Call remove association before unregistering the datareader from the transport,
00120   // otherwise some callbacks resulted from remove_association may lost.
00121 
00122   this->remove_all_associations();
00123 
00124   {
00125     ACE_READ_GUARD_RETURN(ACE_RW_Thread_Mutex,
00126                    read_guard,
00127                    this->writers_lock_,
00128                    0);
00129     // Cancel any uncancelled sweeper timers
00130     WriterMapType::iterator writer;
00131     for (writer = writers_.begin(); writer != writers_.end(); ++writer) {
00132       remove_association_sweeper_->cancel_timer(writer->second);
00133     }
00134   }
00135 
00136   remove_association_sweeper_->wait();
00137   return DDS::RETCODE_OK;
00138 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::RecorderImpl::data_received ( const ReceivedDataSample sample  )  [virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 209 of file RecorderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::byte_order_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::RcHandle< T >::in(), listener_, LM_DEBUG, OpenDDS::DCPS::DataSampleHeader::message_id_, OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::SAMPLE_DATA, sample_lock_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::to_string().

00210 {
00211   DBG_ENTRY_LVL("RecorderImpl","data_received",6);
00212 
00213   // ensure some other thread is not changing the sample container
00214   // or statuses related to samples.
00215   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00216 
00217   if (DCPS_debug_level > 9) {
00218     GuidConverter converter(subscription_id_);
00219     ACE_DEBUG((LM_DEBUG,
00220                ACE_TEXT("(%P|%t) RecorderImpl::data_received: ")
00221                ACE_TEXT("%C received sample: %C.\n"),
00222                OPENDDS_STRING(converter).c_str(),
00223                to_string(sample.header_).c_str()));
00224   }
00225 
00226   // we only support SAMPLE_DATA messages
00227   if (sample.header_.message_id_ != SAMPLE_DATA)
00228     return;
00229 
00230   RawDataSample rawSample(static_cast<MessageId> (sample.header_.message_id_),
00231                           sample.header_.source_timestamp_sec_,
00232                           sample.header_.source_timestamp_nanosec_,
00233                           sample.header_.publication_id_,
00234                           sample.header_.byte_order_,
00235                           sample.sample_.get());
00236 
00237   if (listener_.in()) {
00238     listener_->on_sample_data_received(this, rawSample);
00239   }
00240 
00241 }

Here is the call graph for this function:

DDS::DomainId_t OpenDDS::DCPS::RecorderImpl::domain_id (  )  const [inline, virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 71 of file RecorderImpl.h.

00071 { return this->domain_id_; }

DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::enable (  ) 

Implements DDS::Entity.

Definition at line 941 of file RecorderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::TransportClient::connection_info(), OpenDDS::DCPS::DCPS_debug_level, domain_id_, OpenDDS::DCPS::TransportClient::enable_transport(), OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::EntityImpl::is_enabled(), LM_DEBUG, LM_ERROR, participant_servant_, qos_, DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_enabled(), subqos_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, topic_servant_, and DDS::VOLATILE_DURABILITY_QOS.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_recorder().

00942 {
00943   if (DCPS_debug_level >= 1) {
00944 
00945     ACE_DEBUG((LM_DEBUG,
00946                ACE_TEXT("(%P|%t) RecorderImpl::enable\n")));
00947   }
00948   //According spec:
00949   // - Calling enable on an already enabled Entity returns OK and has no
00950   // effect.
00951   // - Calling enable on an Entity whose factory is not enabled will fail
00952   // and return PRECONDITION_NOT_MET.
00953 
00954   if (this->is_enabled()) {
00955     return DDS::RETCODE_OK;
00956   }
00957 
00958   this->set_enabled();
00959 
00960   // if (topic_servant_ && !transport_disabled_) {
00961   if (topic_servant_) {
00962 
00963     ACE_DEBUG((LM_DEBUG,
00964                ACE_TEXT("(%P|%t) RecorderImpl::enable_transport\n")));
00965 
00966     try {
00967       this->enable_transport(this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS,
00968                              this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00969     } catch (const Transport::Exception&) {
00970       ACE_ERROR((LM_ERROR,
00971                  ACE_TEXT("(%P|%t) ERROR: RecorderImpl::enable, ")
00972                  ACE_TEXT("Transport Exception.\n")));
00973       return DDS::RETCODE_ERROR;
00974 
00975     }
00976 
00977     const TransportLocatorSeq& trans_conf_info = this->connection_info();
00978 
00979     CORBA::String_var filterClassName = "";
00980     CORBA::String_var filterExpression = "";
00981     DDS::StringSeq exprParams;
00982 
00983 
00984     Discovery_rch disco =
00985       TheServiceParticipant->get_discovery(this->domain_id_);
00986 
00987     ACE_DEBUG((LM_DEBUG,
00988                ACE_TEXT("(%P|%t) RecorderImpl::add_subscription\n")));
00989 
00990     this->subscription_id_ =
00991       disco->add_subscription(this->domain_id_,
00992                               this->participant_servant_->get_id(),
00993                               this->topic_servant_->get_id(),
00994                               this,
00995                               this->qos_,
00996                               trans_conf_info,
00997                               this->subqos_,
00998                               filterClassName,
00999                               filterExpression,
01000                               exprParams);
01001 
01002     if (this->subscription_id_ == OpenDDS::DCPS::GUID_UNKNOWN) {
01003       ACE_ERROR((LM_ERROR,
01004                  ACE_TEXT("(%P|%t) ERROR: RecorderImpl::enable, ")
01005                  ACE_TEXT("add_subscription returned invalid id.\n")));
01006       return DDS::RETCODE_ERROR;
01007     }
01008   }
01009 
01010   if (topic_servant_) {
01011     const CORBA::String_var name = topic_servant_->get_name();
01012     DDS::ReturnCode_t return_value = DDS::RETCODE_OK;
01013     //   this->participant_servant_->recorder_enabled(name.in(), this);
01014 
01015     return return_value;
01016   } else {
01017     return DDS::RETCODE_OK;
01018   }
01019 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::InstanceHandle_t OpenDDS::DCPS::RecorderImpl::get_instance_handle (  )  [virtual]

Implements OpenDDS::DCPS::EntityImpl.

Definition at line 1022 of file RecorderImpl.cpp.

References OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), participant_servant_, and OpenDDS::DCPS::WriterInfoListener::subscription_id_.

01023 {
01024   return this->participant_servant_->id_to_handle(subscription_id_);
01025 }

Here is the call graph for this function:

RecorderListener_rch OpenDDS::DCPS::RecorderImpl::get_listener (  )  [virtual]

Get the listener for this Recorder.

Implements OpenDDS::DCPS::Recorder.

Definition at line 907 of file RecorderImpl.cpp.

References listener_.

00908 {
00909   return listener_;
00910 }

CORBA::Long OpenDDS::DCPS::RecorderImpl::get_priority_value ( const AssociationData data  )  const [virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 203 of file RecorderImpl.cpp.

References OpenDDS::DCPS::AssociationData::publication_transport_priority_.

00204 {
00205   return data.publication_transport_priority_;
00206 }

DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::get_qos ( DDS::SubscriberQos subscriber_qos,
DDS::DataReaderQos datareader_qos 
) [virtual]

Get the Quality of Service settings for the Recorder.

Implements OpenDDS::DCPS::Recorder.

Definition at line 887 of file RecorderImpl.cpp.

References qos_, DDS::RETCODE_OK, and subqos_.

00890 {
00891   qos = qos_;
00892   subscriber_qos = subqos_;
00893   return DDS::RETCODE_OK;
00894 }

const RepoId & OpenDDS::DCPS::RecorderImpl::get_repo_id (  )  const [virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 198 of file RecorderImpl.cpp.

References OpenDDS::DCPS::WriterInfoListener::subscription_id_.

00199 {
00200   return this->subscription_id_;
00201 }

void OpenDDS::DCPS::RecorderImpl::inconsistent_topic (  )  [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 792 of file RecorderImpl.cpp.

References topic_servant_.

00793 {
00794   topic_servant_->inconsistent_topic();
00795 }

void OpenDDS::DCPS::RecorderImpl::init ( TopicDescriptionImpl a_topic_desc,
const DDS::DataReaderQos qos,
RecorderListener_rch  a_listener,
const DDS::StatusMask mask,
DomainParticipantImpl participant,
DDS::SubscriberQos  subqos 
)

Definition at line 140 of file RecorderImpl.cpp.

References OpenDDS::DCPS::Recorder::_duplicate(), ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domain_id_, DDS::EXCLUSIVE_OWNERSHIP_QOS, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), OpenDDS::DCPS::TopicDescriptionImpl::get_name(), OpenDDS::DCPS::TopicDescriptionImpl::get_type_name(), is_bit_, is_exclusive_ownership_, listener_, listener_mask_, LM_DEBUG, owner_manager_, OpenDDS::DCPS::DomainParticipantImpl::ownership_manager(), participant_servant_, qos_, subqos_, topic_desc_, topic_servant_, and OpenDDS::DCPS::topicIsBIT().

Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_recorder().

00147 {
00148   //
00149   if (DCPS_debug_level >= 1) {
00150 
00151     ACE_DEBUG((LM_DEBUG,
00152                ACE_TEXT("(%P|%t) RecorderImpl::init \n")));
00153   }
00154 
00155 
00156   topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc);
00157   if (TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) {
00158     topic_servant_ = a_topic;
00159   }
00160 
00161   CORBA::String_var topic_name = a_topic_desc->get_name();
00162 
00163 #if !defined (DDS_HAS_MINIMUM_BIT)
00164   is_bit_ = topicIsBIT(topic_name.in(), a_topic_desc->get_type_name());
00165 #endif   // !defined (DDS_HAS_MINIMUM_BIT)
00166 
00167   qos_ = qos;
00168 
00169 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00170   is_exclusive_ownership_ = this->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS;
00171 #endif
00172 
00173   listener_ = a_listener;
00174   listener_mask_ = mask;
00175 
00176   // Only store the participant pointer, since it is our "grand"
00177   // parent, we will exist as long as it does
00178   participant_servant_ = participant;
00179 
00180 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00181   if (is_exclusive_ownership_) {
00182     owner_manager_ = participant_servant_->ownership_manager ();
00183   }
00184 #endif
00185 
00186   domain_id_ = participant_servant_->get_domain_id();
00187   subqos_ = subqos;
00188 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::RecorderImpl::lookup_instance_handles ( const WriterIdSeq ids,
DDS::InstanceHandleSeq hdls 
) [private]

Lookup the instance handles by the publication repo ids.

Definition at line 913 of file RecorderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), LM_DEBUG, OPENDDS_STRING, and participant_servant_.

Referenced by remove_associations_i().

00915 {
00916   CORBA::ULong const num_wrts = ids.length();
00917 
00918   if (DCPS_debug_level > 9) {
00919     OPENDDS_STRING separator = "";
00920     OPENDDS_STRING buffer;
00921 
00922     for (CORBA::ULong i = 0; i < num_wrts; ++i) {
00923       buffer += separator + OPENDDS_STRING(GuidConverter(ids[i]));
00924       separator = ", ";
00925     }
00926 
00927     ACE_DEBUG((LM_DEBUG,
00928                ACE_TEXT("(%P|%t) RecorderImpl::lookup_instance_handles: ")
00929                ACE_TEXT("searching for handles for writer Ids: %C.\n"),
00930                buffer.c_str()));
00931   }
00932 
00933   hdls.length(num_wrts);
00934 
00935   for (CORBA::ULong i = 0; i < num_wrts; ++i) {
00936     hdls[i] = this->participant_servant_->id_to_handle(ids[i]);
00937   }
00938 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::RecorderImpl::notify_subscription_disconnected ( const WriterIdSeq pubids  )  [virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 243 of file RecorderImpl.cpp.

00244 {
00245 }

void OpenDDS::DCPS::RecorderImpl::notify_subscription_lost ( const DDS::InstanceHandleSeq handles  )  [private]

Definition at line 253 of file RecorderImpl.cpp.

00254 {
00255 }

void OpenDDS::DCPS::RecorderImpl::notify_subscription_lost ( const WriterIdSeq pubids  )  [virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 257 of file RecorderImpl.cpp.

Referenced by remove_associations_i().

00258 {
00259 
00260 }

Here is the caller graph for this function:

void OpenDDS::DCPS::RecorderImpl::notify_subscription_reconnected ( const WriterIdSeq pubids  )  [virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 247 of file RecorderImpl.cpp.

00248 {
00249 
00250 }

typedef OpenDDS::DCPS::RecorderImpl::OPENDDS_MAP_CMP ( PublicationId  ,
RcHandle< WriterInfo ,
GUID_tKeyLessThan   
) [private]

publications writing to this reader.

typedef OpenDDS::DCPS::RecorderImpl::OPENDDS_MAP_CMP ( RepoId  ,
DDS::InstanceHandle_t  ,
GUID_tKeyLessThan   
) [private]
DomainParticipantImpl* OpenDDS::DCPS::RecorderImpl::participant (  )  [inline]

Definition at line 122 of file RecorderImpl.h.

Referenced by OpenDDS::DCPS::Service_Participant::delete_recorder().

00122                                                 {
00123     return participant_servant_;
00124   }

Here is the caller graph for this function:

void OpenDDS::DCPS::RecorderImpl::register_for_writer ( const RepoId participant,
const RepoId readerid,
const RepoId writerid,
const TransportLocatorSeq locators,
DiscoveryListener listener 
) [virtual]

Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 1028 of file RecorderImpl.cpp.

01033 {
01034   TransportClient::register_for_writer(participant, readerid, writerid, locators, listener);
01035 }

void OpenDDS::DCPS::RecorderImpl::remove_all_associations (  ) 

Definition at line 714 of file RecorderImpl.cpp.

References DBG_ENTRY_LVL, publication_handle_lock_, remove_associations(), size, writers_, and writers_lock_.

Referenced by cleanup().

00715 {
00716   DBG_ENTRY_LVL("RecorderImpl","remove_all_associations",6);
00717 
00718   OpenDDS::DCPS::WriterIdSeq writers;
00719   int size;
00720 
00721   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00722 
00723   {
00724     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00725 
00726     size = static_cast<int>(writers_.size());
00727     writers.length(size);
00728 
00729     WriterMapType::iterator curr_writer = writers_.begin();
00730     WriterMapType::iterator end_writer = writers_.end();
00731 
00732     int i = 0;
00733 
00734     while (curr_writer != end_writer) {
00735       writers[i++] = curr_writer->first;
00736       ++curr_writer;
00737     }
00738   }
00739 
00740   try {
00741     CORBA::Boolean dont_notify_lost = 0;
00742 
00743     if (0 < size) {
00744       remove_associations(writers, dont_notify_lost);
00745     }
00746 
00747   } catch (const CORBA::Exception&) {
00748   }
00749 }

Here is the call graph for this function:

Here is the caller graph for this function:

virtual void OpenDDS::DCPS::RecorderImpl::remove_associations ( const WriterIdSeq writers,
CORBA::Boolean  callback 
) [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Referenced by remove_all_associations().

Here is the caller graph for this function:

void OpenDDS::DCPS::RecorderImpl::remove_associations_i ( const WriterIdSeq writers,
bool  callback 
) [protected, virtual]

Section 7.1.4.1: total_count will not decrement.

: Reconcile this with the verbiage in section 7.1.4.1

Definition at line 581 of file RecorderImpl.cpp.

References ACE_TEXT(), DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportClient::disassociate(), id_to_handle_map_, OpenDDS::DCPS::RcHandle< T >::in(), is_bit_, DDS::SubscriptionMatchedStatus::last_publication_handle, listener_, LM_DEBUG, lookup_instance_handles(), notify_subscription_lost(), OPENDDS_STRING, publication_handle_lock_, OpenDDS::DCPS::push_back(), remove_association_sweeper_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, subscription_match_status_, DDS::SubscriptionMatchedStatus::total_count_change, writers_, and writers_lock_.

Referenced by remove_publication().

00583 {
00584   DBG_ENTRY_LVL("RecorderImpl", "remove_associations_i", 6);
00585 
00586   if (writers.length() == 0) {
00587     return;
00588   }
00589 
00590   if (DCPS_debug_level >= 1) {
00591     GuidConverter reader_converter(subscription_id_);
00592     GuidConverter writer_converter(writers[0]);
00593     ACE_DEBUG((LM_DEBUG,
00594                ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
00595                ACE_TEXT("bit %d local %C remote %C num remotes %d \n"),
00596                is_bit_,
00597                OPENDDS_STRING(reader_converter).c_str(),
00598                OPENDDS_STRING(writer_converter).c_str(),
00599                writers.length()));
00600   }
00601   DDS::InstanceHandleSeq handles;
00602 
00603   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_);
00604 
00605   // This is used to hold the list of writers which were actually
00606   // removed, which is a proper subset of the writers which were
00607   // requested to be removed.
00608   WriterIdSeq updated_writers;
00609 
00610   CORBA::ULong wr_len;
00611 
00612   //Remove the writers from writer list. If the supplied writer
00613   //is not in the cached writers list then it is already removed.
00614   //We just need remove the writers in the list that have not been
00615   //removed.
00616   {
00617     ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00618 
00619     wr_len = writers.length();
00620 
00621     for (CORBA::ULong i = 0; i < wr_len; i++) {
00622       PublicationId writer_id = writers[i];
00623 
00624       WriterMapType::iterator it = this->writers_.find(writer_id);
00625 
00626       if (it != this->writers_.end()) {
00627         it->second->removed();
00628         remove_association_sweeper_->cancel_timer(it->second);
00629       }
00630 
00631       if (this->writers_.erase(writer_id) == 0) {
00632         if (DCPS_debug_level >= 1) {
00633           GuidConverter converter(writer_id);
00634           ACE_DEBUG((LM_DEBUG,
00635                      ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ")
00636                      ACE_TEXT("the writer local %C was already removed.\n"),
00637                      OPENDDS_STRING(converter).c_str()));
00638         }
00639 
00640       } else {
00641         push_back(updated_writers, writer_id);
00642       }
00643     }
00644   }
00645 
00646   wr_len = updated_writers.length();
00647 
00648   // Return now if the supplied writers have been removed already.
00649   if (wr_len == 0) {
00650     return;
00651   }
00652 
00653   if (!is_bit_) {
00654     // The writer should be in the id_to_handle map at this time.  Note
00655     // it if it not there.
00656     this->lookup_instance_handles(updated_writers, handles);
00657 
00658     for (CORBA::ULong i = 0; i < wr_len; ++i) {
00659       id_to_handle_map_.erase(updated_writers[i]);
00660     }
00661   }
00662   for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
00663     this->disassociate(updated_writers[i]);
00664   }
00665 
00666   // Mirror the add_associations SUBSCRIPTION_MATCHED_STATUS processing.
00667   if (!this->is_bit_) {
00668     // Derive the change in the number of publications writing to this reader.
00669     int matchedPublications = static_cast<int>(this->id_to_handle_map_.size());
00670     this->subscription_match_status_.current_count_change
00671       = matchedPublications - this->subscription_match_status_.current_count;
00672 
00673     // Only process status if the number of publications has changed.
00674     if (this->subscription_match_status_.current_count_change != 0) {
00675       this->subscription_match_status_.current_count = matchedPublications;
00676       /// Section 7.1.4.1: total_count will not decrement.
00677 
00678       /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
00679       this->subscription_match_status_.last_publication_handle
00680         = handles[ wr_len - 1];
00681 
00682       // set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true);
00683 
00684       // DDS::DataReaderListener_var listener
00685       // = listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS);
00686 
00687       if (listener_.in()) {
00688         listener_->on_recorder_matched(
00689           this,
00690           this->subscription_match_status_);
00691 
00692         // Client will look at it so next time it looks the change should be 0
00693         this->subscription_match_status_.total_count_change = 0;
00694         this->subscription_match_status_.current_count_change = 0;
00695       }
00696 
00697       // notify_status_condition();
00698     }
00699   }
00700 
00701   // If this remove_association is invoked when the InfoRepo
00702   // detects a lost writer then make a callback to notify
00703   // subscription lost.
00704   if (notify_lost) {
00705     this->notify_subscription_lost(handles);
00706   }
00707 
00708   // if (this->monitor_) {
00709   //   this->monitor_->report();
00710   // }
00711 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::RecorderImpl::remove_publication ( const PublicationId pub_id  )  [protected]

Definition at line 566 of file RecorderImpl.cpp.

References OpenDDS::DCPS::WriterInfo::notify_lost_, OpenDDS::DCPS::push_back(), remove_associations_i(), writers_, and writers_lock_.

00567 {
00568   ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
00569   WriterMapType::iterator where = writers_.find(pub_id);
00570   if (writers_.end() != where) {
00571     WriterInfo& info = *where->second;
00572     WriterIdSeq writers;
00573     push_back(writers, pub_id);
00574     bool notify = info.notify_lost_;
00575     write_guard.release();
00576     remove_associations_i(writers, notify);
00577   }
00578 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::repoid_to_bit_key ( const DCPS::RepoId id,
DDS::BuiltinTopicKey_t key 
) [virtual]

Find the bit key for a given repo id.

Implements OpenDDS::DCPS::Recorder.

Definition at line 1047 of file RecorderImpl.cpp.

References OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), participant_servant_, publication_handle_lock_, DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

01049 {
01050   DDS::InstanceHandle_t publication_handle = this->participant_servant_->id_to_handle(id);
01051 
01052   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01053                    guard,
01054                    this->publication_handle_lock_,
01055                    DDS::RETCODE_ERROR);
01056 
01057   DDS::PublicationBuiltinTopicDataSeq data;
01058 
01059   DDS::ReturnCode_t ret = instance_handle_to_bit_data<DDS::PublicationBuiltinTopicDataDataReader_var>(
01060                             participant_servant_,
01061                             BUILT_IN_PUBLICATION_TOPIC,
01062                             publication_handle,
01063                             data);
01064 
01065   if (ret == DDS::RETCODE_OK) {
01066     key = data[0].key;
01067   }
01068 
01069   return ret;
01070 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::set_listener ( const RecorderListener_rch a_listener,
DDS::StatusMask  mask 
) [virtual]

Change the listener for this Recorder.

Implements OpenDDS::DCPS::Recorder.

Definition at line 897 of file RecorderImpl.cpp.

References listener_, listener_mask_, and DDS::RETCODE_OK.

00899 {
00900   listener_mask_ = mask;
00901   //note: OK to duplicate  a nil object ref
00902   listener_ = a_listener;
00903   return DDS::RETCODE_OK;
00904 }

DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::set_qos ( const DDS::SubscriberQos subscriber_qos,
const DDS::DataReaderQos datareader_qos 
) [virtual]

Set the Quality of Service settings for the Recorder.

Implements OpenDDS::DCPS::Recorder.

Definition at line 827 of file RecorderImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), domain_id_, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::EntityImpl::is_enabled(), LM_ERROR, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, participant_servant_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, status, subqos_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, and OpenDDS::DCPS::Qos_Helper::valid().

00830 {
00831 
00832   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(subscriber_qos, DDS::RETCODE_UNSUPPORTED);
00833 
00834   if (Qos_Helper::valid(subscriber_qos) && Qos_Helper::consistent(subscriber_qos)) {
00835     if (subqos_ != subscriber_qos) {
00836       // for the not changeable qos, it can be changed before enable
00837       if (!Qos_Helper::changeable(subqos_, subscriber_qos) && enabled_ == true) {
00838         return DDS::RETCODE_IMMUTABLE_POLICY;
00839 
00840       } else {
00841         subqos_ = subscriber_qos;
00842       }
00843     }
00844   } else {
00845     return DDS::RETCODE_INCONSISTENT_POLICY;
00846   }
00847 
00848   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00849   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00850   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00851 
00852   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00853     if (qos_ == qos)
00854       return DDS::RETCODE_OK;
00855 
00856     if (!Qos_Helper::changeable(qos_, qos) && this->is_enabled()) {
00857       return DDS::RETCODE_IMMUTABLE_POLICY;
00858 
00859     } else {
00860       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00861       const bool status =
00862         disco->update_subscription_qos(
00863           this->participant_servant_->get_domain_id(),
00864           this->participant_servant_->get_id(),
00865           this->subscription_id_,
00866           qos,
00867           subscriber_qos);
00868       if (!status) {
00869         ACE_ERROR_RETURN((LM_ERROR,
00870                           ACE_TEXT("(%P|%t) RecorderImpl::set_qos, ")
00871                           ACE_TEXT("qos not updated. \n")),
00872                          DDS::RETCODE_ERROR);
00873       }
00874     }
00875 
00876     qos_ = qos;
00877     subqos_ = subscriber_qos;
00878 
00879     return DDS::RETCODE_OK;
00880 
00881   } else {
00882     return DDS::RETCODE_INCONSISTENT_POLICY;
00883   }
00884 }

Here is the call graph for this function:

void OpenDDS::DCPS::RecorderImpl::signal_liveliness ( const RepoId remote_participant  )  [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 798 of file RecorderImpl.cpp.

References OpenDDS::DCPS::GUID_t::entityId, ACE_OS::gettimeofday(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::TransportClient::OPENDDS_VECTOR(), sample_lock_, writers_, and writers_lock_.

00799 {
00800   RepoId prefix = remote_participant;
00801   prefix.entityId = EntityId_t();
00802 
00803   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_);
00804 
00805   typedef std::pair<RepoId, RcHandle<WriterInfo> > WriterSetElement;
00806   typedef OPENDDS_VECTOR(WriterSetElement) WriterSet;
00807   WriterSet writers;
00808 
00809   {
00810     ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
00811     for (WriterMapType::iterator pos = writers_.lower_bound(prefix),
00812            limit = writers_.end();
00813          pos != limit && GuidPrefixEqual() (pos->first.guidPrefix, prefix.guidPrefix);
00814          ++pos) {
00815       writers.push_back(std::make_pair(pos->first, pos->second));
00816     }
00817   }
00818 
00819   ACE_Time_Value when = ACE_OS::gettimeofday();
00820   for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
00821        pos != limit;
00822        ++pos) {
00823     pos->second->received_activity(when);
00824   }
00825 }

Here is the call graph for this function:

void OpenDDS::DCPS::RecorderImpl::unregister_for_writer ( const RepoId participant,
const RepoId readerid,
const RepoId writerid 
) [virtual]

Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 1038 of file RecorderImpl.cpp.

01041 {
01042   TransportClient::unregister_for_writer(participant, readerid, writerid);
01043 }

void OpenDDS::DCPS::RecorderImpl::update_incompatible_qos ( const IncompatibleQosStatus status  )  [virtual]

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 752 of file RecorderImpl.cpp.

References OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, OpenDDS::DCPS::IncompatibleQosStatus::last_policy_id, DDS::RequestedIncompatibleQosStatus::last_policy_id, OpenDDS::DCPS::IncompatibleQosStatus::policies, DDS::RequestedIncompatibleQosStatus::policies, publication_handle_lock_, requested_incompatible_qos_status_, OpenDDS::DCPS::IncompatibleQosStatus::total_count, DDS::RequestedIncompatibleQosStatus::total_count, and DDS::RequestedIncompatibleQosStatus::total_count_change.

00753 {
00754 
00755 
00756   ACE_GUARD(ACE_Recursive_Thread_Mutex,
00757             guard,
00758             this->publication_handle_lock_);
00759 
00760   if (this->requested_incompatible_qos_status_.total_count == status.total_count) {
00761     // This test should make the method idempotent.
00762     return;
00763   }
00764 
00765   // set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS,
00766   //                         true);
00767 
00768   // copy status and increment change
00769   requested_incompatible_qos_status_.total_count = status.total_count;
00770   requested_incompatible_qos_status_.total_count_change +=
00771     status.count_since_last_send;
00772   requested_incompatible_qos_status_.last_policy_id =
00773     status.last_policy_id;
00774   requested_incompatible_qos_status_.policies = status.policies;
00775 
00776   // if (!CORBA::is_nil(listener.in())) {
00777   //   listener->on_requested_incompatible_qos(this,
00778   //                                           requested_incompatible_qos_status_);
00779   //
00780   //   // TBD - why does the spec say to change total_count_change but not
00781   //   // change the ChangeFlagStatus after a listener call?
00782   //
00783   //   // client just looked at it so next time it looks the
00784   //   // change should be 0
00785   //   requested_incompatible_qos_status_.total_count_change = 0;
00786   // }
00787   //
00788   // notify_status_condition();
00789 }


Friends And Related Function Documentation

friend class ::DDS_TEST [friend]

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 168 of file RecorderImpl.h.

friend class RemoveAssociationSweeper< RecorderImpl > [friend]

Definition at line 166 of file RecorderImpl.h.


Member Data Documentation

Definition at line 173 of file RecorderImpl.h.

Referenced by add_association(), cleanup(), enable(), init(), and set_qos().

Definition at line 179 of file RecorderImpl.h.

Referenced by add_association(), and remove_associations_i().

Flag indicates that this datareader is a builtin topic datareader.

Definition at line 186 of file RecorderImpl.h.

Referenced by add_association(), init(), and remove_associations_i().

Definition at line 159 of file RecorderImpl.h.

Referenced by init().

Definition at line 171 of file RecorderImpl.h.

Referenced by init(), and set_listener().

Definition at line 161 of file RecorderImpl.h.

Referenced by init().

Definition at line 150 of file RecorderImpl.h.

Referenced by check_transport_qos(), enable(), get_qos(), init(), and set_qos().

Definition at line 174 of file RecorderImpl.h.

Referenced by cleanup(), remove_associations_i(), and ~RecorderImpl().

Definition at line 181 of file RecorderImpl.h.

Referenced by RecorderImpl(), and update_incompatible_qos().

lock protecting sample container as well as statuses.

Definition at line 153 of file RecorderImpl.h.

Referenced by add_association(), data_received(), and signal_liveliness().

Definition at line 164 of file RecorderImpl.h.

Referenced by enable(), get_qos(), init(), and set_qos().

Definition at line 182 of file RecorderImpl.h.

Referenced by add_association(), RecorderImpl(), and remove_associations_i().

DDS::TopicDescription_var OpenDDS::DCPS::RecorderImpl::topic_desc_ [private]

Definition at line 170 of file RecorderImpl.h.

Referenced by init().

Definition at line 156 of file RecorderImpl.h.

Referenced by enable(), inconsistent_topic(), and init().

WriterMapType OpenDDS::DCPS::RecorderImpl::writers_ [private]

RW lock for reading/writing publications.

Definition at line 194 of file RecorderImpl.h.

Referenced by add_association(), cleanup(), remove_all_associations(), remove_associations_i(), remove_publication(), signal_liveliness(), and ~RecorderImpl().


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1