OpenDDS::DCPS::SubscriberImpl Class Reference

#include <SubscriberImpl.h>

Inheritance diagram for OpenDDS::DCPS::SubscriberImpl:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::SubscriberImpl:
Collaboration graph
[legend]

List of all members.

Public Member Functions

 SubscriberImpl (DDS::InstanceHandle_t handle, const DDS::SubscriberQos &qos, DDS::SubscriberListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant)
virtual ~SubscriberImpl ()
virtual DDS::InstanceHandle_t get_instance_handle ()
bool contains_reader (DDS::InstanceHandle_t a_handle)
virtual DDS::DataReader_ptr create_datareader (DDS::TopicDescription_ptr a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
virtual DDS::ReturnCode_t delete_datareader (DDS::DataReader_ptr a_datareader)
virtual DDS::ReturnCode_t delete_contained_entities ()
virtual DDS::DataReader_ptr lookup_datareader (const char *topic_name)
virtual DDS::ReturnCode_t get_datareaders (DDS::DataReaderSeq &readers, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
virtual DDS::ReturnCode_t notify_datareaders ()
virtual DDS::ReturnCode_t set_qos (const DDS::SubscriberQos &qos)
virtual DDS::ReturnCode_t get_qos (DDS::SubscriberQos &qos)
virtual DDS::ReturnCode_t set_listener (DDS::SubscriberListener_ptr a_listener, DDS::StatusMask mask)
virtual DDS::SubscriberListener_ptr get_listener ()
virtual DDS::ReturnCode_t begin_access ()
virtual DDS::ReturnCode_t end_access ()
virtual DDS::DomainParticipant_ptr get_participant ()
virtual DDS::ReturnCode_t set_default_datareader_qos (const DDS::DataReaderQos &qos)
virtual DDS::ReturnCode_t get_default_datareader_qos (DDS::DataReaderQos &qos)
virtual DDS::ReturnCode_t copy_from_topic_qos (DDS::DataReaderQos &a_datareader_qos, const DDS::TopicQos &a_topic_qos)
virtual DDS::ReturnCode_t enable ()
bool is_clean () const
void data_received (DataReaderImpl *reader)
DDS::ReturnCode_t reader_enabled (const char *topic_name, DataReaderImpl *reader)
DDS::ReturnCode_t multitopic_reader_enabled (DDS::DataReader_ptr reader)
void remove_from_datareader_set (DataReaderImpl *reader)
DDS::SubscriberListener_ptr listener_for (DDS::StatusKind kind)
typedef OPENDDS_VECTOR (RepoId) SubscriptionIdVec
void get_subscription_ids (SubscriptionIdVec &subs)
void update_ownership_strength (const PublicationId &pub_id, const CORBA::Long &ownership_strength)
void coherent_change_received (RepoId &publisher_id, DataReaderImpl *reader, Coherent_State &group_state)
virtual RcHandle< EntityImpl > parent () const
Raw Latency Statistics Configuration Interfaces



unsigned int & raw_latency_buffer_size ()
 Configure the size of the raw data collection buffer.
DataCollector< double >::OnFull & raw_latency_buffer_type ()
 Configure the type of the raw data collection buffer.

Static Public Member Functions

static bool validate_datareader_qos (const DDS::DataReaderQos &qos, const DDS::DataReaderQos &default_qos, DDS::Topic_ptr a_topic, DDS::DataReaderQos &result_qos, bool mt)

Private Member Functions

typedef OPENDDS_MULTIMAP (OPENDDS_STRING, DataReaderImpl_rch) DataReaderMap
typedef OPENDDS_SET (DataReaderImpl_rch) DataReaderSet
typedef OPENDDS_MAP_CMP (RepoId, DDS::DataReaderQos, GUID_tKeyLessThan) DrIdToQosMap
 DataReader id to qos map.
 OPENDDS_MAP (OPENDDS_STRING, DDS::DataReader_var) multitopic_reader_map_

Private Attributes

DDS::InstanceHandle_t handle_
DDS::SubscriberQos qos_
DDS::DataReaderQos default_datareader_qos_
DDS::StatusMask listener_mask_
DDS::SubscriberListener_var listener_
DataReaderSet readers_not_enabled_
DataReaderMap datareader_map_
DataReaderSet datareader_set_
WeakRcHandle< DomainParticipantImpl > participant_
DDS::DomainId_t domain_id_
RepoId dp_id_
unsigned int raw_latency_buffer_size_
 Bound (or initial reservation) of raw latency buffers.
DataCollector< double >::OnFull raw_latency_buffer_type_
 Type of raw latency data buffers.
ACE_Recursive_Thread_Mutex si_lock_
 This lock protects the data structures in this class.
Monitor * monitor_
 Monitor object for this entity.
int access_depth_

Detailed Description

Definition at line 36 of file SubscriberImpl.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::SubscriberImpl::SubscriberImpl ( DDS::InstanceHandle_t  handle,
const DDS::SubscriberQos &  qos,
DDS::SubscriberListener_ptr  a_listener,
const DDS::StatusMask &  mask,
DomainParticipantImpl *  participant 
)

Definition at line 43 of file SubscriberImpl.cpp.

References CORBA::LocalObject::_duplicate(), listener_, monitor_, and TheServiceParticipant.

00048   : handle_(handle),
00049   qos_(qos),
00050   default_datareader_qos_(TheServiceParticipant->initial_DataReaderQos()),
00051   listener_mask_(mask),
00052   participant_(*participant),
00053   domain_id_(participant->get_domain_id()),
00054   raw_latency_buffer_size_(0),
00055   raw_latency_buffer_type_(DataCollector<double>::KeepOldest),
00056   monitor_(0),
00057   access_depth_ (0)
00058 {
00059   //Note: OK to duplicate a nil.
00060   listener_ = DDS::SubscriberListener::_duplicate(a_listener);
00061 
00062   monitor_ = TheServiceParticipant->monitor_factory_->create_subscriber_monitor(this);
00063 }

Here is the call graph for this function:

OpenDDS::DCPS::SubscriberImpl::~SubscriberImpl (  )  [virtual]

Definition at line 65 of file SubscriberImpl.cpp.

References ACE_TEXT(), datareader_map_, is_clean(), and LM_ERROR.

00066 {
00067   // The datareaders should be deleted already before calling delete
00068   // subscriber.
00069   if (!is_clean()) {
00070     ACE_ERROR((LM_ERROR,
00071                ACE_TEXT("(%P|%t) ERROR: ")
00072                ACE_TEXT("SubscriberImpl::~SubscriberImpl, ")
00073                ACE_TEXT("%B datareaders still exist.\n"),
00074                datareader_map_.size ()));
00075   }
00076 }

Here is the call graph for this function:


Member Function Documentation

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::begin_access (  )  [virtual]

Implements DDS::Subscriber.

Definition at line 676 of file SubscriberImpl.cpp.

References access_depth_, ACE_TEXT(), datareader_set_, OpenDDS::DCPS::EntityImpl::enabled_, DDS::GROUP_PRESENTATION_QOS, LM_ERROR, DDS::SubscriberQos::presentation, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, and si_lock_.

00677 {
00678   if (enabled_ == false) {
00679     ACE_ERROR_RETURN((LM_ERROR,
00680                       ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::begin_access:")
00681                       ACE_TEXT(" Subscriber is not enabled!\n")),
00682                      DDS::RETCODE_NOT_ENABLED);
00683   }
00684 
00685   if (qos_.presentation.access_scope != DDS::GROUP_PRESENTATION_QOS) {
00686     return DDS::RETCODE_OK;
00687   }
00688 
00689   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00690                    guard,
00691                    this->si_lock_,
00692                    DDS::RETCODE_ERROR);
00693 
00694   ++this->access_depth_;
00695 
00696   // We should only notify subscription on the first
00697   // and last change to the current change set:
00698   if (this->access_depth_ == 1) {
00699     for (DataReaderSet::iterator it = this->datareader_set_.begin();
00700          it != this->datareader_set_.end(); ++it) {
00701       (*it)->begin_access();
00702     }
00703   }
00704 
00705   return DDS::RETCODE_OK;
00706 }

Here is the call graph for this function:

void OpenDDS::DCPS::SubscriberImpl::coherent_change_received ( RepoId &  publisher_id,
DataReaderImpl *  reader,
Coherent_State &  group_state 
)

Definition at line 961 of file SubscriberImpl.cpp.

References OpenDDS::DCPS::COMPLETED, datareader_set_, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::NOT_COMPLETED_YET, OpenDDS::DCPS::REJECTED, si_lock_, and state.

00964 {
00965   ACE_GUARD(ACE_Recursive_Thread_Mutex,
00966             guard,
00967             this->si_lock_);
00968 
00969   // Verify if all readers complete the coherent changes. The result
00970   // is either COMPLETED or REJECTED.
00971   group_state = COMPLETED;
00972   DataReaderSet::const_iterator endIter = datareader_set_.end();
00973   for (DataReaderSet::const_iterator iter = datareader_set_.begin();
00974        iter != endIter; ++iter) {
00975 
00976     Coherent_State state = COMPLETED;
00977     (*iter)->coherent_change_received (publisher_id, state);
00978     if (state == NOT_COMPLETED_YET) {
00979       group_state = NOT_COMPLETED_YET;
00980       return;
00981     }
00982     else if (state == REJECTED) {
00983       group_state = REJECTED;
00984     }
00985   }
00986 
00987   PublicationId writerId = GUID_UNKNOWN;
00988   for (DataReaderSet::const_iterator iter = datareader_set_.begin();
00989        iter != endIter; ++iter) {
00990     if (group_state == COMPLETED) {
00991       (*iter)->accept_coherent (writerId, publisher_id);
00992     }
00993     else {   //REJECTED
00994       (*iter)->reject_coherent (writerId, publisher_id);
00995     }
00996   }
00997 
00998   if (group_state == COMPLETED) {
00999     for (DataReaderSet::const_iterator iter = datareader_set_.begin();
01000          iter != endIter; ++iter) {
01001       (*iter)->coherent_changes_completed (reader);
01002       (*iter)->reset_coherent_info (writerId, publisher_id);
01003     }
01004   }
01005 }

bool OpenDDS::DCPS::SubscriberImpl::contains_reader ( DDS::InstanceHandle_t  a_handle  ) 

Definition at line 85 of file SubscriberImpl.cpp.

References datareader_set_, and si_lock_.

00086 {
00087   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00088                    guard,
00089                    this->si_lock_,
00090                    false);
00091 
00092   for (DataReaderSet::iterator it(datareader_set_.begin());
00093        it != datareader_set_.end(); ++it) {
00094     if (a_handle == (*it)->get_instance_handle())
00095       return true;
00096   }
00097 
00098   return false;
00099 }

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::copy_from_topic_qos ( DDS::DataReaderQos &  a_datareader_qos,
const DDS::TopicQos &  a_topic_qos 
) [virtual]

Definition at line 778 of file SubscriberImpl.cpp.

References OpenDDS::DCPS::Qos_Helper::copy_from_topic_qos(), DDS::RETCODE_INCONSISTENT_POLICY, and DDS::RETCODE_OK.

Referenced by validate_datareader_qos().

00781 {
00782   if (Qos_Helper::copy_from_topic_qos(a_datareader_qos, a_topic_qos) ) {
00783     return DDS::RETCODE_OK;
00784 
00785   } else {
00786     return DDS::RETCODE_INCONSISTENT_POLICY;
00787   }
00788 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::DataReader_ptr OpenDDS::DCPS::SubscriberImpl::create_datareader ( DDS::TopicDescription_ptr  a_topic_desc,
const DDS::DataReaderQos &  qos,
DDS::DataReaderListener_ptr  a_listener,
DDS::StatusMask  mask 
) [virtual]

Definition at line 102 of file SubscriberImpl.cpp.

References CORBA::LocalObject::_duplicate(), CORBA::LocalObject::_nil(), ACE_TEXT(), default_datareader_qos_, OpenDDS::DCPS::DataReaderImpl::enable(), OpenDDS::DCPS::DataReaderImpl::enable_filtering(), OpenDDS::DCPS::EntityImpl::enabled_, DDS::SubscriberQos::entity_factory, OpenDDS::DCPS::ContentFilteredTopicImpl::get_related_topic(), OpenDDS::DCPS::TopicDescriptionImpl::get_type_support(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::DataReaderImpl::init(), OpenDDS::DCPS::MultiTopicDataReaderBase::init(), CORBA::is_nil(), LM_ERROR, LM_WARNING, OpenDDS::DCPS::WeakRcHandle< T >::lock(), multitopic_reader_enabled(), participant_, qos_, OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_size(), raw_latency_buffer_size_, OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_type(), raw_latency_buffer_type_, OpenDDS::DCPS::rchandle_from(), readers_not_enabled_, DDS::RETCODE_OK, si_lock_, validate_datareader_qos(), and ACE_Atomic_Op< ACE_LOCK, TYPE >::value().

Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::init().

00107 {
00108   if (CORBA::is_nil(a_topic_desc)) {
00109     ACE_ERROR((LM_ERROR,
00110                ACE_TEXT("(%P|%t) ERROR: ")
00111                ACE_TEXT("SubscriberImpl::create_datareader, ")
00112                ACE_TEXT("topic desc is nil.\n")));
00113     return DDS::DataReader::_nil();
00114   }
00115 
00116   DDS::DataReaderQos dr_qos;
00117   RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
00118   if (!participant)
00119     return DDS::DataReader::_nil();
00120 
00121   TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic_desc);
00122 
00123 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00124   ContentFilteredTopicImpl* cft = 0;
00125 #endif
00126 #ifndef OPENDDS_NO_MULTI_TOPIC
00127   MultiTopicImpl* mt = 0;
00128 #else
00129   bool mt = false;
00130 #endif
00131 
00132   if (!topic_servant) {
00133 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00134     cft = dynamic_cast<ContentFilteredTopicImpl*>(a_topic_desc);
00135     if (cft) {
00136       DDS::Topic_var related;
00137       related = cft->get_related_topic();
00138       topic_servant = dynamic_cast<TopicImpl*>(related.in());
00139     }
00140     else
00141 #endif
00142     {
00143 #ifndef OPENDDS_NO_MULTI_TOPIC
00144       mt = dynamic_cast<MultiTopicImpl*>(a_topic_desc);
00145 #endif
00146     }
00147   }
00148 
00149   if (!validate_datareader_qos (qos, default_datareader_qos_, topic_servant, dr_qos, mt))
00150     return DDS::DataReader::_nil();
00151 
00152 #ifndef OPENDDS_NO_MULTI_TOPIC
00153   if (mt) {
00154     try {
00155       DDS::DataReader_var dr =
00156         mt->get_type_support()->create_multitopic_datareader();
00157       MultiTopicDataReaderBase* mtdr =
00158         dynamic_cast<MultiTopicDataReaderBase*>(dr.in());
00159       mtdr->init(dr_qos, a_listener, mask, this, mt);
00160       if (enabled_.value() && qos_.entity_factory.autoenable_created_entities) {
00161         if (dr->enable() != DDS::RETCODE_OK) {
00162           ACE_ERROR((LM_ERROR,
00163                      ACE_TEXT("(%P|%t) ERROR: ")
00164                      ACE_TEXT("SubscriberImpl::create_datareader, ")
00165                      ACE_TEXT("enable of MultiTopicDataReader failed.\n")));
00166           return DDS::DataReader::_nil();
00167         }
00168         multitopic_reader_enabled(dr);
00169       }
00170       return dr._retn();
00171     } catch (const std::exception& e) {
00172       ACE_ERROR((LM_ERROR,
00173                  ACE_TEXT("(%P|%t) ERROR: ")
00174                  ACE_TEXT("SubscriberImpl::create_datareader, ")
00175                  ACE_TEXT("creation of MultiTopicDataReader failed: %C.\n"),
00176                  e.what()));
00177     }
00178     return DDS::DataReader::_nil();
00179   }
00180 #endif
00181 
00182   OpenDDS::DCPS::TypeSupport_ptr typesupport =
00183     topic_servant->get_type_support();
00184 
00185   if (0 == typesupport) {
00186     CORBA::String_var name = a_topic_desc->get_name();
00187     ACE_ERROR((LM_ERROR,
00188                ACE_TEXT("(%P|%t) ERROR: ")
00189                ACE_TEXT("SubscriberImpl::create_datareader, ")
00190                ACE_TEXT("typesupport(topic_name=%C) is nil.\n"),
00191                name.in()));
00192     return DDS::DataReader::_nil();
00193   }
00194 
00195   DDS::DataReader_var dr_obj = typesupport->create_datareader();
00196 
00197   DataReaderImpl* dr_servant =
00198     dynamic_cast<DataReaderImpl*>(dr_obj.in());
00199 
00200   if (dr_servant == 0) {
00201     ACE_ERROR((LM_ERROR,
00202         ACE_TEXT("(%P|%t) ERROR: ")
00203         ACE_TEXT("SubscriberImpl::create_datareader, ")
00204         ACE_TEXT("servant is nil.\n")));
00205     return DDS::DataReader::_nil();
00206   }
00207 
00208 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00209   if (cft) {
00210     dr_servant->enable_filtering(cft);
00211   }
00212 #endif
00213 
00214   // Propagate the latency buffer data collection configuration.
00215   // @TODO: Determine whether we want to exclude the Builtin Topic
00216   //        readers from data gathering.
00217   dr_servant->raw_latency_buffer_size() = this->raw_latency_buffer_size_;
00218   dr_servant->raw_latency_buffer_type() = this->raw_latency_buffer_type_;
00219 
00220 
00221   dr_servant->init(topic_servant,
00222                    dr_qos,
00223                    a_listener,
00224                    mask,
00225                    participant.in(),
00226                    this);
00227 
00228   if ((this->enabled_ == true) && (qos_.entity_factory.autoenable_created_entities)) {
00229     const DDS::ReturnCode_t ret = dr_servant->enable();
00230 
00231     if (ret != DDS::RETCODE_OK) {
00232       ACE_ERROR((LM_WARNING,
00233                  ACE_TEXT("(%P|%t) WARNING: ")
00234                  ACE_TEXT("SubscriberImpl::create_datareader, ")
00235                  ACE_TEXT("enable failed.\n")));
00236       return DDS::DataReader::_nil();
00237     }
00238   } else {
00239     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, si_lock_, 0);
00240     readers_not_enabled_.insert(rchandle_from(dr_servant));
00241   }
00242 
00243   // add created data reader to this' data reader container -
00244   // done in enable_reader
00245   return DDS::DataReader::_duplicate(dr_obj.in());
00246 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::SubscriberImpl::data_received ( DataReaderImpl *  reader  ) 

Definition at line 842 of file SubscriberImpl.cpp.

References datareader_set_, OpenDDS::DCPS::rchandle_from(), and si_lock_.

00843 {
00844   ACE_GUARD(ACE_Recursive_Thread_Mutex,
00845             guard,
00846             this->si_lock_);
00847   datareader_set_.insert(rchandle_from(reader));
00848 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::delete_contained_entities (  )  [virtual]

Implements DDS::Subscriber.

Definition at line 370 of file SubscriberImpl.cpp.

References ACE_TEXT(), ACE_Vector< T, DEFAULT_SIZE >::clear(), datareader_map_, delete_datareader(), LM_ERROR, OPENDDS_MAP(), OPENDDS_STRING, ACE_Vector< T, DEFAULT_SIZE >::push_back(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_deleted(), si_lock_, and ACE_Vector< T, DEFAULT_SIZE >::size().

Referenced by OpenDDS::DCPS::DomainParticipantImpl::delete_subscriber().

00371 {
00372   // mark that the entity is being deleted
00373   set_deleted(true);
00374 
00375   ACE_Vector<DDS::DataReader_ptr> drs;
00376 
00377 #ifndef OPENDDS_NO_MULTI_TOPIC
00378   {
00379     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00380                      guard,
00381                      this->si_lock_,
00382                      DDS::RETCODE_ERROR);
00383     for (OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator mt_iter =
00384            multitopic_reader_map_.begin();
00385          mt_iter != multitopic_reader_map_.end(); ++mt_iter) {
00386       drs.push_back(mt_iter->second);
00387     }
00388   }
00389 
00390   for (size_t i = 0; i < drs.size(); ++i) {
00391     const DDS::ReturnCode_t ret = delete_datareader(drs[i]);
00392     if (ret != DDS::RETCODE_OK) {
00393       ACE_ERROR_RETURN((LM_ERROR,
00394                         ACE_TEXT("(%P|%t) ERROR: ")
00395                         ACE_TEXT("SubscriberImpl::delete_contained_entities, ")
00396                         ACE_TEXT("failed to delete datareader\n")),
00397                        ret);
00398     }
00399   }
00400   drs.clear();
00401 #endif
00402 
00403   {
00404     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00405                      guard,
00406                      this->si_lock_,
00407                      DDS::RETCODE_ERROR);
00408     DataReaderMap::iterator it;
00409     DataReaderMap::iterator itEnd = datareader_map_.end();
00410 
00411     for (it = datareader_map_.begin(); it != itEnd; ++it) {
00412       drs.push_back(it->second.in());
00413     }
00414   }
00415 
00416   for (size_t i = 0; i < drs.size(); ++i) {
00417     DDS::ReturnCode_t ret = delete_datareader(drs[i]);
00418 
00419     if (ret != DDS::RETCODE_OK) {
00420       ACE_ERROR_RETURN((LM_ERROR,
00421                         ACE_TEXT("(%P|%t) ERROR: ")
00422                         ACE_TEXT("SubscriberImpl::delete_contained_entities, ")
00423                         ACE_TEXT("failed to delete datareader\n")),
00424                        ret);
00425     }
00426   }
00427 
00428   // the subscriber can now start creating new publications
00429   set_deleted(false);
00430 
00431   return DDS::RETCODE_OK;
00432 }

Here is the call graph for this function:

Here is the caller graph for this function:

virtual DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::delete_datareader ( DDS::DataReader_ptr  a_datareader  )  [virtual]

Referenced by delete_contained_entities().

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::enable (  )  [virtual]

Implements DDS::Entity.

Definition at line 791 of file SubscriberImpl.cpp.

References dp_id_, DDS::SubscriberQos::entity_factory, OpenDDS::DCPS::EntityImpl::is_enabled(), OpenDDS::DCPS::WeakRcHandle< T >::lock(), monitor_, participant_, qos_, readers_not_enabled_, OpenDDS::DCPS::Monitor::report(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, OpenDDS::DCPS::EntityImpl::set_enabled(), and si_lock_.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_subscriber().

00792 {
00793   //According spec:
00794   // - Calling enable on an already enabled Entity returns OK and has no
00795   // effect.
00796   // - Calling enable on an Entity whose factory is not enabled will fail
00797   // and return PRECONDITION_NOT_MET.
00798 
00799   if (this->is_enabled()) {
00800     return DDS::RETCODE_OK;
00801   }
00802 
00803   RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
00804   if (!participant || participant->is_enabled() == false) {
00805     return DDS::RETCODE_PRECONDITION_NOT_MET;
00806   }
00807 
00808   dp_id_ = participant->get_id();
00809 
00810   if (this->monitor_) {
00811     this->monitor_->report();
00812   }
00813 
00814   this->set_enabled();
00815 
00816   if (qos_.entity_factory.autoenable_created_entities) {
00817     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, si_lock_, DDS::RETCODE_ERROR);
00818     DataReaderSet readers;
00819     readers_not_enabled_.swap(readers);
00820     for (DataReaderSet::iterator it = readers.begin(); it != readers.end(); ++it) {
00821       (*it)->enable();
00822     }
00823   }
00824 
00825   return DDS::RETCODE_OK;
00826 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::end_access (  )  [virtual]

Implements DDS::Subscriber.

Definition at line 709 of file SubscriberImpl.cpp.

References access_depth_, ACE_TEXT(), datareader_set_, OpenDDS::DCPS::EntityImpl::enabled_, DDS::GROUP_PRESENTATION_QOS, LM_ERROR, DDS::SubscriberQos::presentation, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and si_lock_.

00710 {
00711   if (enabled_ == false) {
00712     ACE_ERROR_RETURN((LM_ERROR,
00713                       ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:")
00714                       ACE_TEXT(" Publisher is not enabled!\n")),
00715                      DDS::RETCODE_NOT_ENABLED);
00716   }
00717 
00718   if (qos_.presentation.access_scope != DDS::GROUP_PRESENTATION_QOS) {
00719     return DDS::RETCODE_OK;
00720   }
00721 
00722   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00723                    guard,
00724                    this->si_lock_,
00725                    DDS::RETCODE_ERROR);
00726 
00727   if (this->access_depth_ == 0) {
00728     ACE_ERROR_RETURN((LM_ERROR,
00729                       ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:")
00730                       ACE_TEXT(" No matching call to begin_coherent_changes!\n")),
00731                      DDS::RETCODE_PRECONDITION_NOT_MET);
00732   }
00733 
00734   --this->access_depth_;
00735 
00736   // We should only notify subscription on the first
00737   // and last change to the current change set:
00738   if (this->access_depth_ == 0) {
00739     for (DataReaderSet::iterator it = this->datareader_set_.begin();
00740          it != this->datareader_set_.end(); ++it) {
00741       (*it)->end_access();
00742     }
00743   }
00744 
00745   return DDS::RETCODE_OK;
00746 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::get_datareaders ( DDS::DataReaderSeq &  readers,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
) [virtual]

Definition at line 472 of file SubscriberImpl.cpp.

References access_depth_, datareader_set_, OpenDDS::DCPS::GroupRakeData::get_datareaders(), DDS::GROUP_PRESENTATION_QOS, DDS::SubscriberQos::presentation, OpenDDS::DCPS::push_back(), qos_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and si_lock_.

00477 {
00478   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00479                    guard,
00480                    this->si_lock_,
00481                    DDS::RETCODE_ERROR);
00482 
00483 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00484   // If access_scope is GROUP and ordered_access is true then return readers as
00485   // list which may contain same readers multiple times. Otherwise return readers
00486   // as set.
00487   if (this->qos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS) {
00488     if (this->access_depth_ == 0 && this->qos_.presentation.coherent_access) {
00489       return ::DDS::RETCODE_PRECONDITION_NOT_MET;
00490     }
00491 
00492     if (this->qos_.presentation.ordered_access) {
00493 
00494       GroupRakeData data;
00495       for (DataReaderSet::const_iterator pos = datareader_set_.begin();
00496            pos != datareader_set_.end(); ++pos) {
00497         (*pos)->get_ordered_data (data, sample_states, view_states, instance_states);
00498       }
00499 
00500       // Return list of readers in the order of the source timestamp of the received
00501       // samples from readers.
00502       data.get_datareaders (readers);
00503 
00504       return DDS::RETCODE_OK;
00505     }
00506   }
00507 #endif
00508 
00509   // Return set of datareaders.
00510   readers.length(0);
00511 
00512   for (DataReaderSet::const_iterator pos = datareader_set_.begin();
00513        pos != datareader_set_.end(); ++pos) {
00514     if ((*pos)->have_sample_states(sample_states) &&
00515         (*pos)->have_view_states(view_states) &&
00516         (*pos)->have_instance_states(instance_states)) {
00517       push_back(readers, DDS::DataReader::_duplicate(pos->in()));
00518     }
00519   }
00520 
00521   return DDS::RETCODE_OK;
00522 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::get_default_datareader_qos ( DDS::DataReaderQos &  qos  )  [virtual]

Definition at line 770 of file SubscriberImpl.cpp.

References default_datareader_qos_, and DDS::RETCODE_OK.

Referenced by OpenDDS::DCPS::PeerDiscovery< Spdp >::init_bit().

00772 {
00773   qos = default_datareader_qos_;
00774   return DDS::RETCODE_OK;
00775 }

Here is the caller graph for this function:

DDS::InstanceHandle_t OpenDDS::DCPS::SubscriberImpl::get_instance_handle (  )  [virtual]

Implements OpenDDS::DCPS::EntityImpl.

Definition at line 79 of file SubscriberImpl.cpp.

References handle_.

Referenced by OpenDDS::DCPS::SubscriberMonitorImpl::report().

00080 {
00081   return handle_;
00082 }

Here is the caller graph for this function:

DDS::SubscriberListener_ptr OpenDDS::DCPS::SubscriberImpl::get_listener (  )  [virtual]

Implements DDS::Subscriber.

Definition at line 668 of file SubscriberImpl.cpp.

References CORBA::LocalObject::_duplicate(), and listener_.

00669 {
00670   return DDS::SubscriberListener::_duplicate(listener_.in());
00671 }

Here is the call graph for this function:

DDS::DomainParticipant_ptr OpenDDS::DCPS::SubscriberImpl::get_participant (  )  [virtual]

Implements DDS::Subscriber.

Definition at line 751 of file SubscriberImpl.cpp.

References OpenDDS::DCPS::RcHandle< T >::_retn(), OpenDDS::DCPS::WeakRcHandle< T >::lock(), and participant_.

Referenced by OpenDDS::DCPS::PeerDiscovery< Spdp >::create_bit_dr(), OpenDDS::DCPS::MultiTopicDataReaderBase::init(), and OpenDDS::DCPS::SubscriberMonitorImpl::report().

00752 {
00753   return participant_.lock()._retn();
00754 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::get_qos ( DDS::SubscriberQos &  qos  )  [virtual]

Definition at line 649 of file SubscriberImpl.cpp.

References qos_, and DDS::RETCODE_OK.

Referenced by OpenDDS::DCPS::DataReaderImpl::init().

00651 {
00652   qos = qos_;
00653   return DDS::RETCODE_OK;
00654 }

Here is the caller graph for this function:

void OpenDDS::DCPS::SubscriberImpl::get_subscription_ids ( SubscriptionIdVec &  subs  ) 

Populates a std::vector with the SubscriptionIds (GUIDs) of this Subscriber's Data Readers.

Definition at line 923 of file SubscriberImpl.cpp.

References datareader_map_, and si_lock_.

Referenced by OpenDDS::DCPS::SubscriberMonitorImpl::report().

00924 {
00925   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00926                    guard,
00927                    this->si_lock_,
00928                    );
00929 
00930   subs.reserve(datareader_map_.size());
00931   for (DataReaderMap::iterator iter = datareader_map_.begin();
00932        iter != datareader_map_.end();
00933        ++iter) {
00934     subs.push_back(iter->second->get_subscription_id());
00935   }
00936 }

Here is the caller graph for this function:

bool OpenDDS::DCPS::SubscriberImpl::is_clean (  )  const

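This method is not defined in the IDL; it is for internal use only. It checks whether any DataReaders are still associated with this Subscriber. When the transient kludge is not enabled, a Subscriber holding exactly four DataReaders (the BIT DataReaders) is also considered clean.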

Definition at line 829 of file SubscriberImpl.cpp.

References datareader_map_, and TheTransientKludge.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::delete_subscriber(), and ~SubscriberImpl().

00830 {
00831   const bool sub_is_clean = datareader_map_.empty();
00832 
00833   if (!sub_is_clean && !TheTransientKludge->is_enabled()) {
00834     // Four BIT datareaders.
00835     return datareader_map_.size() == 4;
00836   }
00837 
00838   return sub_is_clean;
00839 }

Here is the caller graph for this function:

DDS::SubscriberListener_ptr OpenDDS::DCPS::SubscriberImpl::listener_for ( DDS::StatusKind  kind  ) 
DDS::DataReader_ptr OpenDDS::DCPS::SubscriberImpl::lookup_datareader ( const char *  topic_name  )  [virtual]

Definition at line 435 of file SubscriberImpl.cpp.

References CORBA::LocalObject::_duplicate(), CORBA::LocalObject::_nil(), ACE_TEXT(), datareader_map_, OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, OPENDDS_MAP(), OPENDDS_STRING, and si_lock_.

00437 {
00438   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00439                    guard,
00440                    this->si_lock_,
00441                    DDS::DataReader::_nil());
00442 
00443   // If multiple entries whose key is "topic_name" then which one is
00444   // returned ? Spec does not limit which one should give.
00445   DataReaderMap::iterator it = datareader_map_.find(topic_name);
00446 
00447   if (it == datareader_map_.end()) {
00448 #ifndef OPENDDS_NO_MULTI_TOPIC
00449     OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator mt_iter =
00450       multitopic_reader_map_.find(topic_name);
00451     if (mt_iter != multitopic_reader_map_.end()) {
00452       return DDS::DataReader::_duplicate(mt_iter->second);
00453     }
00454 #endif
00455 
00456     if (DCPS_debug_level >= 2) {
00457       ACE_DEBUG((LM_DEBUG,
00458                  ACE_TEXT("(%P|%t) ")
00459                  ACE_TEXT("SubscriberImpl::lookup_datareader, ")
00460                  ACE_TEXT("The datareader(topic_name=%C) is not found\n"),
00461                  topic_name));
00462     }
00463 
00464     return DDS::DataReader::_nil();
00465 
00466   } else {
00467     return DDS::DataReader::_duplicate(it->second.in());
00468   }
00469 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::multitopic_reader_enabled ( DDS::DataReader_ptr  reader  ) 

Definition at line 876 of file SubscriberImpl.cpp.

References CORBA::LocalObject::_duplicate(), and DDS::RETCODE_OK.

Referenced by create_datareader().

00877 {
00878   DDS::TopicDescription_var td = reader->get_topicdescription();
00879   CORBA::String_var topic = td->get_name();
00880   multitopic_reader_map_[topic.in()] = DDS::DataReader::_duplicate(reader);
00881   return DDS::RETCODE_OK;
00882 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::notify_datareaders (  )  [virtual]

Implements DDS::Subscriber.

Definition at line 525 of file SubscriberImpl.cpp.

References ACE_TEXT(), DDS::DATA_AVAILABLE_STATUS, datareader_map_, OpenDDS::DCPS::MultiTopicDataReaderBase::get_listener(), OpenDDS::DCPS::MultiTopicDataReaderBase::have_sample_states(), CORBA::is_nil(), LM_ERROR, DDS::NOT_READ_SAMPLE_STATE, OPENDDS_MAP(), OPENDDS_STRING, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::MultiTopicDataReaderBase::set_status_changed_flag(), and si_lock_.

00526 {
00527   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00528                    guard,
00529                    this->si_lock_,
00530                    DDS::RETCODE_ERROR);
00531 
00532   DataReaderMap::iterator it;
00533 
00534   for (it = datareader_map_.begin(); it != datareader_map_.end(); ++it) {
00535     if (it->second->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) {
00536       DDS::DataReaderListener_var listener = it->second->get_listener();
00537       if (!CORBA::is_nil (listener)) {
00538         listener->on_data_available(it->second.in());
00539       }
00540 
00541       it->second->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
00542     }
00543   }
00544 
00545 #ifndef OPENDDS_NO_MULTI_TOPIC
00546   for (OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator it =
00547          multitopic_reader_map_.begin(); it != multitopic_reader_map_.end();
00548        ++it) {
00549     MultiTopicDataReaderBase* dri =
00550       dynamic_cast<MultiTopicDataReaderBase*>(it->second.in());
00551 
00552     if (!dri) {
00553       ACE_ERROR_RETURN((LM_ERROR,
00554         ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::notify_datareaders: ")
00555         ACE_TEXT("failed to obtain MultiTopicDataReaderBase.\n")),
00556         ::DDS::RETCODE_ERROR);
00557     }
00558 
00559     if (dri->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) {
00560       DDS::DataReaderListener_var listener = dri->get_listener();
00561       if (!CORBA::is_nil(listener)) {
00562         listener->on_data_available(dri);
00563       }
00564       dri->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
00565     }
00566   }
00567 #endif
00568 
00569   return DDS::RETCODE_OK;
00570 }

Here is the call graph for this function:

OpenDDS::DCPS::SubscriberImpl::OPENDDS_MAP ( OPENDDS_STRING  ,
DDS::DataReader_var   
) [private]

Referenced by delete_contained_entities(), lookup_datareader(), and notify_datareaders().

Here is the caller graph for this function:

typedef OpenDDS::DCPS::SubscriberImpl::OPENDDS_MAP_CMP ( RepoId  ,
DDS::DataReaderQos  ,
GUID_tKeyLessThan   
) [private]

DataReader id to qos map.

typedef OpenDDS::DCPS::SubscriberImpl::OPENDDS_MULTIMAP ( OPENDDS_STRING  ,
DataReaderImpl_rch   
) [private]

Keeps track of all the DataReaders attached to this Subscriber; the key is the topic name.

typedef OpenDDS::DCPS::SubscriberImpl::OPENDDS_SET ( DataReaderImpl_rch   )  [private]

Keeps track of DataReaders with data. A std::set is used for now; this should be encapsulated so we can switch between a set and a list depending on the Presentation QoS.

typedef OpenDDS::DCPS::SubscriberImpl::OPENDDS_VECTOR ( RepoId   ) 
RcHandle< EntityImpl > OpenDDS::DCPS::SubscriberImpl::parent ( void   )  const [virtual]

Reimplemented from OpenDDS::DCPS::EntityImpl.

Definition at line 1009 of file SubscriberImpl.cpp.

References OpenDDS::DCPS::WeakRcHandle< T >::lock(), and participant_.

01010 {
01011   return this->participant_.lock();
01012 }

Here is the call graph for this function:

unsigned int & OpenDDS::DCPS::SubscriberImpl::raw_latency_buffer_size (  ) 

Configure the size of the raw data collection buffer.

Definition at line 911 of file SubscriberImpl.cpp.

References raw_latency_buffer_size_.

Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::init().

00912 {
00913   return this->raw_latency_buffer_size_;
00914 }

Here is the caller graph for this function:

DataCollector< double >::OnFull & OpenDDS::DCPS::SubscriberImpl::raw_latency_buffer_type (  ) 

Configure the type of the raw data collection buffer.

Definition at line 917 of file SubscriberImpl.cpp.

References raw_latency_buffer_type_.

Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::init().

00918 {
00919   return this->raw_latency_buffer_type_;
00920 }

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::reader_enabled ( const char *  topic_name,
DataReaderImpl reader 
)

Definition at line 851 of file SubscriberImpl.cpp.

References ACE_TEXT(), datareader_map_, OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, monitor_, OpenDDS::DCPS::rchandle_from(), readers_not_enabled_, OpenDDS::DCPS::Monitor::report(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, and si_lock_.

00853 {
00854   if (DCPS_debug_level >= 4) {
00855     ACE_DEBUG((LM_DEBUG,
00856                ACE_TEXT("(%P|%t) SubscriberImpl::reader_enabled, ")
00857                ACE_TEXT("datareader(topic_name=%C) enabled\n"),
00858                topic_name));
00859   }
00860 
00861   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, si_lock_, DDS::RETCODE_ERROR);
00862   DataReaderImpl_rch reader = rchandle_from(reader_ptr);
00863   readers_not_enabled_.erase(reader);
00864 
00865   this->datareader_map_.insert(DataReaderMap::value_type(topic_name, reader));
00866 
00867   if (this->monitor_) {
00868     this->monitor_->report();
00869   }
00870 
00871   return DDS::RETCODE_OK;
00872 }

Here is the call graph for this function:

void OpenDDS::DCPS::SubscriberImpl::remove_from_datareader_set ( DataReaderImpl reader  ) 

Definition at line 885 of file SubscriberImpl.cpp.

References datareader_set_, OpenDDS::DCPS::rchandle_from(), and si_lock_.

Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::cleanup().

00886 {
00887   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, si_lock_);
00888   datareader_set_.erase(rchandle_from(reader));
00889 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::set_default_datareader_qos ( const DDS::DataReaderQos &  qos  )  [virtual]

Definition at line 757 of file SubscriberImpl.cpp.

References OpenDDS::DCPS::Qos_Helper::consistent(), default_datareader_qos_, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, and OpenDDS::DCPS::Qos_Helper::valid().

00759 {
00760   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00761     default_datareader_qos_ = qos;
00762     return DDS::RETCODE_OK;
00763 
00764   } else {
00765     return DDS::RETCODE_INCONSISTENT_POLICY;
00766   }
00767 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::set_listener ( DDS::SubscriberListener_ptr  a_listener,
DDS::StatusMask  mask 
) [virtual]

Definition at line 657 of file SubscriberImpl.cpp.

References CORBA::LocalObject::_duplicate(), listener_, listener_mask_, and DDS::RETCODE_OK.

00660 {
00661   listener_mask_ = mask;
00662   //note: OK to duplicate  a nil object ref
00663   listener_ = DDS::SubscriberListener::_duplicate(a_listener);
00664   return DDS::RETCODE_OK;
00665 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::set_qos ( const DDS::SubscriberQos &  qos  )  [virtual]

Definition at line 573 of file SubscriberImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), datareader_map_, domain_id_, dp_id_, OpenDDS::DCPS::EntityImpl::enabled_, LM_ERROR, OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK, OPENDDS_STRING, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, si_lock_, status, TheServiceParticipant, and OpenDDS::DCPS::Qos_Helper::valid().

00575 {
00576 
00577   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00578 
00579   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00580     if (qos_ == qos)
00581       return DDS::RETCODE_OK;
00582 
00583     // for the not changeable qos, it can be changed before enable
00584     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00585       return DDS::RETCODE_IMMUTABLE_POLICY;
00586 
00587     } else {
00588       qos_ = qos;
00589 
00590       DrIdToQosMap idToQosMap;
00591       {
00592         ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00593                          guard,
00594                          this->si_lock_,
00595                          DDS::RETCODE_ERROR);
00596         // after FaceCTS bug 619 is fixed, make endIter and iter const iterators
00597         DataReaderMap::iterator endIter = datareader_map_.end();
00598 
00599         for (DataReaderMap::iterator iter = datareader_map_.begin();
00600              iter != endIter; ++iter) {
00601           DataReaderImpl_rch reader = iter->second;
00602           reader->set_subscriber_qos (qos);
00603           DDS::DataReaderQos qos;
00604           reader->get_qos(qos);
00605           RepoId id = reader->get_subscription_id();
00606           std::pair<DrIdToQosMap::iterator, bool> pair
00607             = idToQosMap.insert(DrIdToQosMap::value_type(id, qos));
00608 
00609           if (pair.second == false) {
00610             GuidConverter converter(id);
00611             ACE_ERROR_RETURN((LM_ERROR,
00612                               ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::set_qos: ")
00613                               ACE_TEXT("insert %C to DrIdToQosMap failed.\n"),
00614                               OPENDDS_STRING(converter).c_str()),::DDS::RETCODE_ERROR);
00615           }
00616         }
00617       }
00618 
00619       DrIdToQosMap::iterator iter = idToQosMap.begin();
00620 
00621       while (iter != idToQosMap.end()) {
00622         Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00623         const bool status
00624           = disco->update_subscription_qos(this->domain_id_,
00625                                            this->dp_id_,
00626                                            iter->first,
00627                                            iter->second,
00628                                            this->qos_);
00629 
00630         if (!status) {
00631           ACE_ERROR_RETURN((LM_ERROR,
00632                             ACE_TEXT("(%P|%t) SubscriberImpl::set_qos, ")
00633                             ACE_TEXT("failed. \n")),
00634                            DDS::RETCODE_ERROR);
00635         }
00636 
00637         ++iter;
00638       }
00639     }
00640 
00641     return DDS::RETCODE_OK;
00642 
00643   } else {
00644     return DDS::RETCODE_INCONSISTENT_POLICY;
00645   }
00646 }

Here is the call graph for this function:

void OpenDDS::DCPS::SubscriberImpl::update_ownership_strength ( const PublicationId pub_id,
const CORBA::Long ownership_strength 
)

Definition at line 940 of file SubscriberImpl.cpp.

References datareader_map_, and si_lock_.

00942 {
00943   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00944                    guard,
00945                    this->si_lock_,
00946                    );
00947 
00948   for (DataReaderMap::iterator iter = datareader_map_.begin();
00949        iter != datareader_map_.end();
00950        ++iter) {
00951     if (!iter->second->is_bit()) {
00952       iter->second->update_ownership_strength(pub_id, ownership_strength);
00953     }
00954   }
00955 }

bool OpenDDS::DCPS::SubscriberImpl::validate_datareader_qos ( const DDS::DataReaderQos qos,
const DDS::DataReaderQos default_qos,
DDS::Topic_ptr  a_topic,
DDS::DataReaderQos result_qos,
bool  mt 
) [static]

Definition at line 1015 of file SubscriberImpl.cpp.

References CORBA::LocalObject::_nil(), ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::consistent(), copy_from_topic_qos(), DATAREADER_QOS_DEFAULT, DATAREADER_QOS_USE_TOPIC_QOS, OpenDDS::DCPS::DCPS_debug_level, LM_ERROR, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, and OpenDDS::DCPS::Qos_Helper::valid().

Referenced by create_datareader(), and OpenDDS::DCPS::DomainParticipantImpl::create_recorder().

01020 {
01021 
01022 
01023   if (qos == DATAREADER_QOS_DEFAULT) {
01024     dr_qos = default_qos;
01025 
01026   } else if (qos == DATAREADER_QOS_USE_TOPIC_QOS) {
01027 
01028 #ifndef OPENDDS_NO_MULTI_TOPIC
01029     if (mt) {
01030       if (DCPS_debug_level) {
01031         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
01032                    ACE_TEXT("SubscriberImpl::create_datareader, ")
01033                    ACE_TEXT("DATAREADER_QOS_USE_TOPIC_QOS can not be used ")
01034                    ACE_TEXT("to create a MultiTopic DataReader.\n")));
01035       }
01036       return DDS::DataReader::_nil();
01037     }
01038 #else
01039     ACE_UNUSED_ARG(mt);
01040 #endif
01041     DDS::TopicQos topic_qos;
01042     a_topic->get_qos(topic_qos);
01043 
01044     dr_qos = default_qos;
01045 
01046     Qos_Helper::copy_from_topic_qos(dr_qos,
01047                                     topic_qos);
01048 
01049   } else {
01050     dr_qos = qos;
01051   }
01052 
01053   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(dr_qos, false);
01054   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(dr_qos, false);
01055   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(dr_qos, false);
01056 
01057   if (!Qos_Helper::valid(dr_qos)) {
01058     ACE_ERROR((LM_ERROR,
01059                ACE_TEXT("(%P|%t) ERROR: ")
01060                ACE_TEXT("SubscriberImpl::create_datareader, ")
01061                ACE_TEXT("invalid qos.\n")));
01062     return false;
01063   }
01064 
01065   if (!Qos_Helper::consistent(dr_qos)) {
01066     ACE_ERROR((LM_ERROR,
01067                ACE_TEXT("(%P|%t) ERROR: ")
01068                ACE_TEXT("SubscriberImpl::create_datareader, ")
01069                ACE_TEXT("inconsistent qos.\n")));
01070     return false;
01071   }
01072 
01073 
01074   return true;
01075 }

Here is the call graph for this function:

Here is the caller graph for this function:


Member Data Documentation

Definition at line 210 of file SubscriberImpl.h.

Referenced by begin_access(), end_access(), and get_datareaders().

Definition at line 195 of file SubscriberImpl.h.

Referenced by set_qos().

Definition at line 196 of file SubscriberImpl.h.

Referenced by enable(), and set_qos().

Definition at line 177 of file SubscriberImpl.h.

Referenced by get_instance_handle().

DDS::SubscriberListener_var OpenDDS::DCPS::SubscriberImpl::listener_ [private]

Definition at line 183 of file SubscriberImpl.h.

Referenced by get_listener(), set_listener(), and SubscriberImpl().

Definition at line 182 of file SubscriberImpl.h.

Referenced by set_listener().

Monitor object for this entity.

Definition at line 208 of file SubscriberImpl.h.

Referenced by enable(), reader_enabled(), and SubscriberImpl().

Definition at line 193 of file SubscriberImpl.h.

Referenced by create_datareader(), enable(), get_participant(), and parent().

Bound (or initial reservation) of raw latency buffers.

Definition at line 199 of file SubscriberImpl.h.

Referenced by create_datareader(), and raw_latency_buffer_size().

Type of raw latency data buffers.

Definition at line 202 of file SubscriberImpl.h.

Referenced by create_datareader(), and raw_latency_buffer_type().

Definition at line 185 of file SubscriberImpl.h.

Referenced by create_datareader(), enable(), and reader_enabled().


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1