LCOV - code coverage report
Current view: top level - DCPS - SubscriberImpl.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 518 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 37 0.0 %

          Line data    Source code
       1             : /*
       2             :  * Distributed under the OpenDDS License.
       3             :  * See: http://www.opendds.org/license.html
       4             :  */
       5             : 
       6             : #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
       7             : 
       8             : #include "debug.h"
       9             : #include "SubscriberImpl.h"
      10             : #include "FeatureDisabledQosCheck.h"
      11             : #include "DomainParticipantImpl.h"
      12             : #include "Qos_Helper.h"
      13             : #include "GuidConverter.h"
      14             : #include "BuiltInTopicUtils.h"
      15             : #include "TopicImpl.h"
      16             : #include "MonitorFactory.h"
      17             : #include "DataReaderImpl.h"
      18             : #include "Service_Participant.h"
      19             : #include "TopicDescriptionImpl.h"
      20             : #include "Marked_Default_Qos.h"
      21             : #include "Transient_Kludge.h"
      22             : #include "ContentFilteredTopicImpl.h"
      23             : #include "MultiTopicImpl.h"
      24             : #include "GroupRakeData.h"
      25             : #include "MultiTopicDataReaderBase.h"
      26             : #include "Util.h"
      27             : #include "transport/framework/TransportImpl.h"
      28             : #include "transport/framework/DataLinkSet.h"
      29             : #include "DCPS_Utils.h"
      30             : #include "PoolAllocator.h"
      31             : 
      32             : #include <dds/DdsDcpsTypeSupportExtC.h>
      33             : 
      34             : #include <stdexcept>
      35             : 
      36             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      37             : 
      38             : namespace OpenDDS {
      39             : namespace DCPS {
      40             : 
      41           0 : SubscriberImpl::SubscriberImpl(DDS::InstanceHandle_t       handle,
      42             :                                const DDS::SubscriberQos &  qos,
      43             :                                DDS::SubscriberListener_ptr a_listener,
      44             :                                const DDS::StatusMask&      mask,
      45           0 :                                DomainParticipantImpl*      participant)
      46           0 :   : handle_(handle),
      47           0 :   qos_(qos),
      48           0 :   default_datareader_qos_(TheServiceParticipant->initial_DataReaderQos()),
      49           0 :   listener_mask_(mask),
      50           0 :   participant_(*participant),
      51           0 :   domain_id_(participant->get_domain_id()),
      52           0 :   raw_latency_buffer_size_(0),
      53           0 :   raw_latency_buffer_type_(DataCollector<double>::KeepOldest),
      54           0 :   access_depth_ (0)
      55             : {
      56             :   //Note: OK to duplicate a nil.
      57           0 :   listener_ = DDS::SubscriberListener::_duplicate(a_listener);
      58             : 
      59           0 :   monitor_.reset(TheServiceParticipant->monitor_factory_->create_subscriber_monitor(this));
      60           0 : }
      61             : 
      62           0 : SubscriberImpl::~SubscriberImpl()
      63             : {
      64           0 :   const RcHandle<DomainParticipantImpl> participant = participant_.lock();
      65           0 :   if (participant) {
      66           0 :     participant->return_handle(handle_);
      67             :   }
      68             : 
      69             :   // The datareaders should be deleted already before calling delete
      70             :   // subscriber.
      71           0 :   String leftover_entities;
      72           0 :   if (!is_clean(&leftover_entities)) {
      73           0 :     if (log_level >= LogLevel::Warning) {
      74           0 :       ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: SubscriberImpl::~SubscriberImpl: "
      75             :                  "%C still exist\n", leftover_entities.c_str()));
      76             :     }
      77             :   }
      78           0 : }
      79             : 
      80             : DDS::InstanceHandle_t
      81           0 : SubscriberImpl::get_instance_handle()
      82             : {
      83           0 :   return handle_;
      84             : }
      85             : 
      86             : bool
      87           0 : SubscriberImpl::contains_reader(DDS::InstanceHandle_t a_handle)
      88             : {
      89           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
      90             :                    guard,
      91             :                    this->si_lock_,
      92             :                    false);
      93             : 
      94           0 :   for (DataReaderMap::iterator it(datareader_map_.begin());
      95           0 :        it != datareader_map_.end(); ++it) {
      96           0 :     if (a_handle == it->second->get_instance_handle()) {
      97           0 :       return true;
      98             :     }
      99             :   }
     100             : 
     101           0 :   return false;
     102           0 : }
     103             : 
     104             : DDS::DataReader_ptr
     105           0 : SubscriberImpl::create_datareader(
     106             :   DDS::TopicDescription_ptr   a_topic_desc,
     107             :   const DDS::DataReaderQos &  qos,
     108             :   DDS::DataReaderListener_ptr a_listener,
     109             :   DDS::StatusMask             mask)
     110             : {
     111           0 :   if (CORBA::is_nil(a_topic_desc)) {
     112           0 :     if (DCPS_debug_level > 0) {
     113           0 :       ACE_ERROR((LM_ERROR,
     114             :                 ACE_TEXT("(%P|%t) ERROR: ")
     115             :                 ACE_TEXT("SubscriberImpl::create_datareader, ")
     116             :                 ACE_TEXT("topic desc is nil.\n")));
     117             :     }
     118           0 :     return DDS::DataReader::_nil();
     119             :   }
     120             : 
     121           0 :   DDS::DataReaderQos dr_qos;
     122           0 :   RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
     123           0 :   if (!participant)
     124           0 :     return DDS::DataReader::_nil();
     125             : 
     126           0 :   TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic_desc);
     127             : 
     128             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     129           0 :   ContentFilteredTopicImpl* cft = 0;
     130             : #endif
     131             : #ifndef OPENDDS_NO_MULTI_TOPIC
     132           0 :   MultiTopicImpl* mt = 0;
     133             : #else
     134             :   bool mt = false;
     135             : #endif
     136             : 
     137           0 :   if (!topic_servant) {
     138             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     139           0 :     cft = dynamic_cast<ContentFilteredTopicImpl*>(a_topic_desc);
     140           0 :     if (cft) {
     141           0 :       DDS::Topic_var related;
     142           0 :       related = cft->get_related_topic();
     143           0 :       topic_servant = dynamic_cast<TopicImpl*>(related.in());
     144           0 :     }
     145             :     else
     146             : #endif
     147             :     {
     148             : #ifndef OPENDDS_NO_MULTI_TOPIC
     149           0 :       mt = dynamic_cast<MultiTopicImpl*>(a_topic_desc);
     150             : #endif
     151             :     }
     152             :   }
     153             : 
     154           0 :   if (!validate_datareader_qos (qos, default_datareader_qos_, topic_servant, dr_qos, mt))
     155           0 :     return DDS::DataReader::_nil();
     156             : 
     157             : #ifndef OPENDDS_NO_MULTI_TOPIC
     158           0 :   if (mt) {
     159             :     try {
     160             :       DDS::DataReader_var dr =
     161           0 :         mt->get_type_support()->create_multitopic_datareader();
     162             :       MultiTopicDataReaderBase* mtdr =
     163           0 :         dynamic_cast<MultiTopicDataReaderBase*>(dr.in());
     164           0 :       mtdr->init(dr_qos, a_listener, mask, this, mt);
     165           0 :       if (enabled_ && qos_.entity_factory.autoenable_created_entities) {
     166           0 :         if (dr->enable() != DDS::RETCODE_OK) {
     167           0 :           if (DCPS_debug_level > 0) {
     168           0 :             ACE_ERROR((LM_ERROR,
     169             :                       ACE_TEXT("(%P|%t) ERROR: ")
     170             :                       ACE_TEXT("SubscriberImpl::create_datareader, ")
     171             :                       ACE_TEXT("enable of MultiTopicDataReader failed.\n")));
     172             :           }
     173           0 :           return DDS::DataReader::_nil();
     174             :         }
     175           0 :         multitopic_reader_enabled(dr);
     176             :       }
     177           0 :       return dr._retn();
     178           0 :     } catch (const std::exception& e) {
     179           0 :       if (DCPS_debug_level > 0) {
     180           0 :         ACE_ERROR((LM_ERROR,
     181             :                   ACE_TEXT("(%P|%t) ERROR: ")
     182             :                   ACE_TEXT("SubscriberImpl::create_datareader, ")
     183             :                   ACE_TEXT("creation of MultiTopicDataReader failed: %C.\n"),
     184             :                   e.what()));
     185             :       }
     186           0 :     }
     187           0 :     return DDS::DataReader::_nil();
     188             :   }
     189             : #endif
     190             : 
     191             :   OpenDDS::DCPS::TypeSupport_ptr typesupport =
     192           0 :     topic_servant->get_type_support();
     193             : 
     194           0 :   if (0 == typesupport) {
     195           0 :     CORBA::String_var name = a_topic_desc->get_name();
     196           0 :     if (DCPS_debug_level > 0) {
     197           0 :       ACE_ERROR((LM_ERROR,
     198             :                 ACE_TEXT("(%P|%t) ERROR: ")
     199             :                 ACE_TEXT("SubscriberImpl::create_datareader, ")
     200             :                 ACE_TEXT("typesupport(topic_name=%C) is nil.\n"),
     201             :                 name.in()));
     202             :     }
     203           0 :     return DDS::DataReader::_nil();
     204           0 :   }
     205             : 
     206           0 :   DDS::DataReader_var dr_obj = typesupport->create_datareader();
     207             : 
     208             :   DataReaderImpl* dr_servant =
     209           0 :     dynamic_cast<DataReaderImpl*>(dr_obj.in());
     210             : 
     211           0 :   if (dr_servant == 0) {
     212           0 :     if (DCPS_debug_level > 0) {
     213           0 :       ACE_ERROR((LM_ERROR,
     214             :           ACE_TEXT("(%P|%t) ERROR: ")
     215             :           ACE_TEXT("SubscriberImpl::create_datareader, ")
     216             :           ACE_TEXT("servant is nil.\n")));
     217             :     }
     218           0 :     return DDS::DataReader::_nil();
     219             :   }
     220             : 
     221             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     222           0 :   if (cft) {
     223           0 :     dr_servant->enable_filtering(cft);
     224             :   }
     225             : #endif
     226             : 
     227             :   // Propagate the latency buffer data collection configuration.
     228             :   // @TODO: Determine whether we want to exclude the Builtin Topic
     229             :   //        readers from data gathering.
     230           0 :   dr_servant->raw_latency_buffer_size() = this->raw_latency_buffer_size_;
     231           0 :   dr_servant->raw_latency_buffer_type() = this->raw_latency_buffer_type_;
     232             : 
     233             : 
     234           0 :   dr_servant->init(topic_servant,
     235             :                    dr_qos,
     236             :                    a_listener,
     237             :                    mask,
     238             :                    participant.in(),
     239             :                    this);
     240             : 
     241           0 :   if (enabled_ && qos_.entity_factory.autoenable_created_entities) {
     242           0 :     const DDS::ReturnCode_t ret = dr_servant->enable();
     243             : 
     244           0 :     if (ret != DDS::RETCODE_OK) {
     245           0 :       if (DCPS_debug_level > 0) {
     246           0 :         ACE_ERROR((LM_WARNING,
     247             :                   ACE_TEXT("(%P|%t) WARNING: ")
     248             :                   ACE_TEXT("SubscriberImpl::create_datareader, ")
     249             :                   ACE_TEXT("enable failed.\n")));
     250             :       }
     251           0 :       return DDS::DataReader::_nil();
     252             :     }
     253             :   } else {
     254           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, si_lock_, 0);
     255           0 :     readers_not_enabled_.insert(rchandle_from(dr_servant));
     256           0 :   }
     257             : 
     258             :   // add created data reader to this' data reader container -
     259             :   // done in enable_reader
     260           0 :   return DDS::DataReader::_duplicate(dr_obj.in());
     261           0 : }
     262             : 
     263             : DDS::ReturnCode_t
     264           0 : SubscriberImpl::delete_datareader(::DDS::DataReader_ptr a_datareader)
     265             : {
     266             :   DBG_ENTRY_LVL("SubscriberImpl", "delete_datareader", 6);
     267             : 
     268           0 :   DataReaderImpl_rch dr_servant = rchandle_from(dynamic_cast<DataReaderImpl*>(a_datareader));
     269             : 
     270           0 :   if (dr_servant) { // for MultiTopic this will be false
     271           0 :     const char* reason = " (ERROR: unknown reason)";
     272           0 :     DDS::ReturnCode_t rc = DDS::RETCODE_OK;
     273           0 :     RcHandle<SubscriberImpl> dr_subscriber = dr_servant->get_subscriber_servant();
     274           0 :     if (dr_subscriber.get() != this) {
     275           0 :       reason = "doesn't belong to this subscriber.";
     276           0 :       rc = DDS::RETCODE_PRECONDITION_NOT_MET;
     277           0 :     } else if (dr_servant->has_zero_copies()) {
     278           0 :       reason = "has outstanding zero-copy samples loaned out.";
     279           0 :       rc = DDS::RETCODE_PRECONDITION_NOT_MET;
     280           0 :     } else if (!dr_servant->read_conditions_.empty()) {
     281           0 :       reason = "has read conditions attached.";
     282           0 :       rc = DDS::RETCODE_PRECONDITION_NOT_MET;
     283             :     }
     284           0 :     if (rc != DDS::RETCODE_OK) {
     285           0 :       if (log_level >= LogLevel::Notice) {
     286           0 :         DDS::TopicDescription_var topic = a_datareader->get_topicdescription();
     287           0 :         CORBA::String_var topic_name = topic->get_name();
     288           0 :         ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: SubscriberImpl::delete_datareader: "
     289             :           "on reader %C (topic \"%C\") will return \"%C\" because it %C\n",
     290             :           LogGuid(dr_servant->get_id()).c_str(), topic_name.in(),
     291             :           retcode_to_string(rc), reason));
     292           0 :       }
     293           0 :       return rc;
     294             :     }
     295             : 
     296             :     // marks entity as deleted and stops future associating
     297           0 :     dr_servant->prepare_to_delete();
     298           0 :   }
     299             : 
     300             :   {
     301           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     302             :                      si_guard,
     303             :                      this->si_lock_,
     304             :                      DDS::RETCODE_ERROR);
     305             : 
     306           0 :     DataReaderMap::iterator it;
     307           0 :     for (it = datareader_map_.begin(); it != datareader_map_.end(); ++it) {
     308           0 :       if (it->second == dr_servant) {
     309           0 :         break;
     310             :       }
     311             :     }
     312             : 
     313           0 :     if (it == datareader_map_.end()) {
     314           0 :       DDS::TopicDescription_var td = a_datareader->get_topicdescription();
     315           0 :       CORBA::String_var topic_name = td->get_name();
     316             : #ifndef OPENDDS_NO_MULTI_TOPIC
     317           0 :       MultitopicReaderMap::iterator mt_iter = multitopic_reader_map_.find(topic_name.in());
     318           0 :       if (mt_iter != multitopic_reader_map_.end()) {
     319           0 :         DDS::DataReader_ptr ptr = mt_iter->second;
     320           0 :         MultiTopicDataReaderBase* mtdrb = dynamic_cast<MultiTopicDataReaderBase*>(ptr);
     321           0 :         if (!mtdrb) {
     322           0 :           if (DCPS_debug_level > 0) {
     323           0 :             ACE_ERROR((LM_ERROR,
     324             :               ACE_TEXT("(%P|%t) ERROR: ")
     325             :               ACE_TEXT("SubscriberImpl::delete_datareader: ")
     326             :               ACE_TEXT("datareader(topic_name=%C)")
     327             :               ACE_TEXT("failed to obtain MultiTopicDataReaderBase.\n"),
     328             :               topic_name.in()));
     329             :           }
     330           0 :           return ::DDS::RETCODE_ERROR;
     331             :         }
     332           0 :         mtdrb->cleanup();
     333           0 :         multitopic_reader_map_.erase(mt_iter);
     334           0 :         return DDS::RETCODE_OK;
     335             :       }
     336             : #endif
     337           0 :       if (!dr_servant) {
     338           0 :         if (DCPS_debug_level > 0) {
     339           0 :           ACE_ERROR((LM_ERROR,
     340             :                     ACE_TEXT("(%P|%t) ERROR: ")
     341             :                     ACE_TEXT("SubscriberImpl::delete_datareader: ")
     342             :                     ACE_TEXT("datareader(topic_name=%C)")
     343             :                     ACE_TEXT("for unknown repo id not found.\n"),
     344             :                     topic_name.in()));
     345             :         }
     346           0 :         return ::DDS::RETCODE_ERROR;
     347             :       }
     348           0 :       if (DCPS_debug_level > 0) {
     349           0 :         GUID_t id = dr_servant->get_guid();
     350           0 :         ACE_ERROR((LM_ERROR,
     351             :                   ACE_TEXT("(%P|%t) ERROR: ")
     352             :                   ACE_TEXT("SubscriberImpl::delete_datareader: ")
     353             :                   ACE_TEXT("datareader(topic_name=%C) %C not found.\n"),
     354             :                   topic_name.in(),
     355             :                   LogGuid(id).c_str()));
     356             :       }
     357           0 :       return ::DDS::RETCODE_ERROR;
     358           0 :     }
     359             : 
     360           0 :     datareader_map_.erase(it);
     361             : 
     362           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     363             :                      dr_set_guard,
     364             :                      this->dr_set_lock_,
     365             :                      DDS::RETCODE_ERROR);
     366           0 :     datareader_set_.erase(dr_servant);
     367           0 :   }
     368             : 
     369           0 :   if (this->monitor_) {
     370           0 :     this->monitor_->report();
     371             :   }
     372             : 
     373           0 :   const GUID_t subscription_id = dr_servant->get_guid();
     374           0 :   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
     375           0 :   if (!disco->remove_subscription(this->domain_id_,
     376           0 :                                   this->dp_id_,
     377             :                                   subscription_id)) {
     378           0 :     if (DCPS_debug_level > 0) {
     379           0 :       ACE_ERROR((LM_ERROR,
     380             :                 ACE_TEXT("(%P|%t) ERROR: ")
     381             :                 ACE_TEXT("SubscriberImpl::delete_datareader: ")
     382             :                 ACE_TEXT(" could not remove subscription from discovery.\n")));
     383             :     }
     384           0 :     return ::DDS::RETCODE_ERROR;
     385             :   }
     386             : 
     387             :   // Call remove association before unregistering the datareader from the transport,
     388             :   // otherwise some callbacks resulted from remove_association may be lost.
     389           0 :   dr_servant->remove_all_associations();
     390           0 :   dr_servant->cleanup();
     391           0 :   return DDS::RETCODE_OK;
     392           0 : }
     393             : 
     394             : DDS::ReturnCode_t
     395           0 : SubscriberImpl::delete_contained_entities()
     396             : {
     397             :   // mark that the entity is being deleted
     398           0 :   set_deleted(true);
     399             : 
     400           0 :   OPENDDS_VECTOR(DDS::DataReader*) drs;
     401             : 
     402             : #ifndef OPENDDS_NO_MULTI_TOPIC
     403             :   {
     404           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     405             :                      guard,
     406             :                      this->si_lock_,
     407             :                      DDS::RETCODE_ERROR);
     408           0 :     for (MultitopicReaderMap::iterator mt_iter = multitopic_reader_map_.begin();
     409           0 :          mt_iter != multitopic_reader_map_.end(); ++mt_iter) {
     410           0 :       drs.push_back(mt_iter->second);
     411             :     }
     412           0 :   }
     413             : 
     414           0 :   for (size_t i = 0; i < drs.size(); ++i) {
     415           0 :     DDS::ReturnCode_t ret = drs[i]->delete_contained_entities();
     416           0 :     if (ret == DDS::RETCODE_OK) {
     417           0 :       ret = delete_datareader(drs[i]);
     418             :     }
     419           0 :     if (ret != DDS::RETCODE_OK) {
     420           0 :       if (DCPS_debug_level > 0) {
     421           0 :         ACE_ERROR((LM_ERROR,
     422             :                   ACE_TEXT("(%P|%t) ERROR: ")
     423             :                   ACE_TEXT("SubscriberImpl::delete_contained_entities, ")
     424             :                   ACE_TEXT("failed to delete datareader\n")));
     425             :       }
     426           0 :       return ret;
     427             :     }
     428             :   }
     429           0 :   drs.clear();
     430             : #endif
     431             : 
     432             :   {
     433           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     434             :                      guard,
     435             :                      this->si_lock_,
     436             :                      DDS::RETCODE_ERROR);
     437           0 :     DataReaderMap::iterator it;
     438           0 :     DataReaderMap::iterator itEnd = datareader_map_.end();
     439             : 
     440           0 :     for (it = datareader_map_.begin(); it != itEnd; ++it) {
     441           0 :       drs.push_back(it->second.in());
     442             :     }
     443           0 :   }
     444             : 
     445           0 :   for (size_t i = 0; i < drs.size(); ++i) {
     446           0 :     DDS::ReturnCode_t ret = drs[i]->delete_contained_entities();
     447           0 :     if (ret == DDS::RETCODE_OK) {
     448           0 :       ret = delete_datareader(drs[i]);
     449             :     }
     450           0 :     if (ret != DDS::RETCODE_OK) {
     451           0 :       if (DCPS_debug_level > 0) {
     452           0 :         ACE_ERROR((LM_ERROR,
     453             :                   ACE_TEXT("(%P|%t) ERROR: ")
     454             :                   ACE_TEXT("SubscriberImpl::delete_contained_entities, ")
     455             :                   ACE_TEXT("failed to delete datareader\n")));
     456             :       }
     457           0 :       return ret;
     458             :     }
     459             :   }
     460             : 
     461             :   // the subscriber can now start creating new publications
     462           0 :   set_deleted(false);
     463             : 
     464           0 :   return DDS::RETCODE_OK;
     465           0 : }
     466             : 
     467             : DDS::DataReader_ptr
     468           0 : SubscriberImpl::lookup_datareader(
     469             :   const char * topic_name)
     470             : {
     471           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     472             :                    guard,
     473             :                    this->si_lock_,
     474             :                    DDS::DataReader::_nil());
     475             : 
     476             :   // If multiple entries whose key is "topic_name" then which one is
     477             :   // returned ? Spec does not limit which one should give.
     478           0 :   DataReaderMap::iterator it = datareader_map_.find(topic_name);
     479             : 
     480           0 :   if (it == datareader_map_.end()) {
     481             : #ifndef OPENDDS_NO_MULTI_TOPIC
     482           0 :     MultitopicReaderMap::iterator mt_iter = multitopic_reader_map_.find(topic_name);
     483           0 :     if (mt_iter != multitopic_reader_map_.end()) {
     484           0 :       return DDS::DataReader::_duplicate(mt_iter->second);
     485             :     }
     486             : #endif
     487             : 
     488           0 :     if (DCPS_debug_level >= 2) {
     489           0 :       ACE_DEBUG((LM_DEBUG,
     490             :                  ACE_TEXT("(%P|%t) ")
     491             :                  ACE_TEXT("SubscriberImpl::lookup_datareader, ")
     492             :                  ACE_TEXT("The datareader(topic_name=%C) is not found\n"),
     493             :                  topic_name));
     494             :     }
     495             : 
     496           0 :     return DDS::DataReader::_nil();
     497             : 
     498             :   } else {
     499           0 :     return DDS::DataReader::_duplicate(it->second.in());
     500             :   }
     501           0 : }
     502             : 
     503             : DDS::ReturnCode_t
     504           0 : SubscriberImpl::get_datareaders(
     505             :   DDS::DataReaderSeq &   readers,
     506             :   DDS::SampleStateMask   sample_states,
     507             :   DDS::ViewStateMask     view_states,
     508             :   DDS::InstanceStateMask instance_states)
     509             : {
     510           0 :   DataReaderSet localreaders;
     511             :   {
     512           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     513             :                      guard,
     514             :                      this->dr_set_lock_,
     515             :                      DDS::RETCODE_ERROR);
     516           0 :     localreaders = datareader_set_;
     517           0 :   }
     518             : 
     519             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
     520             :   // If access_scope is GROUP and ordered_access is true then return readers as
     521             :   // list which may contain same readers multiple times. Otherwise return readers
     522             :   // as set.
     523           0 :   if (this->qos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS) {
     524           0 :     if (this->access_depth_ == 0 && this->qos_.presentation.coherent_access) {
     525           0 :       return ::DDS::RETCODE_PRECONDITION_NOT_MET;
     526             :     }
     527           0 :     if (this->qos_.presentation.ordered_access) {
     528             : 
     529           0 :       GroupRakeData data;
     530           0 :       for (DataReaderSet::const_iterator pos = localreaders.begin();
     531           0 :            pos != localreaders.end(); ++pos) {
     532           0 :         (*pos)->get_ordered_data(data, sample_states, view_states, instance_states);
     533             :       }
     534             : 
     535             :       // Return list of readers in the order of the source timestamp of the received
     536             :       // samples from readers.
     537           0 :       data.get_datareaders(readers);
     538           0 :       return DDS::RETCODE_OK;
     539           0 :     }
     540             :   }
     541             : #endif
     542             : 
     543             :   // Return set of datareaders.
     544           0 :   readers.length(0);
     545           0 :   for (DataReaderSet::const_iterator pos = localreaders.begin();
     546           0 :        pos != localreaders.end(); ++pos) {
     547           0 :     if ((*pos)->have_sample_states(sample_states) &&
     548           0 :         (*pos)->have_view_states(view_states) &&
     549           0 :         (*pos)->have_instance_states(instance_states)) {
     550           0 :       push_back(readers, DDS::DataReader::_duplicate(pos->in()));
     551             :     }
     552             :   }
     553             : 
     554           0 :   return DDS::RETCODE_OK;
     555           0 : }
     556             : 
     557             : DDS::ReturnCode_t
     558           0 : SubscriberImpl::notify_datareaders()
     559             : {
     560           0 :   DataReaderMap localreadermap;
     561             :   {
     562           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     563             :                     guard,
     564             :                     this->si_lock_,
     565             :                     DDS::RETCODE_ERROR);
     566           0 :     localreadermap = datareader_map_;
     567           0 :   }
     568           0 :   for (DataReaderMap::iterator it = localreadermap.begin(); it != localreadermap.end(); ++it) {
     569           0 :     if (it->second->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) {
     570           0 :       DDS::DataReaderListener_var listener = it->second->get_listener();
     571           0 :       if (!it->second->is_bit()) {
     572           0 :         it->second->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
     573           0 :         if (listener) {
     574           0 :           listener->on_data_available(it->second.in());
     575             :         }
     576             :       } else {
     577           0 :         TheServiceParticipant->job_queue()->enqueue(make_rch<DataReaderImpl::OnDataAvailable>(listener, it->second, listener, true, false));
     578             :       }
     579           0 :     }
     580             :   }
     581             : 
     582             : #ifndef OPENDDS_NO_MULTI_TOPIC
     583           0 :   MultitopicReaderMap localmtr;
     584             :   {
     585           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     586             :                     guard,
     587             :                     this->si_lock_,
     588             :                     DDS::RETCODE_ERROR);
     589           0 :     localmtr = multitopic_reader_map_;
     590           0 :   }
     591             : 
     592           0 :   for (MultitopicReaderMap::iterator it = localmtr.begin();
     593           0 :       it != localmtr.end(); ++it) {
     594             :     MultiTopicDataReaderBase* dri =
     595           0 :       dynamic_cast<MultiTopicDataReaderBase*>(it->second.in());
     596             : 
     597           0 :     if (!dri) {
     598           0 :       if (DCPS_debug_level > 0) {
     599           0 :         ACE_ERROR((LM_ERROR,
     600             :           ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::notify_datareaders: ")
     601             :           ACE_TEXT("failed to obtain MultiTopicDataReaderBase.\n")));
     602             :       }
     603           0 :       return ::DDS::RETCODE_ERROR;
     604             :     }
     605             : 
     606           0 :     if (dri->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) {
     607           0 :       DDS::DataReaderListener_var listener = dri->get_listener();
     608           0 :       dri->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
     609           0 :       if (!CORBA::is_nil(listener)) {
     610           0 :         listener->on_data_available(dri);
     611             :       }
     612           0 :     }
     613             :   }
     614             : #endif
     615             : 
     616           0 :   return DDS::RETCODE_OK;
     617           0 : }
     618             : 
     619             : DDS::ReturnCode_t
     620           0 : SubscriberImpl::set_qos(
     621             :   const DDS::SubscriberQos & qos)
     622             : {
     623             : 
     624             :   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
     625             : 
     626           0 :   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
     627           0 :     if (qos_ == qos)
     628           0 :       return DDS::RETCODE_OK;
     629             : 
     630             :     // for the not changeable qos, it can be changed before enable
     631           0 :     if (!Qos_Helper::changeable(qos_, qos) && enabled_) {
     632           0 :       return DDS::RETCODE_IMMUTABLE_POLICY;
     633             : 
     634             :     } else {
     635           0 :       qos_ = qos;
     636             : 
     637           0 :       DrIdToQosMap idToQosMap;
     638             :       {
     639           0 :         ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     640             :                          guard,
     641             :                          this->si_lock_,
     642             :                          DDS::RETCODE_ERROR);
     643             :         // after FaceCTS bug 619 is fixed, make endIter and iter const iteratorsx
     644           0 :         DataReaderMap::iterator endIter = datareader_map_.end();
     645             : 
     646           0 :         for (DataReaderMap::iterator iter = datareader_map_.begin();
     647           0 :              iter != endIter; ++iter) {
     648           0 :           DataReaderImpl_rch reader = iter->second;
     649           0 :           reader->set_subscriber_qos (qos);
     650           0 :           DDS::DataReaderQos qos = reader->qos_;
     651           0 :           GUID_t id = reader->get_guid();
     652             :           std::pair<DrIdToQosMap::iterator, bool> pair
     653           0 :             = idToQosMap.insert(DrIdToQosMap::value_type(id, qos));
     654             : 
     655           0 :           if (!pair.second) {
     656           0 :             if (DCPS_debug_level > 0) {
     657           0 :               ACE_ERROR((LM_ERROR,
     658             :                         ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::set_qos: ")
     659             :                         ACE_TEXT("insert %C to DrIdToQosMap failed.\n"),
     660             :                         LogGuid(id).c_str()));
     661             :             }
     662           0 :             return ::DDS::RETCODE_ERROR;
     663             :           }
     664           0 :         }
     665           0 :       }
     666             : 
     667           0 :       DrIdToQosMap::iterator iter = idToQosMap.begin();
     668             : 
     669           0 :       while (iter != idToQosMap.end()) {
     670           0 :         Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
     671             :         const bool status
     672           0 :           = disco->update_subscription_qos(this->domain_id_,
     673           0 :                                            this->dp_id_,
     674           0 :                                            iter->first,
     675           0 :                                            iter->second,
     676           0 :                                            this->qos_);
     677             : 
     678           0 :         if (!status) {
     679           0 :           if (DCPS_debug_level > 0) {
     680           0 :             ACE_ERROR((LM_ERROR,
     681             :                       ACE_TEXT("(%P|%t) SubscriberImpl::set_qos, ")
     682             :                       ACE_TEXT("failed.\n")));
     683             :           }
     684           0 :           return DDS::RETCODE_ERROR;
     685             :         }
     686             : 
     687           0 :         ++iter;
     688           0 :       }
     689           0 :     }
     690             : 
     691           0 :     return DDS::RETCODE_OK;
     692             : 
     693             :   } else {
     694           0 :     return DDS::RETCODE_INCONSISTENT_POLICY;
     695             :   }
     696             : }
     697             : 
     698             : DDS::ReturnCode_t
     699           0 : SubscriberImpl::get_qos(
     700             :   DDS::SubscriberQos & qos)
     701             : {
     702           0 :   qos = qos_;
     703           0 :   return DDS::RETCODE_OK;
     704             : }
     705             : 
     706             : DDS::ReturnCode_t
     707           0 : SubscriberImpl::set_listener(
     708             :   DDS::SubscriberListener_ptr a_listener,
     709             :   DDS::StatusMask             mask)
     710             : {
     711           0 :   ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
     712           0 :   listener_mask_ = mask;
     713             :   //note: OK to duplicate  a nil object ref
     714           0 :   listener_ = DDS::SubscriberListener::_duplicate(a_listener);
     715           0 :   return DDS::RETCODE_OK;
     716           0 : }
     717             : 
     718             : DDS::SubscriberListener_ptr
     719           0 : SubscriberImpl::get_listener()
     720             : {
     721           0 :   ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
     722           0 :   return DDS::SubscriberListener::_duplicate(listener_.in());
     723           0 : }
     724             : 
     725             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
     726             : 
     727             : DDS::ReturnCode_t
     728           0 : SubscriberImpl::begin_access()
     729             : {
     730           0 :   DataReaderSet to_call;
     731             :   {
     732           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     733             :                      si_guard,
     734             :                      si_lock_,
     735             :                      DDS::RETCODE_ERROR);
     736           0 :     if (!enabled_) {
     737           0 :       if (DCPS_debug_level > 0) {
     738           0 :         ACE_ERROR((LM_ERROR,
     739             :                    ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::begin_access:")
     740             :                    ACE_TEXT(" Subscriber is not enabled!\n")));
     741             :       }
     742           0 :       return DDS::RETCODE_NOT_ENABLED;
     743             :     }
     744             : 
     745           0 :     if (qos_.presentation.access_scope != DDS::GROUP_PRESENTATION_QOS) {
     746           0 :       return DDS::RETCODE_OK;
     747             :     }
     748             : 
     749           0 :     ++access_depth_;
     750             :     // We should only notify subscription on the first
     751             :     // and last change to the current change set:
     752           0 :     if (access_depth_ == 1) {
     753           0 :       ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     754             :                        dr_set_guard,
     755             :                        dr_set_lock_,
     756             :                        DDS::RETCODE_ERROR);
     757           0 :       to_call = datareader_set_;
     758           0 :     }
     759           0 :   }
     760             : 
     761           0 :   for (DataReaderSet::iterator it = to_call.begin(); it != to_call.end(); ++it) {
     762           0 :     (*it)->begin_access();
     763             :   }
     764           0 :   return DDS::RETCODE_OK;
     765           0 : }
     766             : 
     767             : DDS::ReturnCode_t
     768           0 : SubscriberImpl::end_access()
     769             : {
     770           0 :   DataReaderSet to_call;
     771             :   {
     772           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     773             :                      si_guard,
     774             :                      si_lock_,
     775             :                      DDS::RETCODE_ERROR);
     776           0 :     if (!enabled_) {
     777           0 :       if (DCPS_debug_level > 0) {
     778           0 :         ACE_ERROR((LM_ERROR,
     779             :                    ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:")
     780             :                    ACE_TEXT(" Publisher is not enabled!\n")));
     781             :       }
     782           0 :       return DDS::RETCODE_NOT_ENABLED;
     783             :     }
     784             : 
     785           0 :     if (qos_.presentation.access_scope != DDS::GROUP_PRESENTATION_QOS) {
     786           0 :       return DDS::RETCODE_OK;
     787             :     }
     788             : 
     789           0 :     if (access_depth_ == 0) {
     790           0 :       if (DCPS_debug_level > 0) {
     791           0 :         ACE_ERROR((LM_ERROR,
     792             :                    ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:")
     793             :                    ACE_TEXT(" No matching call to begin_coherent_changes!\n")));
     794             :       }
     795           0 :       return DDS::RETCODE_PRECONDITION_NOT_MET;
     796             :     }
     797             : 
     798           0 :     --access_depth_;
     799             :     // We should only notify subscription on the first
     800             :     // and last change to the current change set:
     801           0 :     if (access_depth_ == 0) {
     802           0 :       ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     803             :                        dr_set_guard,
     804             :                        dr_set_lock_,
     805             :                        DDS::RETCODE_ERROR);
     806           0 :       to_call = datareader_set_;
     807           0 :     }
     808           0 :   }
     809             : 
     810           0 :   for (DataReaderSet::iterator it = to_call.begin(); it != to_call.end(); ++it) {
     811           0 :     (*it)->end_access();
     812             :   }
     813           0 :   return DDS::RETCODE_OK;
     814           0 : }
     815             : 
     816             : #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
     817             : 
     818             : DDS::DomainParticipant_ptr
     819           0 : SubscriberImpl::get_participant()
     820             : {
     821           0 :   return participant_.lock()._retn();
     822             : }
     823             : 
     824             : DDS::ReturnCode_t
     825           0 : SubscriberImpl::set_default_datareader_qos(
     826             :   const DDS::DataReaderQos & qos)
     827             : {
     828           0 :   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
     829           0 :     default_datareader_qos_ = qos;
     830           0 :     return DDS::RETCODE_OK;
     831             : 
     832             :   } else {
     833           0 :     return DDS::RETCODE_INCONSISTENT_POLICY;
     834             :   }
     835             : }
     836             : 
     837             : DDS::ReturnCode_t
     838           0 : SubscriberImpl::get_default_datareader_qos(
     839             :   DDS::DataReaderQos & qos)
     840             : {
     841           0 :   qos = default_datareader_qos_;
     842           0 :   return DDS::RETCODE_OK;
     843             : }
     844             : 
     845             : DDS::ReturnCode_t
     846           0 : SubscriberImpl::copy_from_topic_qos(
     847             :   DDS::DataReaderQos &  a_datareader_qos,
     848             :   const DDS::TopicQos & a_topic_qos)
     849             : {
     850           0 :   if (Qos_Helper::copy_from_topic_qos(a_datareader_qos, a_topic_qos) ) {
     851           0 :     return DDS::RETCODE_OK;
     852             : 
     853             :   } else {
     854           0 :     return DDS::RETCODE_INCONSISTENT_POLICY;
     855             :   }
     856             : }
     857             : 
     858             : DDS::ReturnCode_t
     859           0 : SubscriberImpl::enable()
     860             : {
     861             :   //According spec:
     862             :   // - Calling enable on an already enabled Entity returns OK and has no
     863             :   // effect.
     864             :   // - Calling enable on an Entity whose factory is not enabled will fail
     865             :   // and return PRECONDITION_NOT_MET.
     866             : 
     867           0 :   if (this->is_enabled()) {
     868           0 :     return DDS::RETCODE_OK;
     869             :   }
     870             : 
     871           0 :   RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
     872           0 :   if (!participant || !participant->is_enabled()) {
     873           0 :     return DDS::RETCODE_PRECONDITION_NOT_MET;
     874             :   }
     875             : 
     876           0 :   dp_id_ = participant->get_id();
     877             : 
     878           0 :   if (this->monitor_) {
     879           0 :     this->monitor_->report();
     880             :   }
     881             : 
     882           0 :   this->set_enabled();
     883             : 
     884           0 :   if (qos_.entity_factory.autoenable_created_entities) {
     885           0 :     DataReaderSet readers;
     886             :     {
     887           0 :       ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, si_lock_, DDS::RETCODE_ERROR);
     888           0 :       readers_not_enabled_.swap(readers);
     889           0 :     }
     890           0 :     for (DataReaderSet::iterator it = readers.begin(); it != readers.end(); ++it) {
     891           0 :       (*it)->enable();
     892             :     }
     893           0 :   }
     894             : 
     895           0 :   return DDS::RETCODE_OK;
     896           0 : }
     897             : 
     898           0 : bool SubscriberImpl::is_clean(String* leftover_entities) const
     899             : {
     900           0 :   if (leftover_entities) {
     901           0 :     leftover_entities->clear();
     902             :   }
     903             : 
     904           0 :   size_t reader_count = datareader_map_.size();
     905           0 :   if (reader_count && !TheTransientKludge->is_enabled()) {
     906             :     // BIT datareaders.
     907           0 :     reader_count = reader_count == NUMBER_OF_BUILT_IN_TOPICS ? 0 : reader_count;
     908             :   }
     909           0 :   if (leftover_entities && reader_count) {
     910           0 :     *leftover_entities += to_dds_string(reader_count) + " reader(s)";
     911             :   }
     912             : 
     913           0 :   return reader_count == 0;
     914             : }
     915             : 
     916             : void
     917           0 : SubscriberImpl::data_received(DataReaderImpl* reader)
     918             : {
     919           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex,
     920             :             guard,
     921             :             this->dr_set_lock_);
     922           0 :   datareader_set_.insert(rchandle_from(reader));
     923           0 : }
     924             : 
     925             : DDS::ReturnCode_t
     926           0 : SubscriberImpl::reader_enabled(const char*     topic_name,
     927             :                                DataReaderImpl* reader_ptr)
     928             : {
     929           0 :   if (DCPS_debug_level >= 4) {
     930           0 :     ACE_DEBUG((LM_DEBUG,
     931             :                ACE_TEXT("(%P|%t) SubscriberImpl::reader_enabled, ")
     932             :                ACE_TEXT("datareader(topic_name=%C) enabled\n"),
     933             :                topic_name));
     934             :   }
     935             : 
     936           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, si_lock_, DDS::RETCODE_ERROR);
     937           0 :   DataReaderImpl_rch reader = rchandle_from(reader_ptr);
     938           0 :   readers_not_enabled_.erase(reader);
     939             : 
     940           0 :   this->datareader_map_.insert(DataReaderMap::value_type(topic_name, reader));
     941             : 
     942           0 :   if (this->monitor_) {
     943           0 :     this->monitor_->report();
     944             :   }
     945             : 
     946           0 :   return DDS::RETCODE_OK;
     947           0 : }
     948             : 
     949             : #ifndef OPENDDS_NO_MULTI_TOPIC
     950             : DDS::ReturnCode_t
     951           0 : SubscriberImpl::multitopic_reader_enabled(DDS::DataReader_ptr reader)
     952             : {
     953           0 :   DDS::TopicDescription_var td = reader->get_topicdescription();
     954           0 :   CORBA::String_var topic = td->get_name();
     955           0 :   multitopic_reader_map_[topic.in()] = DDS::DataReader::_duplicate(reader);
     956           0 :   return DDS::RETCODE_OK;
     957           0 : }
     958             : 
     959             : void
     960           0 : SubscriberImpl::remove_from_datareader_set(DataReaderImpl* reader)
     961             : {
     962           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, dr_set_lock_);
     963           0 :   datareader_set_.erase(rchandle_from(reader));
     964           0 : }
     965             : #endif
     966             : 
     967             : DDS::SubscriberListener_ptr
     968           0 : SubscriberImpl::listener_for(::DDS::StatusKind kind)
     969             : {
     970             :   // per 2.1.4.3.1 Listener Access to Plain Communication Status
     971             :   // use this entities factory if listener is mask not enabled
     972             :   // for this kind.
     973           0 :   RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
     974           0 :   if (! participant)
     975           0 :     return 0;
     976             : 
     977           0 :   ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
     978           0 :   if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
     979           0 :     g.release();
     980           0 :     return participant->listener_for(kind);
     981             : 
     982             :   } else {
     983           0 :     return DDS::SubscriberListener::_duplicate(listener_.in());
     984             :   }
     985           0 : }
     986             : 
     987             : unsigned int&
     988           0 : SubscriberImpl::raw_latency_buffer_size()
     989             : {
     990           0 :   return this->raw_latency_buffer_size_;
     991             : }
     992             : 
     993             : DataCollector<double>::OnFull&
     994           0 : SubscriberImpl::raw_latency_buffer_type()
     995             : {
     996           0 :   return this->raw_latency_buffer_type_;
     997             : }
     998             : 
     999             : void
    1000           0 : SubscriberImpl::get_subscription_ids(SubscriptionIdVec& subs)
    1001             : {
    1002           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    1003             :                    guard,
    1004             :                    this->si_lock_,
    1005             :                    );
    1006             : 
    1007           0 :   subs.reserve(datareader_map_.size());
    1008           0 :   for (DataReaderMap::iterator iter = datareader_map_.begin();
    1009           0 :        iter != datareader_map_.end();
    1010           0 :        ++iter) {
    1011           0 :     subs.push_back(iter->second->get_guid());
    1012             :   }
    1013           0 : }
    1014             : 
    1015             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
    1016             : void
    1017           0 : SubscriberImpl::update_ownership_strength (const GUID_t& pub_id,
    1018             :                                            const CORBA::Long&   ownership_strength)
    1019             : {
    1020           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    1021             :                    guard,
    1022             :                    this->si_lock_,
    1023             :                    );
    1024             : 
    1025           0 :   for (DataReaderMap::iterator iter = datareader_map_.begin();
    1026           0 :        iter != datareader_map_.end();
    1027           0 :        ++iter) {
    1028           0 :     if (!iter->second->is_bit()) {
    1029           0 :       iter->second->update_ownership_strength(pub_id, ownership_strength);
    1030             :     }
    1031             :   }
    1032           0 : }
    1033             : #endif
    1034             : 
    1035             : 
    1036             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
    1037             : void
    1038           0 : SubscriberImpl::coherent_change_received (const GUID_t& publisher_id,
    1039             :                                           DataReaderImpl* reader,
    1040             :                                           Coherent_State& group_state)
    1041             : {
    1042           0 :   DataReaderSet localdrs;
    1043             :   {
    1044           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex,
    1045             :               guard,
    1046             :               this->dr_set_lock_);
    1047           0 :      localdrs = datareader_set_;
    1048           0 :   }
    1049             :   // Verify if all readers complete the coherent changes. The result
    1050             :   // is either COMPLETED or REJECTED.
    1051           0 :   group_state = COMPLETED;
    1052           0 :   for (DataReaderSet::const_iterator iter = localdrs.begin();
    1053           0 :        iter != localdrs.end(); ++iter) {
    1054             : 
    1055           0 :     Coherent_State state = COMPLETED;
    1056           0 :     (*iter)->coherent_change_received (publisher_id, state);
    1057           0 :     if (state == NOT_COMPLETED_YET) {
    1058           0 :       group_state = NOT_COMPLETED_YET;
    1059           0 :       return;
    1060             :     }
    1061           0 :     else if (state == REJECTED) {
    1062           0 :       group_state = REJECTED;
    1063             :     }
    1064             :   }
    1065             : 
    1066           0 :   GUID_t writerId = GUID_UNKNOWN;
    1067           0 :   for (DataReaderSet::const_iterator iter = localdrs.begin();
    1068           0 :        iter != localdrs.end(); ++iter) {
    1069           0 :     if (group_state == COMPLETED) {
    1070           0 :       (*iter)->accept_coherent (writerId, publisher_id);
    1071             :     }
    1072             :     else {   //REJECTED
    1073           0 :       (*iter)->reject_coherent (writerId, publisher_id);
    1074             :     }
    1075             :   }
    1076             : 
    1077           0 :   if (group_state == COMPLETED) {
    1078           0 :     for (DataReaderSet::const_iterator iter = localdrs.begin();
    1079           0 :          iter != localdrs.end(); ++iter) {
    1080           0 :       (*iter)->coherent_changes_completed (reader);
    1081           0 :       (*iter)->reset_coherent_info (writerId, publisher_id);
    1082             :     }
    1083             :   }
    1084           0 : }
    1085             : #endif
    1086             : 
    1087             : RcHandle<EntityImpl>
    1088           0 : SubscriberImpl::parent() const
    1089             : {
    1090           0 :   return this->participant_.lock();
    1091             : }
    1092             : 
    1093             : bool
    1094           0 : SubscriberImpl::validate_datareader_qos(const DDS::DataReaderQos & qos,
    1095             :                                         const DDS::DataReaderQos & default_qos,
    1096             :                                         DDS::Topic_ptr             a_topic,
    1097             :                                         DDS::DataReaderQos &       dr_qos,
    1098             :                                         bool                       mt)
    1099             : {
    1100             : 
    1101             : 
    1102           0 :   if (qos == DATAREADER_QOS_DEFAULT) {
    1103           0 :     dr_qos = default_qos;
    1104             : 
    1105           0 :   } else if (qos == DATAREADER_QOS_USE_TOPIC_QOS) {
    1106             : 
    1107             : #ifndef OPENDDS_NO_MULTI_TOPIC
    1108           0 :     if (mt) {
    1109           0 :       if (DCPS_debug_level > 0) {
    1110           0 :         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
    1111             :                    ACE_TEXT("SubscriberImpl::create_datareader, ")
    1112             :                    ACE_TEXT("DATAREADER_QOS_USE_TOPIC_QOS can not be used ")
    1113             :                    ACE_TEXT("to create a MultiTopic DataReader.\n")));
    1114             :       }
    1115           0 :       return false;
    1116             :     }
    1117             : #else
    1118             :     ACE_UNUSED_ARG(mt);
    1119             : #endif
    1120           0 :     DDS::TopicQos topic_qos;
    1121           0 :     a_topic->get_qos(topic_qos);
    1122             : 
    1123           0 :     dr_qos = default_qos;
    1124             : 
    1125           0 :     Qos_Helper::copy_from_topic_qos(dr_qos,
    1126             :                                     topic_qos);
    1127             : 
    1128           0 :   } else {
    1129           0 :     dr_qos = qos;
    1130             :   }
    1131             : 
    1132             :   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(dr_qos, false);
    1133             :   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(dr_qos, false);
    1134             :   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(dr_qos, false);
    1135             : 
    1136           0 :   if (!Qos_Helper::valid(dr_qos)) {
    1137           0 :     if (DCPS_debug_level > 0) {
    1138           0 :       ACE_ERROR((LM_ERROR,
    1139             :                 ACE_TEXT("(%P|%t) ERROR: ")
    1140             :                 ACE_TEXT("SubscriberImpl::create_datareader, ")
    1141             :                 ACE_TEXT("invalid qos.\n")));
    1142             :     }
    1143           0 :     return false;
    1144             :   }
    1145             : 
    1146           0 :   if (!Qos_Helper::consistent(dr_qos)) {
    1147           0 :     if (DCPS_debug_level > 0) {
    1148           0 :       ACE_ERROR((LM_ERROR,
    1149             :                 ACE_TEXT("(%P|%t) ERROR: ")
    1150             :                 ACE_TEXT("SubscriberImpl::create_datareader, ")
    1151             :                 ACE_TEXT("inconsistent qos.\n")));
    1152             :     }
    1153           0 :     return false;
    1154             :   }
    1155             : 
    1156           0 :   return true;
    1157             : }
    1158             : 
    1159             : 
    1160             : } // namespace DCPS
    1161             : } // namespace OpenDDS
    1162             : 
    1163             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16