LCOV - code coverage report
Current view: top level - DCPS - DomainParticipantImpl.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 8 1211 0.7 %
Date: 2023-04-30 01:32:43 Functions: 2 95 2.1 %

          Line data    Source code
       1             : /*
       2             :  * Distributed under the OpenDDS License.
       3             :  * See: http://www.opendds.org/license.html
       4             :  */
       5             : 
       6             : #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
       7             : 
       8             : #include "DomainParticipantImpl.h"
       9             : 
      10             : #include "FeatureDisabledQosCheck.h"
      11             : #include "Service_Participant.h"
      12             : #include "Qos_Helper.h"
      13             : #include "GuidConverter.h"
      14             : #include "PublisherImpl.h"
      15             : #include "SubscriberImpl.h"
      16             : #include "DataWriterImpl.h"
      17             : #include "Marked_Default_Qos.h"
      18             : #include "Registered_Data_Types.h"
      19             : #include "Transient_Kludge.h"
      20             : #include "DomainParticipantFactoryImpl.h"
      21             : #include "Util.h"
      22             : #include "DCPS_Utils.h"
      23             : #include "MonitorFactory.h"
      24             : #include "ContentFilteredTopicImpl.h"
      25             : #include "MultiTopicImpl.h"
      26             : #include "Service_Participant.h"
      27             : #include "RecorderImpl.h"
      28             : #include "ReplayerImpl.h"
      29             : #include "BuiltInTopicUtils.h"
      30             : #include "transport/framework/TransportRegistry.h"
      31             : #include "transport/framework/TransportExceptions.h"
      32             : #ifdef OPENDDS_SECURITY
      33             : #  include "security/framework/SecurityRegistry.h"
      34             : #  include "security/framework/SecurityConfig.h"
      35             : #  include "security/framework/Properties.h"
      36             : #endif
      37             : #include "XTypes/Utils.h"
      38             : 
      39             : #include <dds/DdsDcpsGuidC.h>
      40             : #ifndef DDS_HAS_MINIMUM_BIT
      41             : #  include <dds/DdsDcpsCoreTypeSupportImpl.h>
      42             : #endif
      43             : 
      44             : #include <ace/Reactor.h>
      45             : #include <ace/OS_NS_unistd.h>
      46             : 
      47             : namespace Util {
      48             : 
      49             :   template <typename Key>
      50           0 :   int find(
      51             :     OpenDDS::DCPS::DomainParticipantImpl::TopicMap& c,
      52             :     const Key& key,
      53             :     OpenDDS::DCPS::DomainParticipantImpl::TopicMap::mapped_type*& value)
      54             :   {
      55             :     OpenDDS::DCPS::DomainParticipantImpl::TopicMap::iterator iter =
      56           0 :       c.find(key);
      57             : 
      58           0 :     if (iter == c.end()) {
      59           0 :       return -1;
      60             :     }
      61             : 
      62           0 :     value = &iter->second;
      63           0 :     return 0;
      64             :   }
      65             : 
      66           0 :   DDS::PropertySeq filter_properties(const DDS::PropertySeq& properties, const std::string& prefix)
      67             :   {
      68           0 :     DDS::PropertySeq result(properties.length());
      69           0 :     result.length(properties.length());
      70           0 :     unsigned int count = 0;
      71           0 :     for (unsigned int i = 0; i < properties.length(); ++i) {
      72           0 :       if (std::string(properties[i].name.in()).find(prefix) == 0) {
      73           0 :         result[count++] = properties[i];
      74             :       }
      75             :     }
      76           0 :     result.length(count);
      77           0 :     return result;
      78           0 :   }
      79             : 
      80             : } // namespace Util
      81             : 
      82             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      83             : 
      84             : namespace OpenDDS {
      85             : namespace DCPS {
      86             : 
      87             : //TBD - add check for enabled in most methods.
      88             : //      Currently this is not needed because auto_enable_created_entities
      89             : //      cannot be false.
      90             : 
      91             : // Implementation skeleton constructor
      92           0 : DomainParticipantImpl::DomainParticipantImpl(
      93             :   InstanceHandleGenerator& handle_generator,
      94             :   const DDS::DomainId_t& domain_id,
      95             :   const DDS::DomainParticipantQos& qos,
      96             :   DDS::DomainParticipantListener_ptr a_listener,
      97           0 :   const DDS::StatusMask& mask)
      98           0 :   : default_topic_qos_(TheServiceParticipant->initial_TopicQos())
      99           0 :   , default_publisher_qos_(TheServiceParticipant->initial_PublisherQos())
     100           0 :   , default_subscriber_qos_(TheServiceParticipant->initial_SubscriberQos())
     101           0 :   , qos_(qos)
     102             : #ifdef OPENDDS_SECURITY
     103           0 :   , id_handle_(DDS::HANDLE_NIL)
     104           0 :   , perm_handle_(DDS::HANDLE_NIL)
     105           0 :   , part_crypto_handle_(DDS::HANDLE_NIL)
     106             : #endif
     107           0 :   , domain_id_(domain_id)
     108           0 :   , dp_id_(GUID_UNKNOWN)
     109           0 :   , federated_(false)
     110           0 :   , handle_waiters_(handle_protector_)
     111           0 :   , shutdown_condition_(shutdown_mutex_)
     112           0 :   , shutdown_complete_(false)
     113           0 :   , participant_handles_(handle_generator)
     114           0 :   , pub_id_gen_(dp_id_)
     115           0 :   , automatic_liveliness_timer_(make_rch<AutomaticLivelinessTimer>(ref(*this)))
     116           0 :   , automatic_liveliness_task_(make_rch<AutomaticLivelinessTask>(
     117             :     TheServiceParticipant->time_source(),
     118           0 :     TheServiceParticipant->interceptor(),
     119           0 :     automatic_liveliness_timer_,
     120           0 :     &LivelinessTimer::execute))
     121           0 :   , participant_liveliness_timer_(make_rch<ParticipantLivelinessTimer>(ref(*this)))
     122           0 :   , participant_liveliness_task_(make_rch<ParticipantLivelinessTask>(
     123             :     TheServiceParticipant->time_source(),
     124           0 :     TheServiceParticipant->interceptor(),
     125           0 :     participant_liveliness_timer_,
     126           0 :     &LivelinessTimer::execute))
     127             : {
     128           0 :   (void) this->set_listener(a_listener, mask);
     129           0 :   monitor_.reset(TheServiceParticipant->monitor_factory_->create_dp_monitor(this));
     130           0 :   type_lookup_service_ = make_rch<XTypes::TypeLookupService>();
     131           0 : }
     132             : 
     133           0 : DomainParticipantImpl::~DomainParticipantImpl()
     134             : {
     135             : #ifdef OPENDDS_SECURITY
     136           0 :   if (security_config_ && perm_handle_ != DDS::HANDLE_NIL) {
     137           0 :     Security::AccessControl_var access = security_config_->get_access_control();
     138           0 :     DDS::Security::SecurityException se;
     139           0 :     if (!access->return_permissions_handle(perm_handle_, se)) {
     140           0 :       if (DCPS::security_debug.auth_warn) {
     141           0 :         ACE_ERROR((LM_ERROR,
     142             :                    ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::~DomainParticipantImpl: ")
     143             :                    ACE_TEXT("Unable to return permissions handle. SecurityException[%d.%d]: %C\n"),
     144             :                    se.code, se.minor_code, se.message.in()));
     145             :       }
     146             :     }
     147           0 :   }
     148             : #endif
     149             : 
     150           0 : }
     151             : 
     152             : DDS::Publisher_ptr
     153           0 : DomainParticipantImpl::create_publisher(
     154             :   const DDS::PublisherQos & qos,
     155             :   DDS::PublisherListener_ptr a_listener,
     156             :   DDS::StatusMask mask)
     157             : {
     158           0 :   DDS::PublisherQos pub_qos = qos;
     159             : 
     160           0 :   if (! this->validate_publisher_qos(pub_qos))
     161           0 :     return DDS::Publisher::_nil();
     162             : 
     163             :   // Although Publisher entities have GUIDs assigned (see pub_id_gen_),
     164             :   // these are not GUIDs from the RTPS spec and
     165             :   // so the handle doesn't need to correlate to the GUID.
     166           0 :   const DDS::InstanceHandle_t handle = assign_handle();
     167             : 
     168           0 :   PublisherImpl* pub = 0;
     169           0 :   ACE_NEW_RETURN(pub,
     170             :                  PublisherImpl(handle,
     171             :                                pub_id_gen_.next(),
     172             :                                pub_qos,
     173             :                                a_listener,
     174             :                                mask,
     175             :                                this),
     176             :                  DDS::Publisher::_nil());
     177             : 
     178           0 :   if (enabled_ && qos_.entity_factory.autoenable_created_entities) {
     179           0 :     pub->enable();
     180             :   }
     181             : 
     182           0 :   DDS::Publisher_ptr pub_obj(pub);
     183             : 
     184             :   // this object will also act as the guard for leaking Publisher Impl
     185           0 :   Publisher_Pair pair(pub, pub_obj, false);
     186             : 
     187           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     188             :                    tao_mon,
     189             :                    this->publishers_protector_,
     190             :                    DDS::Publisher::_nil());
     191             : 
     192           0 :   if (OpenDDS::DCPS::insert(publishers_, pair) == -1) {
     193           0 :     if (DCPS_debug_level > 0) {
     194           0 :       ACE_ERROR((LM_ERROR,
     195             :                  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_publisher, ")
     196             :                  ACE_TEXT("%p\n"),
     197             :                  ACE_TEXT("insert")));
     198             :     }
     199           0 :     return DDS::Publisher::_nil();
     200             :   }
     201             : 
     202           0 :   return DDS::Publisher::_duplicate(pub_obj);
     203           0 : }
     204             : 
     205             : DDS::ReturnCode_t
     206           0 : DomainParticipantImpl::delete_publisher(
     207             :   DDS::Publisher_ptr p)
     208             : {
     209             :   // The servant's ref count should be 2 at this point,
     210             :   // one referenced by poa, one referenced by the publisher
     211             :   // set.
     212           0 :   PublisherImpl* the_servant = dynamic_cast<PublisherImpl*>(p);
     213           0 :   if (!the_servant) {
     214           0 :     if (log_level >= LogLevel::Notice) {
     215           0 :       ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_publisher: "
     216             :                  "Failed to obtain PublisherImpl\n"));
     217             :     }
     218           0 :     return DDS::RETCODE_ERROR;
     219             :   }
     220             : 
     221           0 :   const Publisher_Pair pub_pair(the_servant, p, true);
     222             : 
     223             :   {
     224           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g,
     225             :       publishers_protector_, DDS::RETCODE_ERROR);
     226           0 :     if (publishers_.count(pub_pair) == 0) {
     227           0 :       if (log_level >= LogLevel::Notice) {
     228           0 :         ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_publisher: "
     229             :                    "This publisher doesn't belong to this participant\n"));
     230             :       }
     231           0 :       return DDS::RETCODE_PRECONDITION_NOT_MET;
     232             :     }
     233           0 :   }
     234             : 
     235           0 :   String leftover_entities;
     236           0 :   if (!the_servant->is_clean(&leftover_entities)) {
     237           0 :     if (log_level >= LogLevel::Notice) {
     238           0 :       ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_publisher: "
     239             :                  "The publisher is not empty. %C leftover\n",
     240             :                  leftover_entities.c_str()));
     241             :     }
     242           0 :     return DDS::RETCODE_PRECONDITION_NOT_MET;
     243             :   }
     244             : 
     245           0 :   const DDS::ReturnCode_t ret = the_servant->delete_contained_entities();
     246           0 :   if (ret != DDS::RETCODE_OK) {
     247           0 :     if (log_level >= LogLevel::Notice) {
     248           0 :       ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_publisher: "
     249             :                  "Failed to delete contained entities: %C\n", retcode_to_string(ret)));
     250             :     }
     251           0 :     return ret;
     252             :   }
     253             : 
     254             :   {
     255           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g,
     256             :       publishers_protector_, DDS::RETCODE_ERROR);
     257           0 :     if (remove(publishers_, pub_pair) == -1) {
     258           0 :       if (log_level >= LogLevel::Notice) {
     259           0 :         ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_publisher: "
     260             :           "publisher not found\n"));
     261             :       }
     262           0 :       return DDS::RETCODE_ERROR;
     263             :     }
     264           0 :   }
     265             : 
     266           0 :   return DDS::RETCODE_OK;
     267           0 : }
     268             : 
     269             : DDS::Subscriber_ptr
     270           0 : DomainParticipantImpl::create_subscriber(
     271             :   const DDS::SubscriberQos & qos,
     272             :   DDS::SubscriberListener_ptr a_listener,
     273             :   DDS::StatusMask mask)
     274             : {
     275           0 :   DDS::SubscriberQos sub_qos = qos;
     276             : 
     277           0 :   if (! this->validate_subscriber_qos(sub_qos)) {
     278           0 :     return DDS::Subscriber::_nil();
     279             :   }
     280             : 
     281           0 :   const DDS::InstanceHandle_t handle = assign_handle();
     282             : 
     283           0 :   SubscriberImpl* sub = 0;
     284           0 :   ACE_NEW_RETURN(sub,
     285             :                  SubscriberImpl(handle,
     286             :                                 sub_qos,
     287             :                                 a_listener,
     288             :                                 mask,
     289             :                                 this),
     290             :                  DDS::Subscriber::_nil());
     291             : 
     292           0 :   if (enabled_ && qos_.entity_factory.autoenable_created_entities) {
     293           0 :     sub->enable();
     294             :   }
     295             : 
     296           0 :   DDS::Subscriber_ptr sub_obj(sub);
     297             : 
     298           0 :   Subscriber_Pair pair(sub, sub_obj, false);
     299             : 
     300           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     301             :                    tao_mon,
     302             :                    this->subscribers_protector_,
     303             :                    DDS::Subscriber::_nil());
     304             : 
     305           0 :   if (OpenDDS::DCPS::insert(subscribers_, pair) == -1) {
     306           0 :     if (DCPS_debug_level > 0) {
     307           0 :       ACE_ERROR((LM_ERROR,
     308             :                  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_subscriber, ")
     309             :                  ACE_TEXT("%p\n"),
     310             :                  ACE_TEXT("insert")));
     311             :     }
     312           0 :     return DDS::Subscriber::_nil();
     313             :   }
     314             : 
     315           0 :   return DDS::Subscriber::_duplicate(sub_obj);
     316           0 : }
     317             : 
     318             : DDS::ReturnCode_t
     319           0 : DomainParticipantImpl::delete_subscriber(
     320             :   DDS::Subscriber_ptr s)
     321             : {
     322             :   // The servant's ref count should be 2 at this point,
     323             :   // one referenced by poa, one referenced by the subscriber
     324             :   // set.
     325           0 :   SubscriberImpl* const the_servant = dynamic_cast<SubscriberImpl*>(s);
     326           0 :   if (!the_servant) {
     327           0 :     if (log_level >= LogLevel::Notice) {
     328           0 :       ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_subscriber: "
     329             :                  "Failed to obtain SubscriberImpl\n"));
     330             :     }
     331           0 :     return DDS::RETCODE_ERROR;
     332             :   }
     333             : 
     334           0 :   const Subscriber_Pair sub_pair(the_servant, s, true);
     335             : 
     336             :   {
     337           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g,
     338             :       subscribers_protector_, DDS::RETCODE_ERROR);
     339           0 :     if (subscribers_.count(sub_pair) == 0) {
     340           0 :       if (log_level >= LogLevel::Notice) {
     341           0 :         ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_subscriber: "
     342             :                    "This subscriber doesn't belong to this participant\n"));
     343             :       }
     344           0 :       return DDS::RETCODE_PRECONDITION_NOT_MET;
     345             :     }
     346           0 :   }
     347             : 
     348           0 :   String leftover_entities;
     349           0 :   if (!the_servant->is_clean(&leftover_entities)) {
     350           0 :     if (log_level >= LogLevel::Notice) {
     351           0 :       ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_subscriber: "
     352             :                  "The subscriber is not empty. %C leftover\n",
     353             :                  leftover_entities.c_str()));
     354             :     }
     355           0 :     return DDS::RETCODE_PRECONDITION_NOT_MET;
     356             :   }
     357             : 
     358           0 :   const DDS::ReturnCode_t ret = the_servant->delete_contained_entities();
     359           0 :   if (ret != DDS::RETCODE_OK) {
     360           0 :     if (log_level >= LogLevel::Notice) {
     361           0 :       ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_subscriber: "
     362             :                  "Failed to delete contained entities: %C\n", retcode_to_string(ret)));
     363             :     }
     364           0 :     return ret;
     365             :   }
     366             : 
     367             :   {
     368           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g,
     369             :       subscribers_protector_, DDS::RETCODE_ERROR);
     370           0 :     if (remove(subscribers_, sub_pair) == -1) {
     371           0 :       if (log_level >= LogLevel::Notice) {
     372           0 :         ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_subscriber: "
     373             :           "subscriber not found\n"));
     374             :       }
     375           0 :       return DDS::RETCODE_ERROR;
     376             :     }
     377           0 :   }
     378             : 
     379           0 :   return DDS::RETCODE_OK;
     380           0 : }
     381             : 
     382             : DDS::Subscriber_ptr
     383           0 : DomainParticipantImpl::get_builtin_subscriber()
     384             : {
     385           0 :   return bit_subscriber_->get();
     386             : }
     387             : 
     388             : RcHandle<BitSubscriber>
     389           0 : DomainParticipantImpl::get_builtin_subscriber_proxy()
     390             : {
     391           0 :   return bit_subscriber_;
     392             : }
     393             : 
     394             : DDS::Topic_ptr
     395           0 : DomainParticipantImpl::create_topic(
     396             :   const char * topic_name,
     397             :   const char * type_name,
     398             :   const DDS::TopicQos & qos,
     399             :   DDS::TopicListener_ptr a_listener,
     400             :   DDS::StatusMask mask)
     401             : {
     402           0 :   return create_topic_i(topic_name,
     403             :                         type_name,
     404             :                         qos,
     405             :                         a_listener,
     406             :                         mask,
     407           0 :                         0);
     408             : }
     409             : 
     410             : DDS::Topic_ptr
     411           0 : DomainParticipantImpl::create_typeless_topic(
     412             :   const char * topic_name,
     413             :   const char * type_name,
     414             :   bool type_has_keys,
     415             :   const DDS::TopicQos & qos,
     416             :   DDS::TopicListener_ptr a_listener,
     417             :   DDS::StatusMask mask)
     418             : {
     419           0 :   int topic_mask = (type_has_keys ? TOPIC_TYPE_HAS_KEYS : 0 ) | TOPIC_TYPELESS;
     420             : 
     421           0 :   return create_topic_i(topic_name,
     422             :                         type_name,
     423             :                         qos,
     424             :                         a_listener,
     425             :                         mask,
     426           0 :                         topic_mask);
     427             : }
     428             : 
     429             : 
     430             : DDS::Topic_ptr
     431           0 : DomainParticipantImpl::create_topic_i(
     432             :   const char * topic_name,
     433             :   const char * type_name,
     434             :   const DDS::TopicQos & qos,
     435             :   DDS::TopicListener_ptr a_listener,
     436             :   DDS::StatusMask mask,
     437             :   int topic_mask)
     438             : {
     439           0 :   DDS::TopicQos topic_qos;
     440             : 
     441           0 :   if (qos == TOPIC_QOS_DEFAULT) {
     442           0 :     this->get_default_topic_qos(topic_qos);
     443             : 
     444             :   } else {
     445           0 :     topic_qos = qos;
     446             :   }
     447             : 
     448             :   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
     449             :   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
     450             :   OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
     451             :   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
     452             : 
     453           0 :   if (!Qos_Helper::valid(topic_qos)) {
     454           0 :     if (DCPS_debug_level > 0) {
     455           0 :       ACE_ERROR((LM_ERROR,
     456             :                  ACE_TEXT("(%P|%t) ERROR: ")
     457             :                  ACE_TEXT("DomainParticipantImpl::create_topic, ")
     458             :                  ACE_TEXT("invalid qos.\n")));
     459             :     }
     460           0 :     return DDS::Topic::_nil();
     461             :   }
     462             : 
     463           0 :   if (!Qos_Helper::consistent(topic_qos)) {
     464           0 :     if (DCPS_debug_level > 0) {
     465           0 :       ACE_ERROR((LM_ERROR,
     466             :                  ACE_TEXT("(%P|%t) ERROR: ")
     467             :                  ACE_TEXT("DomainParticipantImpl::create_topic, ")
     468             :                  ACE_TEXT("inconsistent qos.\n")));
     469             :     }
     470           0 :     return DDS::Topic::_nil();
     471             :   }
     472             : 
     473             :   // See if there is a Topic with the same name.
     474           0 :   TopicMap::mapped_type* entry = 0;
     475           0 :   bool found = false;
     476             :   {
     477           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     478             :                      tao_mon,
     479             :                      this->topics_protector_,
     480             :                      DDS::Topic::_nil());
     481             : 
     482             : #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)
     483           0 :     if (topic_descrs_.count(topic_name)) {
     484           0 :       if (DCPS_debug_level > 3) {
     485           0 :         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
     486             :                    ACE_TEXT("DomainParticipantImpl::create_topic, ")
     487             :                    ACE_TEXT("can't create a Topic due to name \"%C\" already in use ")
     488             :                    ACE_TEXT("by a TopicDescription.\n"), topic_name));
     489             :       }
     490           0 :       return 0;
     491             :     }
     492             : #endif
     493             : 
     494           0 :     if (Util::find(topics_, topic_name, entry) == 0) {
     495           0 :       found = true;
     496             :     }
     497           0 :   }
     498             : 
     499             :   /*
     500             :    * If there is a topic with the same name, return the topic if it has the
     501             :    * same type name and QoS, else it is an error.
     502             :    */
     503           0 :   if (found) {
     504           0 :     CORBA::String_var found_type = entry->pair_.svt_->get_type_name();
     505           0 :     if (ACE_OS::strcmp(type_name, found_type) == 0) {
     506           0 :       DDS::TopicQos found_qos;
     507           0 :       entry->pair_.svt_->get_qos(found_qos);
     508             : 
     509           0 :       if (topic_qos == found_qos) { // match type name, qos
     510             :         {
     511           0 :           ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     512             :                            tao_mon,
     513             :                            this->topics_protector_,
     514             :                            DDS::Topic::_nil());
     515           0 :           ++entry->client_refs_;
     516           0 :         }
     517           0 :         return DDS::Topic::_duplicate(entry->pair_.obj_.in());
     518             : 
     519             :       } else { // Same Name and Type, Different QoS
     520           0 :         if (DCPS_debug_level >= 1) {
     521           0 :           ACE_ERROR((LM_ERROR,
     522             :             ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_topic: ")
     523             :             ACE_TEXT("topic with name \"%C\" and type %C already exists, ")
     524             :             ACE_TEXT("but the QoS doesn't match.\n"),
     525             :             topic_name, type_name));
     526             :         }
     527             : 
     528           0 :         return DDS::Topic::_nil();
     529             :       }
     530             : 
     531           0 :     } else { // Same Name, Different Type
     532           0 :       if (DCPS_debug_level >= 1) {
     533           0 :         ACE_ERROR((LM_ERROR,
     534             :           ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_topic: ")
     535             :           ACE_TEXT("topic with name \"%C\" already exists, but its type, %C ")
     536             :           ACE_TEXT("is not the same as %C.\n"),
     537             :           topic_name, found_type.in(), type_name));
     538             :       }
     539             : 
     540           0 :       return DDS::Topic::_nil();
     541             :     }
     542             : 
     543           0 :   } else {
     544             : 
     545           0 :     OpenDDS::DCPS::TypeSupport_var type_support;
     546             : 
     547           0 :     if (0 == topic_mask) {
     548             :       // creating a topic with compile time type
     549           0 :       type_support = Registered_Data_Types->lookup(this, type_name);
     550           0 :       if (CORBA::is_nil(type_support)) {
     551           0 :         if (DCPS_debug_level >= 1) {
     552           0 :            ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
     553             :                       ACE_TEXT("DomainParticipantImpl::create_topic, ")
     554             :                       ACE_TEXT("can't create a topic=%C type_name=%C ")
     555             :                       ACE_TEXT("is not registered.\n"),
     556             :                       topic_name, type_name));
     557             :         }
     558           0 :         return DDS::Topic::_nil();
     559             :       }
     560             :     }
     561             : 
     562           0 :     DDS::Topic_var new_topic = create_new_topic(topic_name,
     563             :                                                 type_name,
     564             :                                                 topic_qos,
     565             :                                                 a_listener,
     566             :                                                 mask,
     567           0 :                                                 type_support);
     568             : 
     569           0 :     if (!new_topic) {
     570           0 :       if (DCPS_debug_level > 0) {
     571           0 :         ACE_ERROR((LM_WARNING,
     572             :                    ACE_TEXT("(%P|%t) WARNING: ")
     573             :                    ACE_TEXT("DomainParticipantImpl::create_topic, ")
     574             :                    ACE_TEXT("create_new_topic failed.\n")));
     575             :       }
     576           0 :       return DDS::Topic::_nil();
     577             :     }
     578             : 
     579           0 :     if (enabled_ && qos_.entity_factory.autoenable_created_entities) {
     580           0 :       if (new_topic->enable() != DDS::RETCODE_OK) {
     581           0 :         if (DCPS_debug_level > 0) {
     582           0 :           ACE_ERROR((LM_WARNING,
     583             :                      ACE_TEXT("(%P|%t) WARNING: ")
     584             :                      ACE_TEXT("DomainParticipantImpl::create_topic, ")
     585             :                      ACE_TEXT("enable failed.\n")));
     586             :         }
     587           0 :         return DDS::Topic::_nil();
     588             :       }
     589             :     }
     590           0 :     return new_topic._retn();
     591           0 :   }
     592           0 : }
     593             : 
     594             : DDS::ReturnCode_t
     595           0 : DomainParticipantImpl::delete_topic(
     596             :   DDS::Topic_ptr a_topic)
     597             : {
     598           0 :   return delete_topic_i(a_topic, false);
     599             : }
     600             : 
     601           0 : DDS::ReturnCode_t DomainParticipantImpl::delete_topic_i(
     602             :   DDS::Topic_ptr a_topic,
     603             :   bool remove_objref)
     604             : {
     605           0 :   DDS::ReturnCode_t ret = DDS::RETCODE_OK;
     606             : 
     607             :   try {
     608             :     // The servant's ref count should be greater than 2 at this point,
     609             :     // one referenced by poa, one referenced by the topic map and
     610             :     // others referenced by the datareader/datawriter.
     611           0 :     TopicImpl* the_topic_servant = dynamic_cast<TopicImpl*>(a_topic);
     612             : 
     613           0 :     if (!the_topic_servant) {
     614           0 :       if (log_level >= LogLevel::Notice) {
     615           0 :         ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_topic_i: %p\n"
     616             :                    "failed to obtain TopicImpl."));
     617             :       }
     618           0 :       return DDS::RETCODE_ERROR;
     619             :     }
     620             : 
     621           0 :     DDS::DomainParticipant_var dp = the_topic_servant->get_participant();
     622             : 
     623             :     DomainParticipantImpl* the_dp_servant =
     624           0 :       dynamic_cast<DomainParticipantImpl*>(dp.in());
     625             : 
     626           0 :     if (the_dp_servant != this) {
     627           0 :       if (log_level >= LogLevel::Notice) {
     628           0 :         ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_topic_i: "
     629             :                    "will return PRECONDITION_NOT_MET because this is not the "
     630             :                    "participant that owns this topic\n"));
     631             :       }
     632           0 :       return DDS::RETCODE_PRECONDITION_NOT_MET;
     633             :     }
     634           0 :     if (!remove_objref && the_topic_servant->has_entity_refs()) {
     635             :       // If entity_refs is true (nonzero), then some reader or writer is using
     636             :       // this topic and the spec requires delete_topic() to fail with the error:
     637           0 :       if (log_level >= LogLevel::Notice) {
     638           0 :         ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_topic_i: "
     639             :                    "will return PRECONDITION_NOT_MET because there are still "
     640             :                    "outstanding references to this topic\n"));
     641             :       }
     642           0 :       return DDS::RETCODE_PRECONDITION_NOT_MET;
     643             :     }
     644             : 
     645             :     {
     646           0 :       ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     647             :                        tao_mon,
     648             :                        this->topics_protector_,
     649             :                        DDS::RETCODE_ERROR);
     650             : 
     651           0 :       CORBA::String_var topic_name = the_topic_servant->get_name();
     652           0 :       TopicMap::mapped_type* entry = 0;
     653             : 
     654           0 :       TopicMapIteratorPair iters = topics_.equal_range(topic_name.in());
     655           0 :       TopicMapIterator iter;
     656           0 :       for (iter = iters.first; iter != iters.second; ++iter) {
     657           0 :         if (iter->second.pair_.svt_ == the_topic_servant) {
     658           0 :           entry = &iter->second;
     659           0 :           break;
     660             :         }
     661             :       }
     662           0 :       if (entry == 0) {
     663           0 :         if (log_level >= LogLevel::Notice) {
     664           0 :           ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_topic_i: not found\n"));
     665             :         }
     666           0 :         return DDS::RETCODE_ERROR;
     667             :       }
     668             : 
     669           0 :       const CORBA::ULong client_refs = --entry->client_refs_;
     670             : 
     671           0 :       if (remove_objref || 0 == client_refs) {
     672           0 :         const GUID_t topicId = the_topic_servant->get_id();
     673           0 :         topics_.erase(iter);
     674             : 
     675           0 :         Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
     676           0 :         TopicStatus status = disco->remove_topic(
     677           0 :           the_dp_servant->get_domain_id(), the_dp_servant->get_id(), topicId);
     678             : 
     679           0 :         if (status != REMOVED) {
     680           0 :           if (log_level >= LogLevel::Notice) {
     681           0 :             ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_topic_i, "
     682             :                        "remove_topic failed with return value <%C>\n", topicstatus_to_string(status)));
     683             :            }
     684           0 :           return DDS::RETCODE_ERROR;
     685             :         }
     686             : 
     687           0 :         return DDS::RETCODE_OK;
     688             : 
     689           0 :       } else {
     690           0 :         if (DCPS_debug_level > 4) {
     691           0 :           ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::delete_topic_i: "
     692             :                      "Didn't remove topic from the map, remove_objref %d client_refs %d\n",
     693             :                      remove_objref, client_refs));
     694             :         }
     695             :       }
     696           0 :     }
     697             : 
     698           0 :   } catch (...) {
     699           0 :     if (log_level >= LogLevel::Notice) {
     700           0 :       ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::delete_topic_i, "
     701             :                  " Caught Unknown Exception\n"));
     702             :     }
     703           0 :     ret = DDS::RETCODE_ERROR;
     704           0 :   }
     705             : 
     706           0 :   return ret;
     707             : }
     708             : 
     709             : DDS::Topic_ptr
     710           0 : DomainParticipantImpl::find_topic(
     711             :   const char* topic_name,
     712             :   const DDS::Duration_t& timeout)
     713             : {
     714           0 :   const MonotonicTimePoint timeout_at(MonotonicTimePoint::now() + TimeDuration(timeout));
     715             : 
     716           0 :   bool first_time = true;
     717           0 :   while (first_time || MonotonicTimePoint::now() < timeout_at) {
     718           0 :     if (first_time) {
     719           0 :       first_time = false;
     720             :     }
     721             : 
     722             :     GUID_t topic_id;
     723           0 :     CORBA::String_var type_name;
     724           0 :     DDS::TopicQos_var qos;
     725             : 
     726           0 :     Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
     727           0 :     TopicStatus status = disco->find_topic(domain_id_,
     728           0 :                                            get_id(),
     729             :                                            topic_name,
     730           0 :                                            type_name.out(),
     731             :                                            qos.out(),
     732             :                                            topic_id);
     733             : 
     734           0 :     const MonotonicTimePoint now = MonotonicTimePoint::now();
     735           0 :     if (status == FOUND) {
     736             :       OpenDDS::DCPS::TypeSupport_var type_support =
     737           0 :         Registered_Data_Types->lookup(this, type_name.in());
     738           0 :       if (CORBA::is_nil(type_support)) {
     739           0 :         if (DCPS_debug_level) {
     740           0 :             ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
     741             :                        ACE_TEXT("DomainParticipantImpl::find_topic, ")
     742             :                        ACE_TEXT("can't create a Topic: type_name \"%C\" ")
     743             :                        ACE_TEXT("is not registered.\n"), type_name.in()));
     744             :         }
     745             : 
     746           0 :         return DDS::Topic::_nil();
     747             :       }
     748             : 
     749           0 :       DDS::Topic_ptr new_topic = create_new_topic(topic_name,
     750             :                                                   type_name,
     751             :                                                   qos,
     752             :                                                   DDS::TopicListener::_nil(),
     753             :                                                   OpenDDS::DCPS::DEFAULT_STATUS_MASK,
     754             :                                                   type_support);
     755           0 :       return new_topic;
     756             : 
     757           0 :     } else if (status == INTERNAL_ERROR) {
     758           0 :       if (DCPS_debug_level > 0) {
     759           0 :         ACE_ERROR((LM_ERROR,
     760             :                    ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::find_topic - ")
     761             :                    ACE_TEXT("topic not found, discovery returned INTERNAL_ERROR!\n")));
     762             :       }
     763           0 :       return DDS::Topic::_nil();
     764           0 :     } else if (now < timeout_at) {
     765           0 :       const TimeDuration remaining = timeout_at - now;
     766             : 
     767           0 :       if (remaining.value().sec() >= 1) {
     768           0 :         ACE_OS::sleep(1);
     769             : 
     770             :       } else {
     771           0 :         ACE_OS::sleep(remaining.value());
     772             :       }
     773           0 :     }
     774           0 :   }
     775             : 
     776           0 :   if (DCPS_debug_level >= 1) {
     777             :     // timed out
     778           0 :     ACE_DEBUG((LM_DEBUG,
     779             :                ACE_TEXT("(%P|%t) DomainParticipantImpl::find_topic, ")
     780             :                ACE_TEXT("timed out.\n")));
     781             :   }
     782             : 
     783           0 :   return DDS::Topic::_nil();
     784           0 : }
     785             : 
     786             : DDS::TopicDescription_ptr
     787           0 : DomainParticipantImpl::lookup_topicdescription(const char* name)
     788             : {
     789           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     790             :                    tao_mon,
     791             :                    this->topics_protector_,
     792             :                    DDS::Topic::_nil());
     793             : 
     794           0 :   TopicMap::mapped_type* entry = 0;
     795             : 
     796           0 :   if (Util::find(topics_, name, entry) == -1) {
     797             : #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)
     798           0 :     TopicDescriptionMap::iterator iter = topic_descrs_.find(name);
     799           0 :     if (iter != topic_descrs_.end()) {
     800           0 :       return DDS::TopicDescription::_duplicate(iter->second);
     801             :     }
     802             : #endif
     803           0 :     return DDS::TopicDescription::_nil();
     804             : 
     805             :   } else {
     806           0 :     return DDS::TopicDescription::_duplicate(entry->pair_.obj_.in());
     807             :   }
     808           0 : }
     809             : 
     810             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     811             : 
     812             : DDS::ContentFilteredTopic_ptr
     813           0 : DomainParticipantImpl::create_contentfilteredtopic(
     814             :   const char* name,
     815             :   DDS::Topic_ptr related_topic,
     816             :   const char* filter_expression,
     817             :   const DDS::StringSeq& expression_parameters)
     818             : {
     819           0 :   if (CORBA::is_nil(related_topic)) {
     820           0 :     if (DCPS_debug_level > 3) {
     821           0 :       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
     822             :                  ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
     823             :                  ACE_TEXT("can't create a content-filtered topic due to null related ")
     824             :                  ACE_TEXT("topic.\n")));
     825             :     }
     826           0 :     return 0;
     827             :   }
     828             : 
     829           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_, 0);
     830             : 
     831           0 :   if (topics_.count(name)) {
     832           0 :     if (DCPS_debug_level > 3) {
     833           0 :       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
     834             :                  ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
     835             :                  ACE_TEXT("can't create a content-filtered topic due to name \"%C\" ")
     836             :                  ACE_TEXT("already in use by a Topic.\n"), name));
     837             :     }
     838           0 :     return 0;
     839             :   }
     840             : 
     841           0 :   if (topic_descrs_.count(name)) {
     842           0 :     if (DCPS_debug_level > 3) {
     843           0 :       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
     844             :                  ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
     845             :                  ACE_TEXT("can't create a content-filtered topic due to name \"%C\" ")
     846             :                  ACE_TEXT("already in use by a TopicDescription.\n"), name));
     847             :     }
     848           0 :     return 0;
     849             :   }
     850             : 
     851           0 :   DDS::ContentFilteredTopic_var cft;
     852             :   try {
     853             :     // Create the cft in two steps so that we only have one place to
     854             :     // check the expression parameters
     855           0 :     cft = new ContentFilteredTopicImpl(name, related_topic, filter_expression, this);
     856           0 :     if (cft->set_expression_parameters(expression_parameters) != DDS::RETCODE_OK) {
     857           0 :       return 0;
     858             :     }
     859           0 :   } catch (const std::exception& e) {
     860           0 :     if (DCPS_debug_level) {
     861           0 :       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
     862             :                  ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
     863             :                  ACE_TEXT("can't create a content-filtered topic due to runtime error: ")
     864             :                  ACE_TEXT("%C.\n"), e.what()));
     865             :     }
     866           0 :     return 0;
     867           0 :   }
     868           0 :   DDS::TopicDescription_var td = DDS::TopicDescription::_duplicate(cft);
     869           0 :   topic_descrs_[name] = td;
     870           0 :   return cft._retn();
     871           0 : }
     872             : 
     873           0 : DDS::ReturnCode_t DomainParticipantImpl::delete_contentfilteredtopic(
     874             :   DDS::ContentFilteredTopic_ptr a_contentfilteredtopic)
     875             : {
     876           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_,
     877             :                    DDS::RETCODE_OUT_OF_RESOURCES);
     878             :   DDS::ContentFilteredTopic_var cft =
     879           0 :     DDS::ContentFilteredTopic::_duplicate(a_contentfilteredtopic);
     880           0 :   CORBA::String_var name = cft->get_name();
     881           0 :   TopicDescriptionMap::iterator iter = topic_descrs_.find(name.in());
     882           0 :   if (iter == topic_descrs_.end()) {
     883           0 :     if (DCPS_debug_level > 3) {
     884           0 :       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
     885             :                  ACE_TEXT("DomainParticipantImpl::delete_contentfilteredtopic, ")
     886             :                  ACE_TEXT("can't delete a content-filtered topic \"%C\" ")
     887             :                  ACE_TEXT("because it is not in the set.\n"), name.in ()));
     888             :     }
     889           0 :     return DDS::RETCODE_PRECONDITION_NOT_MET;
     890             :   }
     891             : 
     892           0 :   TopicDescriptionImpl* tdi = dynamic_cast<TopicDescriptionImpl*>(iter->second.in());
     893             : 
     894           0 :   if (!tdi) {
     895           0 :     if (DCPS_debug_level > 3) {
     896           0 :       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
     897             :                  ACE_TEXT("DomainParticipantImpl::delete_contentfilteredtopic, ")
     898             :                  ACE_TEXT("can't delete a content-filtered topic \"%C\" ")
     899             :                  ACE_TEXT("failed to obtain TopicDescriptionImpl\n"), name.in()));
     900             :     }
     901           0 :     return DDS::RETCODE_ERROR;
     902             :   }
     903             : 
     904           0 :   if (tdi->has_entity_refs()) {
     905           0 :     if (DCPS_debug_level > 3) {
     906           0 :       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
     907             :                  ACE_TEXT("DomainParticipantImpl::delete_contentfilteredtopic, ")
     908             :                  ACE_TEXT("can't delete a content-filtered topic \"%C\" ")
     909             :                  ACE_TEXT("because it is used by a datareader\n"), name.in ()));
     910             :     }
     911           0 :     return DDS::RETCODE_PRECONDITION_NOT_MET;
     912             :   }
     913           0 :   topic_descrs_.erase(iter);
     914           0 :   return DDS::RETCODE_OK;
     915           0 : }
     916             : 
     917             : #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
     918             : 
     919             : #ifndef OPENDDS_NO_MULTI_TOPIC
     920             : 
     921           0 : DDS::MultiTopic_ptr DomainParticipantImpl::create_multitopic(
     922             :   const char* name, const char* type_name,
     923             :   const char* subscription_expression,
     924             :   const DDS::StringSeq& expression_parameters)
     925             : {
     926           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_, 0);
     927             : 
     928           0 :   if (topics_.count(name)) {
     929           0 :     if (DCPS_debug_level > 3) {
     930           0 :       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
     931             :                  ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
     932             :                  ACE_TEXT("can't create a multi topic due to name \"%C\" ")
     933             :                  ACE_TEXT("already in use by a Topic.\n"), name));
     934             :     }
     935           0 :     return 0;
     936             :   }
     937             : 
     938           0 :   if (topic_descrs_.count(name)) {
     939           0 :     if (DCPS_debug_level > 3) {
     940           0 :       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
     941             :                  ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
     942             :                  ACE_TEXT("can't create a multi topic due to name \"%C\" ")
     943             :                  ACE_TEXT("already in use by a TopicDescription.\n"), name));
     944             :     }
     945           0 :     return 0;
     946             :   }
     947             : 
     948           0 :   DDS::MultiTopic_var mt;
     949             :   try {
     950           0 :     mt = new MultiTopicImpl(name, type_name, subscription_expression,
     951           0 :       expression_parameters, this);
     952           0 :   } catch (const std::exception& e) {
     953           0 :     if (DCPS_debug_level) {
     954           0 :       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
     955             :                  ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
     956             :                  ACE_TEXT("can't create a multi topic due to runtime error: ")
     957             :                  ACE_TEXT("%C.\n"), e.what()));
     958             :     }
     959           0 :     return 0;
     960           0 :   }
     961           0 :   DDS::TopicDescription_var td = DDS::TopicDescription::_duplicate(mt);
     962           0 :   topic_descrs_[name] = td;
     963           0 :   return mt._retn();
     964           0 : }
     965             : 
     966           0 : DDS::ReturnCode_t DomainParticipantImpl::delete_multitopic(
     967             :   DDS::MultiTopic_ptr a_multitopic)
     968             : {
     969           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_,
     970             :                    DDS::RETCODE_OUT_OF_RESOURCES);
     971           0 :   DDS::MultiTopic_var mt = DDS::MultiTopic::_duplicate(a_multitopic);
     972           0 :   CORBA::String_var mt_name = mt->get_name();
     973           0 :   TopicDescriptionMap::iterator iter = topic_descrs_.find(mt_name.in());
     974           0 :   if (iter == topic_descrs_.end()) {
     975           0 :     if (DCPS_debug_level > 3) {
     976           0 :       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
     977             :                  ACE_TEXT("DomainParticipantImpl::delete_multitopic, ")
     978             :                  ACE_TEXT("can't delete a multitopic \"%C\" ")
     979             :                  ACE_TEXT("because it is not in the set.\n"), mt_name.in ()));
     980             :     }
     981           0 :     return DDS::RETCODE_PRECONDITION_NOT_MET;
     982             :   }
     983             : 
     984           0 :   TopicDescriptionImpl* tdi = dynamic_cast<TopicDescriptionImpl*>(iter->second.in());
     985             : 
     986           0 :   if (!tdi) {
     987           0 :     if (DCPS_debug_level > 3) {
     988           0 :       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
     989             :                  ACE_TEXT("DomainParticipantImpl::delete_multitopic, ")
     990             :                  ACE_TEXT("can't delete a multitopic topic \"%C\" ")
     991             :                  ACE_TEXT("failed to obtain TopicDescriptionImpl.\n"),
     992             :                  mt_name.in()));
     993             :     }
     994           0 :     return DDS::RETCODE_ERROR;
     995             :   }
     996             : 
     997           0 :   if (tdi->has_entity_refs()) {
     998           0 :     if (DCPS_debug_level > 3) {
     999           0 :       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
    1000             :                  ACE_TEXT("DomainParticipantImpl::delete_multitopic, ")
    1001             :                  ACE_TEXT("can't delete a multitopic topic \"%C\" ")
    1002             :                  ACE_TEXT("because it is used by a datareader.\n"), mt_name.in ()));
    1003             :     }
    1004           0 :     return DDS::RETCODE_PRECONDITION_NOT_MET;
    1005             :   }
    1006           0 :   topic_descrs_.erase(iter);
    1007           0 :   return DDS::RETCODE_OK;
    1008           0 : }
    1009             : 
    1010             : #endif // OPENDDS_NO_MULTI_TOPIC
    1011             : 
    1012             : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
    1013             : 
    1014             : RcHandle<FilterEvaluator>
    1015           0 : DomainParticipantImpl::get_filter_eval(const char* filter)
    1016             : {
    1017           0 :   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, filter_cache_lock_,
    1018             :                    RcHandle<FilterEvaluator>());
    1019             : 
    1020           0 :   RcHandle<FilterEvaluator>& result = filter_cache_[filter];
    1021           0 :   if (!result) {
    1022             :     try {
    1023           0 :       result = make_rch<FilterEvaluator>(filter, false);
    1024           0 :     } catch (const std::exception& e) {
    1025           0 :       filter_cache_.erase(filter);
    1026           0 :       if (DCPS_debug_level) {
    1027           0 :         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
    1028             :                    ACE_TEXT("DomainParticipantImpl::get_filter_eval, ")
    1029             :                    ACE_TEXT("can't create a writer-side content filter due to ")
    1030             :                    ACE_TEXT("runtime error: %C.\n"), e.what()));
    1031             :       }
    1032           0 :     }
    1033             :   }
    1034           0 :   return result;
    1035           0 : }
    1036             : 
    1037             : void
    1038           0 : DomainParticipantImpl::deref_filter_eval(const char* filter)
    1039             : {
    1040           0 :   ACE_GUARD(ACE_Thread_Mutex, guard, filter_cache_lock_);
    1041             :   typedef std::map<OPENDDS_STRING, RcHandle<FilterEvaluator> > Map;
    1042           0 :   Map::iterator iter = filter_cache_.find(filter);
    1043           0 :   if (iter != filter_cache_.end()) {
    1044           0 :     if (iter->second->ref_count() == 1) {
    1045           0 :       filter_cache_.erase(iter);
    1046             :     }
    1047             :   }
    1048           0 : }
    1049             : 
    1050             : #endif
    1051             : 
    1052             : DDS::ReturnCode_t
    1053           0 : DomainParticipantImpl::delete_contained_entities()
    1054             : {
    1055           0 :   if (!get_deleted()) {
    1056             :     // mark that the entity is being deleted
    1057           0 :     set_deleted(true);
    1058             : 
    1059           0 :     if (!prepare_to_delete_datawriters()) {
    1060           0 :       return DDS::RETCODE_ERROR;
    1061             :     }
    1062           0 :     if (!set_wait_pending_deadline(TheServiceParticipant->new_pending_timeout_deadline())) {
    1063           0 :       return DDS::RETCODE_ERROR;
    1064             :     }
    1065             :   }
    1066             : 
    1067             :   // BIT subscriber and data readers will be deleted with the
    1068             :   // rest of the entities, so need to report to discovery that
    1069             :   // BIT is no longer available
    1070           0 :   Discovery_rch disc = TheServiceParticipant->get_discovery(this->domain_id_);
    1071           0 :   if (disc)
    1072           0 :     disc->fini_bit(this);
    1073             : 
    1074           0 :   if (ACE_OS::thr_equal(TheServiceParticipant->reactor_owner(),
    1075             :                         ACE_Thread::self())) {
    1076           0 :     handle_exception(0);
    1077             : 
    1078             :   } else {
    1079           0 :     TheServiceParticipant->reactor()->notify(this);
    1080             : 
    1081           0 :     shutdown_mutex_.acquire();
    1082           0 :     ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
    1083           0 :     while (!shutdown_complete_) {
    1084           0 :       shutdown_condition_.wait(thread_status_manager);
    1085             :     }
    1086           0 :     shutdown_complete_ = false;
    1087           0 :     shutdown_mutex_.release();
    1088             :   }
    1089             : 
    1090           0 :   bit_subscriber_.reset();
    1091             : 
    1092           0 :   Registered_Data_Types->unregister_participant(this);
    1093             : 
    1094             :   // the participant can now start creating new contained entities
    1095           0 :   set_deleted(false);
    1096           0 :   return shutdown_result_;
    1097           0 : }
    1098             : 
    1099             : CORBA::Boolean
    1100           0 : DomainParticipantImpl::contains_entity(DDS::InstanceHandle_t a_handle)
    1101             : {
    1102             :   /// Check top-level containers for Topic, Subscriber,
    1103             :   /// and Publisher instances.
    1104             :   {
    1105           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    1106             :                      guard,
    1107             :                      this->topics_protector_,
    1108             :                      false);
    1109             : 
    1110           0 :     for (TopicMap::iterator it(topics_.begin());
    1111           0 :          it != topics_.end(); ++it) {
    1112           0 :       if (a_handle == it->second.pair_.svt_->get_instance_handle())
    1113           0 :         return true;
    1114             :     }
    1115           0 :   }
    1116             : 
    1117             :   {
    1118           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    1119             :                      guard,
    1120             :                      this->subscribers_protector_,
    1121             :                      false);
    1122             : 
    1123           0 :     for (SubscriberSet::iterator it(subscribers_.begin());
    1124           0 :          it != subscribers_.end(); ++it) {
    1125           0 :       if (a_handle == it->svt_->get_instance_handle())
    1126           0 :         return true;
    1127             :     }
    1128           0 :   }
    1129             : 
    1130             :   {
    1131           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    1132             :                      guard,
    1133             :                      this->publishers_protector_,
    1134             :                      false);
    1135             : 
    1136           0 :     for (PublisherSet::iterator it(publishers_.begin());
    1137           0 :          it != publishers_.end(); ++it) {
    1138           0 :       if (a_handle == it->svt_->get_instance_handle())
    1139           0 :         return true;
    1140             :     }
    1141           0 :   }
    1142             : 
    1143             :   /// Recurse into SubscriberImpl and PublisherImpl for
    1144             :   /// DataReader and DataWriter instances respectively.
    1145           0 :   for (SubscriberSet::iterator it(subscribers_.begin());
    1146           0 :        it != subscribers_.end(); ++it) {
    1147           0 :     if (it->svt_->contains_reader(a_handle))
    1148           0 :       return true;
    1149             :   }
    1150             : 
    1151           0 :   for (PublisherSet::iterator it(publishers_.begin());
    1152           0 :        it != publishers_.end(); ++it) {
    1153           0 :     if (it->svt_->contains_writer(a_handle))
    1154           0 :       return true;
    1155             :   }
    1156             : 
    1157           0 :   return false;
    1158             : }
    1159             : 
    // Replaces this participant's QoS with `qos` and forwards the change to discovery.
    //
    // Returns:
    //   RETCODE_INCONSISTENT_POLICY - Qos_Helper::valid() or Qos_Helper::consistent()
    //                                 rejects `qos`.
    //   RETCODE_OK                  - `qos` equals the stored qos_ (nothing else is done),
    //                                 or the discovery update reports success.
    //   RETCODE_IMMUTABLE_POLICY    - Qos_Helper::changeable() rejects the change and the
    //                                 participant is already enabled.
    //   RETCODE_ERROR               - update_domain_participant_qos() returns false.
    //
    // NOTE(review): qos_ is overwritten before the discovery call and is not restored
    // when that call fails, so a RETCODE_ERROR return still leaves the new value stored.
    // NOTE(review): `disco` is dereferenced without the is_nil() check that enable()
    // performs on the same lookup -- confirm discovery cannot be nil here.
    1160             : DDS::ReturnCode_t
    1161           0 : DomainParticipantImpl::set_qos(
    1162             :   const DDS::DomainParticipantQos & qos)
    1163             : {
    1164           0 :   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
                         // Identical QoS: return before touching discovery.
    1165           0 :     if (qos_ == qos)
    1166           0 :       return DDS::RETCODE_OK;
    1167             : 
    1168             :     // A non-changeable policy may still be modified while the participant is not yet enabled.
    1169           0 :     if (!Qos_Helper::changeable(qos_, qos) && enabled_) {
    1170           0 :       return DDS::RETCODE_IMMUTABLE_POLICY;
    1171             : 
    1172             :     } else {
    1173           0 :       qos_ = qos;
    1174             : 
    1175           0 :       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
    1176             :       const bool status =
    1177           0 :         disco->update_domain_participant_qos(domain_id_,
    1178           0 :                                              dp_id_,
    1179           0 :                                              qos_);
    1180             : 
    1181           0 :       if (!status) {
                           // The failure is logged only when DCPS_debug_level > 0.
    1182           0 :         if (DCPS_debug_level > 0) {
    1183           0 :           ACE_ERROR((LM_ERROR,
    1184             :                      ACE_TEXT("(%P|%t) DomainParticipantImpl::set_qos, ")
    1185             :                      ACE_TEXT("failed on compatibility check.\n")));
    1186             :         }
    1187           0 :         return DDS::RETCODE_ERROR;
    1188             :       }
    1189           0 :     }
    1190             : 
    1191           0 :     return DDS::RETCODE_OK;
    1192             : 
    1193             :   } else {
    1194           0 :     return DDS::RETCODE_INCONSISTENT_POLICY;
    1195             :   }
    1196             : }
    1197             : 
    // Copies the currently stored participant QoS (qos_) into `qos`.
    // Always returns RETCODE_OK.
    1198             : DDS::ReturnCode_t
    1199           0 : DomainParticipantImpl::get_qos(
    1200             :   DDS::DomainParticipantQos & qos)
    1201             : {
    1202           0 :   qos = qos_;
    1203           0 :   return DDS::RETCODE_OK;
    1204             : }
    1205             : 
    // Installs `a_listener` together with its status `mask`.
    // Both members are written while listener_mutex_ is held; the listener reference is
    // duplicated before it is stored. Always returns RETCODE_OK.
    1206             : DDS::ReturnCode_t
    1207           0 : DomainParticipantImpl::set_listener(
    1208             :   DDS::DomainParticipantListener_ptr a_listener,
    1209             :   DDS::StatusMask mask)
    1210             : {
    1211           0 :   ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
    1212           0 :   listener_mask_ = mask;
    1213             :   //note: OK to duplicate  a nil object ref
    1214           0 :   listener_ = DDS::DomainParticipantListener::_duplicate(a_listener);
    1215           0 :   return DDS::RETCODE_OK;
    1216           0 : }
    1217             : 
    // Returns a duplicated reference to the currently installed listener.
    // The stored listener_ is read while listener_mutex_ is held.
    // NOTE(review): the result comes from _duplicate(), so the caller presumably owns
    // the returned reference -- confirm against the CORBA/DDS mapping in use.
    1218             : DDS::DomainParticipantListener_ptr
    1219           0 : DomainParticipantImpl::get_listener()
    1220             : {
    1221           0 :   ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
    1222           0 :   return DDS::DomainParticipantListener::_duplicate(listener_.in());
    1223           0 : }
    1224             : 
    // Asks discovery to ignore the remote participant identified by `handle`.
    //
    // When DDS_HAS_MINIMUM_BIT is defined the operation is compiled out and returns
    // RETCODE_UNSUPPORTED. Otherwise:
    //   RETCODE_NOT_ENABLED - this participant is not enabled.
    //   RETCODE_OK          - the GUID for `handle` is already in ignored_participants_,
    //                         or discovery accepted the request.
    //   RETCODE_ERROR       - ignore_domain_participant() returned false.
    //
    // NOTE(review): the entry is added to ignored_participants_ before the discovery
    // call and is not removed when that call fails, so a retry after RETCODE_ERROR
    // takes the "already ignored" path and returns RETCODE_OK.
    // NOTE(review): no guard is taken around ignored_participants_ in this function;
    // confirm whether callers are expected to serialize access.
    1225             : DDS::ReturnCode_t
    1226           0 : DomainParticipantImpl::ignore_participant(
    1227             :   DDS::InstanceHandle_t handle)
    1228             : {
    1229             : #ifndef DDS_HAS_MINIMUM_BIT
    1230           0 :   if (!enabled_) {
    1231           0 :     if (DCPS_debug_level > 0) {
    1232           0 :       ACE_ERROR((LM_ERROR,
    1233             :                  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_participant, ")
    1234             :                  ACE_TEXT("Entity is not enabled.\n")));
    1235             :     }
    1236           0 :     return DDS::RETCODE_NOT_ENABLED;
    1237             :   }
    1238             : 
                       // Look up the GUID for the handle and check whether it is already recorded.
    1239           0 :   GUID_t ignoreId = get_repoid(handle);
    1240           0 :   HandleMap::const_iterator location = this->ignored_participants_.find(ignoreId);
    1241             : 
    1242           0 :   if (location == this->ignored_participants_.end()) {
    1243           0 :     this->ignored_participants_[ ignoreId] = handle;
    1244             :   }
    1245             :   else {// ignore same participant again, just return ok.
    1246           0 :     return DDS::RETCODE_OK;
    1247             :   }
    1248             : 
    1249           0 :   if (DCPS_debug_level >= 4) {
    1250           0 :     ACE_DEBUG((LM_DEBUG,
    1251             :                ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_participant: ")
    1252             :                ACE_TEXT("%C ignoring handle %x.\n"),
    1253             :                LogGuid(dp_id_).c_str(),
    1254             :                handle));
    1255             :   }
    1256             : 
    1257           0 :   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
    1258           0 :   if (!disco->ignore_domain_participant(domain_id_,
    1259           0 :                                         dp_id_,
    1260             :                                         ignoreId)) {
    1261           0 :     if (DCPS_debug_level > 0) {
    1262           0 :       ACE_ERROR((LM_ERROR,
    1263             :                  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_participant, ")
    1264             :                  ACE_TEXT("Could not ignore domain participant.\n")));
    1265             :     }
    1266           0 :     return DDS::RETCODE_ERROR;
    1267             :   }
    1268             : 
    1269             : 
    1270           0 :   if (DCPS_debug_level >= 4) {
    1271           0 :     ACE_DEBUG((LM_DEBUG,
    1272             :                ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_participant: ")
    1273             :                ACE_TEXT("%C repo call returned.\n"),
    1274             :                LogGuid(dp_id_).c_str()));
    1275             :   }
    1276             : 
    1277           0 :   return DDS::RETCODE_OK;
    1278             : #else
    1279             :   ACE_UNUSED_ARG(handle);
    1280             :   return DDS::RETCODE_UNSUPPORTED;
    1281             : #endif // !defined (DDS_HAS_MINIMUM_BIT)
    1282           0 : }
    1283             : 
    // Asks discovery to ignore the topic identified by `handle`.
    //
    // When DDS_HAS_MINIMUM_BIT is defined the operation is compiled out and returns
    // RETCODE_UNSUPPORTED. Otherwise:
    //   RETCODE_NOT_ENABLED - this participant is not enabled.
    //   RETCODE_OK          - every other path, including a failed discovery call.
    //
    // NOTE(review): when disco->ignore_topic() returns false the failure is only logged
    // and the function still returns RETCODE_OK. ignore_participant(),
    // ignore_publication() and ignore_subscription() return RETCODE_ERROR in the
    // equivalent situation -- a `return DDS::RETCODE_ERROR;` looks to be missing here.
    // NOTE(review): the GUID is stored in ignored_topics_ before the discovery call
    // and stays there if that call fails.
    1284             : DDS::ReturnCode_t
    1285           0 : DomainParticipantImpl::ignore_topic(
    1286             :   DDS::InstanceHandle_t handle)
    1287             : {
    1288             : #ifndef DDS_HAS_MINIMUM_BIT
    1289           0 :   if (!enabled_) {
    1290           0 :     if (DCPS_debug_level > 0) {
    1291           0 :       ACE_ERROR((LM_ERROR,
    1292             :                  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_topic, ")
    1293             :                  ACE_TEXT(" Entity is not enabled.\n")));
    1294             :     }
    1295           0 :     return DDS::RETCODE_NOT_ENABLED;
    1296             :   }
    1297             : 
                       // Look up the GUID for the handle and check whether it is already recorded.
    1298           0 :   GUID_t ignoreId = get_repoid(handle);
    1299           0 :   HandleMap::const_iterator location = this->ignored_topics_.find(ignoreId);
    1300             : 
    1301           0 :   if (location == this->ignored_topics_.end()) {
    1302           0 :     this->ignored_topics_[ ignoreId] = handle;
    1303             :   }
    1304             :   else { // ignore same topic again, just return ok.
    1305           0 :     return DDS::RETCODE_OK;
    1306             :   }
    1307             : 
    1308           0 :   if (DCPS_debug_level >= 4) {
    1309           0 :     ACE_DEBUG((LM_DEBUG,
    1310             :                ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_topic: ")
    1311             :                ACE_TEXT("%C ignoring handle %x.\n"),
    1312             :                LogGuid(dp_id_).c_str(),
    1313             :                handle));
    1314             :   }
    1315             : 
    1316           0 :   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
    1317           0 :   if (!disco->ignore_topic(domain_id_,
    1318           0 :                            dp_id_,
    1319             :                            ignoreId)) {
                         // Failure is reported through the log only; control falls through to RETCODE_OK.
    1320           0 :     if (DCPS_debug_level > 0) {
    1321           0 :       ACE_ERROR((LM_ERROR,
    1322             :                  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_topic, ")
    1323             :                  ACE_TEXT(" Could not ignore topic.\n")));
    1324             :     }
    1325             :   }
    1326             : 
    1327           0 :   return DDS::RETCODE_OK;
    1328             : #else
    1329             :   ACE_UNUSED_ARG(handle);
    1330             :   return DDS::RETCODE_UNSUPPORTED;
    1331             : #endif // !defined (DDS_HAS_MINIMUM_BIT)
    1332           0 : }
    1333             : 
    // Asks discovery to ignore the publication identified by `handle`.
    //
    // When DDS_HAS_MINIMUM_BIT is defined the operation is compiled out and returns
    // RETCODE_UNSUPPORTED. Otherwise:
    //   RETCODE_NOT_ENABLED - this participant is not enabled.
    //   RETCODE_ERROR       - disco->ignore_publication() returned false.
    //   RETCODE_OK          - discovery accepted the request.
    //
    // Unlike ignore_participant()/ignore_topic(), nothing is recorded locally here, so a
    // repeated call for the same handle goes to discovery again.
    1334             : DDS::ReturnCode_t
    1335           0 : DomainParticipantImpl::ignore_publication(
    1336             :   DDS::InstanceHandle_t handle)
    1337             : {
    1338             : #ifndef DDS_HAS_MINIMUM_BIT
    1339           0 :   if (!enabled_) {
    1340           0 :     if (DCPS_debug_level > 0) {
    1341           0 :       ACE_ERROR((LM_ERROR,
    1342             :                  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_publication, ")
    1343             :                  ACE_TEXT(" Entity is not enabled.\n")));
    1344             :     }
    1345           0 :     return DDS::RETCODE_NOT_ENABLED;
    1346             :   }
    1347             : 
    1348           0 :   if (DCPS_debug_level >= 4) {
    1349           0 :     ACE_DEBUG((LM_DEBUG,
    1350             :                ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_publication: ")
    1351             :                ACE_TEXT("%C ignoring handle %x.\n"),
    1352             :                LogGuid(dp_id_).c_str(),
    1353             :                handle));
    1354             :   }
    1355             : 
                       // Look up the GUID for the handle and pass it to discovery.
    1356           0 :   GUID_t ignoreId = get_repoid(handle);
    1357           0 :   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
    1358           0 :   if (!disco->ignore_publication(domain_id_,
    1359           0 :                                  dp_id_,
    1360             :                                  ignoreId)) {
    1361           0 :     if (DCPS_debug_level > 0) {
    1362           0 :       ACE_ERROR((LM_ERROR,
    1363             :                  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_publication, ")
    1364             :                  ACE_TEXT(" could not ignore publication in discovery.\n")));
    1365             :     }
    1366           0 :     return DDS::RETCODE_ERROR;
    1367             :   }
    1368             : 
    1369           0 :   return DDS::RETCODE_OK;
    1370             : #else
    1371             :   ACE_UNUSED_ARG(handle);
    1372             :   return DDS::RETCODE_UNSUPPORTED;
    1373             : #endif // !defined (DDS_HAS_MINIMUM_BIT)
    1374           0 : }
    1375             : 
    // Asks discovery to ignore the subscription identified by `handle`.
    //
    // When DDS_HAS_MINIMUM_BIT is defined the operation is compiled out and returns
    // RETCODE_UNSUPPORTED. Otherwise:
    //   RETCODE_NOT_ENABLED - this participant is not enabled.
    //   RETCODE_ERROR       - disco->ignore_subscription() returned false.
    //   RETCODE_OK          - discovery accepted the request.
    //
    // NOTE(review): the debug message below prints the handle with %d while the other
    // three ignore_* functions use %x; the log output is therefore formatted differently.
    1376             : DDS::ReturnCode_t
    1377           0 : DomainParticipantImpl::ignore_subscription(
    1378             :   DDS::InstanceHandle_t handle)
    1379             : {
    1380             : #ifndef DDS_HAS_MINIMUM_BIT
    1381           0 :   if (!enabled_) {
    1382           0 :     if (DCPS_debug_level > 0) {
    1383           0 :       ACE_ERROR((LM_ERROR,
    1384             :                  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_subscription, ")
    1385             :                  ACE_TEXT(" Entity is not enabled.\n")));
    1386             :     }
    1387           0 :     return DDS::RETCODE_NOT_ENABLED;
    1388             :   }
    1389             : 
    1390           0 :   if (DCPS_debug_level >= 4) {
    1391           0 :     ACE_DEBUG((LM_DEBUG,
    1392             :                ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_subscription: ")
    1393             :                ACE_TEXT("%C ignoring handle %d.\n"),
    1394             :                LogGuid(dp_id_).c_str(),
    1395             :                handle));
    1396             :   }
    1397             : 
                       // Look up the GUID for the handle and pass it to discovery.
    1398           0 :   GUID_t ignoreId = get_repoid(handle);
    1399           0 :   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
    1400           0 :   if (!disco->ignore_subscription(domain_id_,
    1401           0 :                                   dp_id_,
    1402             :                                   ignoreId)) {
    1403           0 :     if (DCPS_debug_level > 0) {
    1404           0 :       ACE_ERROR((LM_ERROR,
    1405             :                  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_subscription, ")
    1406             :                  ACE_TEXT(" could not ignore subscription in discovery.\n")));
    1407             :     }
    1408           0 :     return DDS::RETCODE_ERROR;
    1409             :   }
    1410             : 
    1411           0 :   return DDS::RETCODE_OK;
    1412             : #else
    1413             :   ACE_UNUSED_ARG(handle);
    1414             :   return DDS::RETCODE_UNSUPPORTED;
    1415             : #endif // !defined (DDS_HAS_MINIMUM_BIT)
    1416           0 : }
    1417             : 
    // Returns the stored domain id (domain_id_) of this participant.
    1418             : DDS::DomainId_t
    1419           0 : DomainParticipantImpl::get_domain_id()
    1420             : {
    1421           0 :   return domain_id_;
    1422             : }
    1423             : 
    // Manually asserts liveliness on behalf of this participant.
    // While publishers_protector_ is held, calls assert_liveliness_by_participant() on
    // every publisher in publishers_, then stamps last_liveliness_activity_ with the
    // current time. Returns RETCODE_OK; RETCODE_ERROR is the value handed to
    // ACE_GUARD_RETURN for the case where the guard cannot be acquired.
    1424             : DDS::ReturnCode_t
    1425           0 : DomainParticipantImpl::assert_liveliness()
    1426             : {
    1427             :   // This operation needs to only be used if the DomainParticipant contains
    1428             :   // DataWriter entities with the LIVELINESS set to MANUAL_BY_PARTICIPANT and
    1429             :   // it only affects the liveliness of those DataWriter entities. Otherwise,
    1430             :   // it has no effect.
    1431             :   // This will do nothing in current implementation since we only
    1432             :   // support the AUTOMATIC liveliness qos for datawriter.
    1433             :   // Add implementation here.
                       // NOTE(review): the last three comment lines above look stale -- the code below
                       // does forward the assertion to every publisher and records the activity time.
    1434             : 
    1435           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    1436             :                    tao_mon,
    1437             :                    this->publishers_protector_,
    1438             :                    DDS::RETCODE_ERROR);
    1439             : 
    1440           0 :   for (PublisherSet::iterator it(publishers_.begin());
    1441           0 :        it != publishers_.end(); ++it) {
    1442           0 :     it->svt_->assert_liveliness_by_participant();
    1443             :   }
    1444             : 
    1445           0 :   last_liveliness_activity_.set_to_now();
    1446             : 
    1447           0 :   return DDS::RETCODE_OK;
    1448           0 : }
    1449             : 
    // Stores `qos` as the default PublisherQos (default_publisher_qos_).
    // Returns RETCODE_OK when Qos_Helper::valid() and Qos_Helper::consistent() both
    // accept `qos`; otherwise returns RETCODE_INCONSISTENT_POLICY and leaves the stored
    // default untouched.
    1450             : DDS::ReturnCode_t
    1451           0 : DomainParticipantImpl::set_default_publisher_qos(
    1452             :   const DDS::PublisherQos & qos)
    1453             : {
    1454           0 :   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
    1455           0 :     default_publisher_qos_ = qos;
    1456           0 :     return DDS::RETCODE_OK;
    1457             : 
    1458             :   } else {
    1459           0 :     return DDS::RETCODE_INCONSISTENT_POLICY;
    1460             :   }
    1461             : }
    1462             : 
    // Copies the stored default PublisherQos (default_publisher_qos_) into `qos`.
    // Always returns RETCODE_OK.
    1463             : DDS::ReturnCode_t
    1464           0 : DomainParticipantImpl::get_default_publisher_qos(
    1465             :   DDS::PublisherQos & qos)
    1466             : {
    1467           0 :   qos = default_publisher_qos_;
    1468           0 :   return DDS::RETCODE_OK;
    1469             : }
    1470             : 
    // Stores `qos` as the default SubscriberQos (default_subscriber_qos_).
    // Returns RETCODE_OK when Qos_Helper::valid() and Qos_Helper::consistent() both
    // accept `qos`; otherwise returns RETCODE_INCONSISTENT_POLICY and leaves the stored
    // default untouched.
    1471             : DDS::ReturnCode_t
    1472           0 : DomainParticipantImpl::set_default_subscriber_qos(
    1473             :   const DDS::SubscriberQos & qos)
    1474             : {
    1475           0 :   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
    1476           0 :     default_subscriber_qos_ = qos;
    1477           0 :     return DDS::RETCODE_OK;
    1478             : 
    1479             :   } else {
    1480           0 :     return DDS::RETCODE_INCONSISTENT_POLICY;
    1481             :   }
    1482             : }
    1483             : 
    // Copies the stored default SubscriberQos (default_subscriber_qos_) into `qos`.
    // Always returns RETCODE_OK.
    1484             : DDS::ReturnCode_t
    1485           0 : DomainParticipantImpl::get_default_subscriber_qos(
    1486             :   DDS::SubscriberQos & qos)
    1487             : {
    1488           0 :   qos = default_subscriber_qos_;
    1489           0 :   return DDS::RETCODE_OK;
    1490             : }
    1491             : 
    // Stores `qos` as the default TopicQos (default_topic_qos_).
    // Returns RETCODE_OK when Qos_Helper::valid() and Qos_Helper::consistent() both
    // accept `qos`; otherwise returns RETCODE_INCONSISTENT_POLICY and leaves the stored
    // default untouched.
    1492             : DDS::ReturnCode_t
    1493           0 : DomainParticipantImpl::set_default_topic_qos(
    1494             :   const DDS::TopicQos & qos)
    1495             : {
    1496           0 :   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
    1497           0 :     default_topic_qos_ = qos;
    1498           0 :     return DDS::RETCODE_OK;
    1499             : 
    1500             :   } else {
    1501           0 :     return DDS::RETCODE_INCONSISTENT_POLICY;
    1502             :   }
    1503             : }
    1504             : 
    // Copies the stored default TopicQos (default_topic_qos_) into `qos`.
    // Always returns RETCODE_OK.
    1505             : DDS::ReturnCode_t
    1506           0 : DomainParticipantImpl::get_default_topic_qos(
    1507             :   DDS::TopicQos & qos)
    1508             : {
    1509           0 :   qos = default_topic_qos_;
    1510           0 :   return DDS::RETCODE_OK;
    1511             : }
    1512             : 
    // Writes the current time, taken from SystemTimePoint::now() and converted with
    // to_dds_time(), into `current_time`. Always returns RETCODE_OK.
    1513             : DDS::ReturnCode_t
    1514           0 : DomainParticipantImpl::get_current_time(DDS::Time_t& current_time)
    1515             : {
    1516           0 :   current_time = SystemTimePoint::now().to_dds_time();
    1517           0 :   return DDS::RETCODE_OK;
    1518             : }
    1519             : 
    1520             : #if !defined (DDS_HAS_MINIMUM_BIT)
    1521             : 
    // Collects the instance handles of discovered participants into `participant_handles`.
    // Walks handles_ while handle_protector_ is held and passes on the handle of every
    // entry whose GUID has entity kind KIND_PARTICIPANT, skipping this participant's own
    // GUID (dp_id_) and any GUID present in ignored_participants_.
    // Returns RETCODE_OK; RETCODE_ERROR is the value handed to ACE_GUARD_RETURN for the
    // case where the guard cannot be acquired.
    // NOTE(review): the sequence is not cleared in this function; results are passed to
    // push_back(), which presumably appends to whatever the caller supplied -- confirm.
    1522             : DDS::ReturnCode_t
    1523           0 : DomainParticipantImpl::get_discovered_participants(DDS::InstanceHandleSeq& participant_handles)
    1524             : {
    1525           0 :   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, handle_protector_, DDS::RETCODE_ERROR);
    1526             : 
    1527           0 :   const CountedHandleMap::const_iterator itEnd = handles_.end();
    1528           0 :   for (CountedHandleMap::const_iterator iter = handles_.begin(); iter != itEnd; ++iter) {
    1529           0 :     GuidConverter converter(iter->first);
    1530             : 
    1531           0 :     if (converter.entityKind() == KIND_PARTICIPANT) {
    1532             :       // skip itself and the ignored participant
    1533           0 :       if (iter->first == dp_id_ || ignored_participants_.count(iter->first)) {
    1534           0 :         continue;
    1535             :       }
    1536             : 
    1537           0 :       push_back(participant_handles, iter->second.first);
    1538             :     }
    1539           0 :   }
    1540             : 
    1541           0 :   return DDS::RETCODE_OK;
    1542           0 : }
    1543             : 
    // Retrieves the built-in topic data for the participant identified by
    // `participant_handle`.
    // First verifies, under handle_protector_, that handles_ contains an entry with this
    // handle whose GUID has entity kind KIND_PARTICIPANT; if none is found the result is
    // RETCODE_PRECONDITION_NOT_MET. Otherwise the guard's scope ends and the call is
    // delegated to bit_subscriber_, whose return code is passed through.
    // NOTE(review): unlike get_discovered_participants(), this lookup does not exclude
    // dp_id_ or entries in ignored_participants_ -- confirm that is intended.
    1544             : DDS::ReturnCode_t
    1545           0 : DomainParticipantImpl::get_discovered_participant_data(DDS::ParticipantBuiltinTopicData& participant_data,
    1546             :                                                        DDS::InstanceHandle_t participant_handle)
    1547             : {
    1548             :   {
    1549           0 :     ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, handle_protector_, DDS::RETCODE_ERROR);
    1550             : 
                         // Linear scan over handles_: the map is keyed by GUID, the handle is in the mapped value.
    1551           0 :     bool found = false;
    1552           0 :     const CountedHandleMap::const_iterator itEnd = handles_.end();
    1553           0 :     for (CountedHandleMap::const_iterator iter = handles_.begin(); iter != itEnd; ++iter) {
    1554           0 :       GuidConverter converter(iter->first);
    1555             : 
    1556           0 :       if (participant_handle == iter->second.first
    1557           0 :           && converter.entityKind() == KIND_PARTICIPANT) {
    1558           0 :         found = true;
    1559           0 :         break;
    1560             :       }
    1561           0 :     }
    1562             : 
    1563           0 :     if (!found)
    1564           0 :       return DDS::RETCODE_PRECONDITION_NOT_MET;
    1565           0 :   }
    1566             : 
    1567           0 :   return bit_subscriber_->get_discovered_participant_data(participant_data, participant_handle);
    1568             : }
    1569             : 
    // Collects the instance handles of discovered topics into `topic_handles`.
    // Walks handles_ while handle_protector_ is held and passes on the handle of every
    // entry for which GuidConverter::isTopic() is true, skipping GUIDs present in
    // ignored_topics_.
    // Returns RETCODE_OK; RETCODE_ERROR is the value handed to ACE_GUARD_RETURN for the
    // case where the guard cannot be acquired.
    // NOTE(review): the sequence is not cleared in this function; results are passed to
    // push_back(), which presumably appends to whatever the caller supplied -- confirm.
    1570             : DDS::ReturnCode_t
    1571           0 : DomainParticipantImpl::get_discovered_topics(DDS::InstanceHandleSeq& topic_handles)
    1572             : {
    1573           0 :   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, handle_protector_, DDS::RETCODE_ERROR);
    1574             : 
    1575           0 :   const CountedHandleMap::const_iterator itEnd = handles_.end();
    1576           0 :   for (CountedHandleMap::const_iterator iter = handles_.begin(); iter != itEnd; ++iter) {
    1577           0 :     GuidConverter converter(iter->first);
    1578           0 :     if (converter.isTopic()) {
                           // Topics the application asked to ignore are left out of the result.
    1579           0 :       if (ignored_topics_.count(iter->first)) {
    1580           0 :         continue;
    1581             :       }
    1582             : 
    1583           0 :       push_back(topic_handles, iter->second.first);
    1584             :     }
    1585           0 :   }
    1586             : 
    1587           0 :   return DDS::RETCODE_OK;
    1588           0 : }
    1589             : 
    // Retrieves the built-in topic data for the topic identified by `topic_handle`.
    // First verifies, under handle_protector_, that handles_ contains an entry with this
    // handle for which GuidConverter::isTopic() is true; if none is found the result is
    // RETCODE_PRECONDITION_NOT_MET. Otherwise the guard's scope ends and the call is
    // delegated to bit_subscriber_, whose return code is passed through.
    // NOTE(review): unlike get_discovered_topics(), this lookup does not exclude entries
    // in ignored_topics_ -- confirm that is intended.
    1590             : DDS::ReturnCode_t
    1591           0 : DomainParticipantImpl::get_discovered_topic_data(DDS::TopicBuiltinTopicData& topic_data,
    1592             :                                                  DDS::InstanceHandle_t topic_handle)
    1593             : {
    1594             :   {
    1595           0 :     ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, handle_protector_, DDS::RETCODE_ERROR);
    1596             : 
                         // Linear scan over handles_: the map is keyed by GUID, the handle is in the mapped value.
    1597           0 :     bool found = false;
    1598           0 :     const CountedHandleMap::const_iterator itEnd = handles_.end();
    1599           0 :     for (CountedHandleMap::const_iterator iter = handles_.begin(); iter != itEnd; ++iter) {
    1600           0 :       GuidConverter converter(iter->first);
    1601           0 :       if (topic_handle == iter->second.first && converter.isTopic()) {
    1602           0 :         found = true;
    1603           0 :         break;
    1604             :       }
    1605           0 :     }
    1606             : 
    1607           0 :     if (!found)
    1608           0 :       return DDS::RETCODE_PRECONDITION_NOT_MET;
    1609           0 :   }
    1610             : 
    1611           0 :   return bit_subscriber_->get_discovered_topic_data(topic_data, topic_handle);
    1612             : }
    1613             : 
    1614             : #endif
    1615             : 
    1616             : DDS::ReturnCode_t
    1617           0 : DomainParticipantImpl::enable()
    1618             : {
    1619             :   //According spec:
    1620             :   // - Calling enable on an already enabled Entity returns OK and has no
    1621             :   // effect.
    1622             :   // - Calling enable on an Entity whose factory is not enabled will fail
    1623             :   // and return PRECONDITION_NOT_MET.
    1624             : 
    1625           0 :   if (this->is_enabled()) {
    1626           0 :     return DDS::RETCODE_OK;
    1627             :   }
    1628             : 
    1629             : #ifdef OPENDDS_SECURITY
    1630           0 :   if (!security_config_ && TheServiceParticipant->get_security()) {
    1631           0 :     security_config_ = TheSecurityRegistry->default_config();
    1632           0 :     if (!security_config_) {
    1633           0 :       security_config_ = TheSecurityRegistry->builtin_config();
    1634           0 :       TheSecurityRegistry->default_config(security_config_);
    1635             :     }
    1636             :   }
    1637             : #endif
    1638             : 
    1639           0 :   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
    1640             : 
    1641           0 :   if (disco.is_nil()) {
    1642           0 :     if (DCPS_debug_level > 0) {
    1643           0 :       ACE_ERROR((LM_ERROR,
    1644             :                 ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
    1645             :                 ACE_TEXT("no discovery found for domain id: %d.\n"), domain_id_));
    1646             :     }
    1647           0 :     return DDS::RETCODE_ERROR;
    1648             :   }
    1649             : 
    1650             : #ifdef OPENDDS_SECURITY
    1651           0 :   if (TheServiceParticipant->get_security() && !security_config_) {
    1652           0 :     if (DCPS::security_debug.new_entity_error) {
    1653           0 :       ACE_ERROR((LM_ERROR,
    1654             :                  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
    1655             :                  ACE_TEXT("DCPSSecurity flag is set, but unable to load security plugin configuration.\n")));
    1656             :     }
    1657           0 :     return DDS::RETCODE_ERROR;
    1658             :   }
    1659             : #endif
    1660             : 
    1661           0 :   AddDomainStatus value = {GUID_UNKNOWN, false};
    1662             : 
    1663             : #ifdef OPENDDS_SECURITY
    1664           0 :   if (TheServiceParticipant->get_security() && security_config_->qos_implies_security(qos_)) {
    1665           0 :     Security::Authentication_var auth = security_config_->get_authentication();
    1666             : 
    1667           0 :     DDS::Security::SecurityException se;
    1668             :     DDS::Security::ValidationResult_t val_res =
    1669           0 :       auth->validate_local_identity(id_handle_, dp_id_, domain_id_, qos_, disco->generate_participant_guid(), se);
    1670             : 
    1671             :     /* TODO - Handle VALIDATION_PENDING_RETRY */
    1672           0 :     if (val_res != DDS::Security::VALIDATION_OK) {
    1673           0 :       if (DCPS::security_debug.new_entity_error) {
    1674           0 :         ACE_ERROR((LM_ERROR,
    1675             :                    ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
    1676             :                    ACE_TEXT("Unable to validate local identity. SecurityException[%d.%d]: %C\n"),
    1677             :                    se.code, se.minor_code, se.message.in()));
    1678             :       }
    1679           0 :       return DDS::Security::RETCODE_NOT_ALLOWED_BY_SECURITY;
    1680             :     }
    1681             : 
    1682           0 :     Security::AccessControl_var access = security_config_->get_access_control();
    1683             : 
    1684           0 :     perm_handle_ = access->validate_local_permissions(auth, id_handle_, domain_id_, qos_, se);
    1685             : 
    1686           0 :     if (perm_handle_ == DDS::HANDLE_NIL) {
    1687           0 :       if (DCPS::security_debug.new_entity_error) {
    1688           0 :         ACE_ERROR((LM_ERROR,
    1689             :                    ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
    1690             :                    ACE_TEXT("Unable to validate local permissions. SecurityException[%d.%d]: %C\n"),
    1691             :                    se.code, se.minor_code, se.message.in()));
    1692             :       }
    1693           0 :       return DDS::Security::RETCODE_NOT_ALLOWED_BY_SECURITY;
    1694             :     }
    1695             : 
    1696           0 :     const bool check_create = access->check_create_participant(perm_handle_, domain_id_, qos_, se);
    1697           0 :     if (!check_create) {
    1698           0 :       if (DCPS::security_debug.new_entity_error) {
    1699           0 :         ACE_ERROR((LM_ERROR,
    1700             :                    ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
    1701             :                    ACE_TEXT("Unable to create participant. SecurityException[%d.%d]: %C\n"),
    1702             :                    se.code, se.minor_code, se.message.in()));
    1703             :       }
    1704           0 :       return DDS::Security::RETCODE_NOT_ALLOWED_BY_SECURITY;
    1705             :     }
    1706             : 
    1707           0 :     DDS::Security::ParticipantSecurityAttributes part_sec_attr;
    1708           0 :     const bool check_part_sec_attr = access->get_participant_sec_attributes(perm_handle_, part_sec_attr, se);
    1709             : 
    1710           0 :     if (!check_part_sec_attr) {
    1711           0 :       if (DCPS::security_debug.new_entity_error) {
    1712           0 :         ACE_ERROR((LM_ERROR,
    1713             :                    ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable,")
    1714             :                    ACE_TEXT("Unable to get participant security attributes. SecurityException[%d.%d]: %C\n"),
    1715             :                    se.code, se.minor_code, se.message.in()));
    1716             :       }
    1717           0 :       return DDS::RETCODE_ERROR;
    1718             :     }
    1719             : 
    1720           0 :     if (part_sec_attr.is_rtps_protected) { // DDS-Security v1.1 8.4.2.4 Table 27 is_rtps_protected
    1721           0 :       if (part_sec_attr.allow_unauthenticated_participants) {
    1722           0 :         if (DCPS::security_debug.new_entity_error) {
    1723           0 :           ACE_ERROR((LM_ERROR,
    1724             :                      ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
    1725             :                      ACE_TEXT("allow_unauthenticated_participants is not possible with is_rtps_protected\n")));
    1726             :         }
    1727           0 :         return DDS::Security::RETCODE_NOT_ALLOWED_BY_SECURITY;
    1728             :       }
    1729             : 
    1730           0 :       const Security::CryptoKeyFactory_var crypto = security_config_->get_crypto_key_factory();
    1731           0 :       part_crypto_handle_ = crypto->register_local_participant(id_handle_, perm_handle_,
    1732           0 :         Util::filter_properties(qos_.property.value, "dds.sec.crypto."), part_sec_attr, se);
    1733           0 :       if (part_crypto_handle_ == DDS::HANDLE_NIL) {
    1734           0 :         if (DCPS::security_debug.new_entity_error) {
    1735           0 :           ACE_ERROR((LM_ERROR,
    1736             :                      ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
    1737             :                      ACE_TEXT("Unable to register local participant. SecurityException[%d.%d]: %C\n"),
    1738             :                      se.code, se.minor_code, se.message.in()));
    1739             :         }
    1740           0 :         return DDS::RETCODE_ERROR;
    1741             :       }
    1742             : 
    1743           0 :     } else {
    1744           0 :       part_crypto_handle_ = DDS::HANDLE_NIL;
    1745             :     }
    1746             : 
    1747           0 :     value = disco->add_domain_participant_secure(domain_id_, qos_, type_lookup_service_,
    1748           0 :                                                  dp_id_, id_handle_, perm_handle_, part_crypto_handle_);
    1749             : 
    1750           0 :     if (value.id == GUID_UNKNOWN) {
    1751           0 :       if (DCPS::security_debug.new_entity_error) {
    1752           0 :         ACE_ERROR((LM_ERROR,
    1753             :                    ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
    1754             :                    ACE_TEXT("add_domain_participant_secure returned invalid id.\n")));
    1755             :       }
    1756           0 :       return DDS::RETCODE_ERROR;
    1757             :     }
    1758             : 
    1759           0 :   } else {
    1760             : #endif
    1761             : 
    1762           0 :     value = disco->add_domain_participant(domain_id_, qos_, type_lookup_service_);
    1763             : 
    1764           0 :     if (value.id == GUID_UNKNOWN) {
    1765           0 :       if (DCPS_debug_level > 0) {
    1766           0 :         ACE_ERROR((LM_ERROR,
    1767             :                    ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::enable, ")
    1768             :                    ACE_TEXT("add_domain_participant returned invalid id.\n")));
    1769             :       }
    1770           0 :       return DDS::RETCODE_ERROR;
    1771             :     }
    1772             : 
    1773             : #ifdef OPENDDS_SECURITY
    1774             :   }
    1775             : #endif
    1776             : 
    1777           0 :   dp_id_ = value.id;
    1778           0 :   federated_ = value.federated;
    1779             : 
    1780           0 :   if (monitor_) {
    1781           0 :     monitor_->report();
    1782             :   }
    1783             : 
    1784           0 :   if (TheServiceParticipant->monitor_) {
    1785           0 :     TheServiceParticipant->monitor_->report();
    1786             :   }
    1787             : 
    1788           0 :   const DDS::ReturnCode_t ret = this->set_enabled();
    1789             : 
    1790           0 :   if (DCPS_debug_level > 1) {
    1791           0 :     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DomainParticipantImpl::enable: ")
    1792             :                ACE_TEXT("enabled participant %C in domain %d\n"),
    1793             :                LogGuid(dp_id_).c_str(), domain_id_));
    1794             :   }
    1795             : 
    1796           0 :   if (ret == DDS::RETCODE_OK && !TheTransientKludge->is_enabled()) {
    1797           0 :     Discovery_rch disc = TheServiceParticipant->get_discovery(this->domain_id_);
    1798           0 :     this->bit_subscriber_ = disc->init_bit(this);
    1799           0 :   }
    1800             : 
    1801           0 :   if (ret != DDS::RETCODE_OK) {
    1802           0 :     return ret;
    1803             :   }
    1804             : 
    1805           0 :   if (qos_.entity_factory.autoenable_created_entities) {
    1806             : 
    1807           0 :     for (TopicMap::iterator it = topics_.begin(); it != topics_.end(); ++it) {
    1808           0 :       it->second.pair_.svt_->enable();
    1809             :     }
    1810             : 
    1811           0 :     for (PublisherSet::iterator it = publishers_.begin(); it != publishers_.end(); ++it) {
    1812           0 :       it->svt_->enable();
    1813             :     }
    1814             : 
    1815           0 :     for (SubscriberSet::iterator it = subscribers_.begin(); it != subscribers_.end(); ++it) {
    1816           0 :       it->svt_->enable();
    1817             :     }
    1818             :   }
    1819             : 
    1820           0 :   return DDS::RETCODE_OK;
    1821           0 : }
    1822             : 
    1823             : GUID_t
    1824           0 : DomainParticipantImpl::get_id() const
    1825             : {
                         // Returns a copy of this participant's GUID as stored in dp_id_.
    1826           0 :   return dp_id_;
    1827             : }
    1828             : 
    1829             : OPENDDS_STRING
    1830           0 : DomainParticipantImpl::get_unique_id()
    1831             : {
                         // Returns the string produced by GuidConverter::uniqueParticipantId()
                         // for this participant's GUID (dp_id_).
    1832           0 :   return GuidConverter(dp_id_).uniqueParticipantId();
    1833             : }
    1834             : 
    1835             : 
    1836             : DDS::InstanceHandle_t
    1837           0 : DomainParticipantImpl::get_instance_handle()
    1838             : {
                         // Delegates to get_entity_instance_handle(), passing this participant's
                         // own GUID and rchandle_from(this) as the participant argument.
    1839           0 :   return get_entity_instance_handle(dp_id_, rchandle_from(this));
    1840             : }
    1841             : 
    1842           0 : DDS::InstanceHandle_t DomainParticipantImpl::assign_handle(const GUID_t& id)
    1843             : {
                         // Returns an InstanceHandle for `id`, holding handle_protector_ for the
                         // whole call. Three cases:
                         //  - id == GUID_UNKNOWN: a handle is handed out but not recorded in
                         //    handles_ / repoIds_ ("unmapped").
                         //  - id not yet in handles_: a handle is handed out, recorded in both
                         //    maps with a count of 1, and handle_waiters_ is notified.
                         //  - id already in handles_: its counter is incremented and the
                         //    existing handle is returned.
                         // DDS::HANDLE_NIL is the value given to ACE_GUARD_RETURN for the case
                         // where the guard cannot be acquired.
    1844           0 :   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, handle_protector_, DDS::HANDLE_NIL);
    1845           0 :   if (id == GUID_UNKNOWN) {
                           // Prefer a previously returned handle over generating a new one.
    1846             :     const DDS::InstanceHandle_t ih =
    1847           0 :       reusable_handles_.empty() ? participant_handles_.next() : reusable_handles_.pop_front();
    1848           0 :     if (DCPS_debug_level > 5) {
    1849           0 :       ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::assign_handle: "
    1850             :                  "New unmapped InstanceHandle %d\n", ih));
    1851             :     }
    1852           0 :     return ih;
    1853             :   }
    1854             : 
    1855           0 :   const CountedHandleMap::iterator location = handles_.find(id);
    1856           0 :   if (location == handles_.end()) {
                           // First request for this GUID: same handle source as the unmapped case.
    1857             :     const DDS::InstanceHandle_t handle =
    1858           0 :       reusable_handles_.empty() ? participant_handles_.next() : reusable_handles_.pop_front();
    1859           0 :     if (DCPS_debug_level > 5) {
    1860           0 :       ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::assign_handle: "
    1861             :                  "New mapped InstanceHandle %d for %C\n",
    1862             :                  handle, LogGuid(id).c_str()));
    1863             :     }
                           // Record the mapping in both directions (GUID -> handle, handle -> GUID).
    1864           0 :     handles_[id] = std::make_pair(handle, 1);
    1865           0 :     repoIds_[handle] = id;
                           // Wake any await_handle() callers blocked on handle_waiters_.
    1866           0 :     handle_waiters_.notify_all();
    1867           0 :     return handle;
    1868             :   }
    1869             : 
                         // GUID already mapped: bump the counter paired with the handle.
    1870           0 :   HandleWithCounter& mapped = location->second;
    1871           0 :   ++mapped.second;
    1872           0 :   if (DCPS_debug_level > 5) {
    1873           0 :     ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::assign_handle: "
    1874             :                "Incremented refcount for InstanceHandle %d to %d\n",
    1875             :                mapped.first, mapped.second));
    1876             :   }
    1877           0 :   return mapped.first;
    1878           0 : }
    1879             : 
    1880           0 : DDS::InstanceHandle_t DomainParticipantImpl::await_handle(const GUID_t& id,
    1881             :                                                           TimeDuration max_wait) const
    1882             : {
                         // Blocks on handle_waiters_ until `id` has an entry in handles_ or a
                         // wait returns something other than CvStatus_NoTimeout.
                         // Returns the mapped handle, or DDS::HANDLE_NIL if none is present when
                         // the loop ends (or if the guard cannot be acquired).
                         // The deadline is computed before the lock is taken.
    1883           0 :   MonotonicTimePoint expire_at = MonotonicTimePoint::now() + max_wait;
    1884           0 :   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, handle_protector_, DDS::HANDLE_NIL);
    1885           0 :   CountedHandleMap::const_iterator iter = handles_.find(id);
    1886           0 :   CvStatus res = CvStatus_NoTimeout;
    1887           0 :   ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
    1888           0 :   while (res == CvStatus_NoTimeout && iter == handles_.end()) {
                           // A zero max_wait selects wait() (no deadline) instead of wait_until().
    1889           0 :     res = max_wait.is_zero() ? handle_waiters_.wait(thread_status_manager) : handle_waiters_.wait_until(expire_at, thread_status_manager);
                           // Look again after every wake-up, whatever the wait status was.
    1890           0 :     iter = handles_.find(id);
    1891             :   }
    1892           0 :   return iter == handles_.end() ? DDS::HANDLE_NIL : iter->second.first;
    1893           0 : }
    1894             : 
    1895           0 : DDS::InstanceHandle_t DomainParticipantImpl::lookup_handle(const GUID_t& id) const
    1896             : {
                         // Non-blocking lookup of the handle mapped to `id` in handles_, under
                         // handle_protector_. Returns DDS::HANDLE_NIL when there is no entry.
                         // The counter paired with the handle is not modified.
    1897           0 :   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, handle_protector_, DDS::HANDLE_NIL);
    1898           0 :   const CountedHandleMap::const_iterator iter = handles_.find(id);
    1899           0 :   return iter == handles_.end() ? DDS::HANDLE_NIL : iter->second.first;
    1900           0 : }
    1901             : 
    1902           0 : void DomainParticipantImpl::return_handle(DDS::InstanceHandle_t handle)
    1903             : {
                         // Gives back a handle obtained from assign_handle(), under
                         // handle_protector_. A handle with no repoIds_ entry goes straight to
                         // reusable_handles_. A mapped handle has its counter decremented; when
                         // that reaches zero both map entries are erased and the handle is added
                         // to reusable_handles_.
    1904           0 :   ACE_GUARD(ACE_Thread_Mutex, guard, handle_protector_);
    1905           0 :   const RepoIdMap::iterator r_iter = repoIds_.find(handle);
    1906           0 :   if (r_iter == repoIds_.end()) {
    1907           0 :     reusable_handles_.add(handle);
    1908           0 :     if (DCPS_debug_level > 5) {
    1909           0 :       ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::return_handle: "
    1910             :                  "Returned unmapped InstanceHandle %d\n", handle));
    1911             :     }
    1912           0 :     return;
    1913             :   }
    1914             : 
    1915           0 :   const CountedHandleMap::iterator h_iter = handles_.find(r_iter->second);
    1916           0 :   if (h_iter == handles_.end()) {
                           // NOTE(review): repoIds_ has an entry but handles_ does not; the call
                           // returns without changing either map or recycling the handle.
    1917           0 :     return;
    1918             :   }
    1919             : 
                         // The count logged here is the value before the decrement below.
    1920           0 :   if (DCPS_debug_level > 5) {
    1921           0 :     ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::return_handle: "
    1922             :                "Returned mapped InstanceHandle %d refcount %d\n",
    1923             :                handle, h_iter->second.second));
    1924             :   }
    1925             : 
    1926           0 :   HandleWithCounter& mapped = h_iter->second;
    1927           0 :   if (--mapped.second == 0) {
    1928           0 :     handles_.erase(h_iter);
    1929           0 :     repoIds_.erase(r_iter);
    1930           0 :     reusable_handles_.add(handle);
    1931             :   }
    1932           0 : }
    1933             : 
    1934           0 : GUID_t DomainParticipantImpl::get_repoid(DDS::InstanceHandle_t handle) const
    1935             : {
                         // Reverse lookup: returns the GUID recorded for `handle` in repoIds_,
                         // under handle_protector_. Returns GUID_UNKNOWN when there is no entry
                         // (or if the guard cannot be acquired).
    1936           0 :   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, handle_protector_, GUID_UNKNOWN);
    1937           0 :   const RepoIdMap::const_iterator location = repoIds_.find(handle);
    1938           0 :   return location == repoIds_.end() ? GUID_UNKNOWN : location->second;
    1939           0 : }
    1940             : 
    1941             : DDS::Topic_ptr
    1942           0 : DomainParticipantImpl::create_new_topic(
    1943             :   const char * topic_name,
    1944             :   const char * type_name,
    1945             :   const DDS::TopicQos & qos,
    1946             :   DDS::TopicListener_ptr a_listener,
    1947             :   const DDS::StatusMask & mask,
    1948             :   OpenDDS::DCPS::TypeSupport_ptr type_support)
    1949             : {
                         // Allocates a TopicImpl, optionally enables it, stores it in topics_
                         // under topic_name and returns a duplicated reference for the caller.
                         // Returns DDS::Topic::_nil() when the lock cannot be taken, a security
                         // check fails (OPENDDS_SECURITY builds only), allocation fails, or
                         // enable() does not return RETCODE_OK.
                         // topics_protector_ is held for the whole call.
    1950           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    1951             :                    tao_mon,
    1952             :                    this->topics_protector_,
    1953             :                    DDS::Topic::_nil());
    1954             : 
    1955             : #ifdef OPENDDS_SECURITY
                         // Security checks run only when a permissions handle is set and the
                         // topic is not a built-in topic.
    1956           0 :   if (perm_handle_ && !topicIsBIT(topic_name, type_name)) {
    1957           0 :     Security::AccessControl_var access = security_config_->get_access_control();
    1958             : 
    1959           0 :     DDS::Security::SecurityException se;
    1960             : 
    1961             :     DDS::Security::TopicSecurityAttributes sec_attr;
    1962           0 :     if (!access->get_topic_sec_attributes(perm_handle_, topic_name, sec_attr, se)) {
    1963           0 :       if (DCPS::security_debug.new_entity_warn) {
    1964           0 :         ACE_ERROR((LM_WARNING,
    1965             :                    ACE_TEXT("(%P|%t) WARNING: ")
    1966             :                    ACE_TEXT("DomainParticipantImpl::create_new_topic, ")
    1967             :                    ACE_TEXT("Unable to get security attributes for topic '%C'. SecurityException[%d.%d]: %C\n"),
    1968             :                    topic_name, se.code, se.minor_code, se.message.in()));
    1969             :       }
    1970           0 :       return DDS::Topic::_nil();
    1971             :     }
    1972             : 
                           // check_create_topic is consulted only for read- or write-protected topics.
    1973           0 :     if ((sec_attr.is_write_protected || sec_attr.is_read_protected) &&
    1974           0 :         !access->check_create_topic(perm_handle_, domain_id_, topic_name, qos, se)) {
    1975           0 :       if (DCPS::security_debug.new_entity_warn) {
    1976           0 :         ACE_ERROR((LM_WARNING,
    1977             :                    ACE_TEXT("(%P|%t) WARNING: ")
    1978             :                    ACE_TEXT("DomainParticipantImpl::create_new_topic, ")
    1979             :                    ACE_TEXT("Permissions check failed to create new topic '%C'. SecurityException[%d.%d]: %C\n"),
    1980             :                    topic_name, se.code, se.minor_code, se.message.in()));
    1981             :       }
    1982           0 :       return DDS::Topic::_nil();
    1983             :     }
    1984           0 :   }
    1985             : #endif
    1986             : 
    1987           0 :   TopicImpl* topic_servant = 0;
    1988             : 
    1989           0 :   ACE_NEW_RETURN(topic_servant,
    1990             :                  TopicImpl(topic_name,
    1991             :                            type_name,
    1992             :                            type_support,
    1993             :                            qos,
    1994             :                            a_listener,
    1995             :                            mask,
    1996             :                            this),
    1997             :                  DDS::Topic::_nil());
    1998             : 
    1999           0 :   if (enabled_ && qos_.entity_factory.autoenable_created_entities) {
    2000           0 :     const DDS::ReturnCode_t ret = topic_servant->enable();
    2001             : 
    2002           0 :     if (ret != DDS::RETCODE_OK) {
                             // Nothing owns the new servant yet; adopt its initial reference so
                             // it is released on this early return instead of being leaked.
                             const DDS::Topic_var release_on_failure(topic_servant);
    2003           0 :       ACE_ERROR((LM_WARNING,
    2004             :           ACE_TEXT("(%P|%t) WARNING: ")
    2005             :           ACE_TEXT("DomainParticipantImpl::create_new_topic, ")
    2006             :           ACE_TEXT("enable failed.\n")));
    2007           0 :       return DDS::Topic::_nil();
    2008             :     }
    2009             :   }
    2010             : 
    2011           0 :   DDS::Topic_ptr obj(topic_servant);
    2012             : 
    2013             :   // this object will also act as a guard against leaking the new TopicImpl
    2014           0 :   RefCounted_Topic refCounted_topic(Topic_Pair(topic_servant, obj, false));
    2015           0 :   topics_.insert(std::make_pair(topic_name, refCounted_topic));
    2016             : 
    2017           0 :   if (this->monitor_) {
    2018           0 :     this->monitor_->report();
    2019             :   }
    2020             : 
    2021             :   // the topics_ map has one reference and we duplicate to give
    2022             :   // the caller another reference.
    2023           0 :   return DDS::Topic::_duplicate(refCounted_topic.pair_.obj_.in());
    2024           0 : }
    2025             : 
    2026           0 : bool DomainParticipantImpl::is_clean(String* leftover_entities) const
    2027             : {
                         // Reports whether only built-in entities remain in this participant.
                         // Returns true when there are no non-built-in topics, no counted
                         // subscribers and no publishers.
                         // leftover_entities is optional: when non-null it is cleared and then
                         // filled with a comma-separated summary such as
                         // "2 topic(s), 1 publisher(s)".
    2028           0 :   if (leftover_entities) {
    2029           0 :     leftover_entities->clear();
    2030             :   }
    2031             : 
    2032             :   // check that the only remaining topics are built-in topics
    2033           0 :   size_t topic_count = 0;
    2034           0 :   for (TopicMap::const_iterator it = topics_.begin(); it != topics_.end(); ++it) {
    2035           0 :     if (!topicIsBIT(it->second.pair_.svt_->topic_name(), it->second.pair_.svt_->type_name())) {
    2036           0 :       ++topic_count;
    2037             :     }
    2038             :   }
                         // Guard the dereference: callers may pass a null leftover_entities.
    2039           0 :   if (leftover_entities && topic_count) {
    2040           0 :     *leftover_entities += to_dds_string(topic_count) + " topic(s)";
    2041             :   }
    2042             : 
    2043           0 :   size_t sub_count = subscribers_.size();
    2044           0 :   if (!TheTransientKludge->is_enabled()) {
    2045             :     // There are built-in topics and built-in topic subscribers left.
                           // A single remaining subscriber is therefore not counted as leftover.
    2046           0 :     sub_count = sub_count <= 1 ? 0 : sub_count;
    2047             :   }
    2048           0 :   if (leftover_entities && sub_count) {
    2049           0 :     if (leftover_entities->size()) {
    2050           0 :       *leftover_entities += ", ";
    2051             :     }
    2052           0 :     *leftover_entities += to_dds_string(sub_count) + " subscriber(s)";
    2053             :   }
    2054             : 
    2055           0 :   const size_t pub_count = publishers_.size();
    2056           0 :   if (leftover_entities && pub_count) {
    2057           0 :     if (leftover_entities->size()) {
    2058           0 :       *leftover_entities += ", ";
    2059             :     }
    2060           0 :     *leftover_entities += to_dds_string(pub_count) + " publisher(s)";
    2061             :   }
    2062             : 
    2063           0 :   return topic_count == 0 && sub_count == 0 && pub_count == 0;
    2064             : }
    2065             : 
    2066             : DDS::DomainParticipantListener_ptr
    2067           0 : DomainParticipantImpl::listener_for(DDS::StatusKind kind)
    2068             : {
                         // Returns a duplicated reference to the participant listener if one is
                         // set and listener_mask_ includes `kind`; otherwise returns nil.
                         // listener_ and listener_mask_ are read under listener_mutex_.
    2069           0 :   ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
    2070           0 :   if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
    2071           0 :     return DDS::DomainParticipantListener::_nil ();
    2072             :   } else {
    2073           0 :     return DDS::DomainParticipantListener::_duplicate(listener_.in());
    2074             :   }
    2075           0 : }
    2076             : 
    2077             : void
    2078           0 : DomainParticipantImpl::get_topic_ids(TopicIdVec& topics)
    2079             : {
                         // Appends the id of every topic in topics_ to `topics`, under
                         // topics_protector_. `topics` is not cleared first, so existing
                         // elements are kept.
    2080           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex,
    2081             :             guard,
    2082             :             this->topics_protector_);
    2083             : 
    2084           0 :   topics.reserve(topics_.size());
    2085           0 :   for (TopicMap::iterator it(topics_.begin());
    2086           0 :        it != topics_.end(); ++it) {
    2087           0 :     topics.push_back(it->second.pair_.svt_->get_id());
    2088             :   }
    2089           0 : }
    2090             : 
    2091             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
    2092             : 
    2093             : OwnershipManager*
    2094           0 : DomainParticipantImpl::ownership_manager()
    2095             : {
                         // Returns a pointer to this participant's OwnershipManager (owner_man_).
                         // Unless DDS_HAS_MINIMUM_BIT is defined, bit_pub_listener_hack(this) is
                         // first called on bit_subscriber_; a warning is logged instead when
                         // bit_subscriber_ is null. The pointer is returned in either case.
    2096             : #if !defined (DDS_HAS_MINIMUM_BIT)
    2097           0 :   if (bit_subscriber_) {
    2098           0 :     bit_subscriber_->bit_pub_listener_hack(this);
    2099             :   } else {
    2100           0 :     if (log_level >= LogLevel::Warning) {
                             // The message ends with a newline like the file's other log
                             // messages, so the next log entry starts on its own line.
    2101           0 :       ACE_ERROR((LM_WARNING,
    2102             :                  "(%P|%t) WARNING: DomainParticipantImpl::ownership_manager: bit_subscriber_ is null\n"));
    2103             :     }
    2104             :   }
    2105             : #endif
    2106           0 :   return &owner_man_;
    2107             : }
    2108             : 
    2109             : void
    2110           0 : DomainParticipantImpl::update_ownership_strength (const GUID_t& pub_id,
    2111             :                                                   const CORBA::Long& ownership_strength)
    2112             : {
                         // Forwards the new ownership strength of publication `pub_id` to every
                         // subscriber in subscribers_, under subscribers_protector_.
                         // Does nothing when get_deleted() reports true.
    2113           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex,
    2114             :             tao_mon,
    2115             :             this->subscribers_protector_);
    2116             : 
    2117           0 :   if (this->get_deleted ())
    2118           0 :     return;
    2119             : 
    2120           0 :   for (SubscriberSet::iterator it(this->subscribers_.begin());
    2121           0 :       it != this->subscribers_.end(); ++it) {
    2122           0 :     it->svt_->update_ownership_strength(pub_id, ownership_strength);
    2123             :   }
    2124           0 : }
    2125             : 
    2126             : #endif // OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
    2127             : 
    2128           2 : DomainParticipantImpl::RepoIdSequence::RepoIdSequence(const GUID_t& base) :
    2129           2 :   base_(base),
    2130           2 :   serial_(0),
    2131           2 :   builder_(base_)
    2132             : {
                         // Copies `base` into base_, starts serial_ at 0 and constructs builder_
                         // from the member copy base_ rather than from the argument.
                         // NOTE(review): builder_(base_) relies on base_ being declared before
                         // builder_ in the class; confirm against the header.
    2133           2 : }
    2134             : 
    2135             : GUID_t
    2136           7 : DomainParticipantImpl::RepoIdSequence::next()
    2137             : {
                         // Pre-increments serial_, sets it as the builder's entity key and
                         // returns the builder converted to GUID_t. The first key handed out
                         // is therefore 1.
    2138           7 :   builder_.entityKey(++serial_);
    2139           7 :   return builder_;
    2140             : }
    2141             : 
    2142             : 
    2143             : ////////////////////////////////////////////////////////////////
    2144             : 
    2145             : 
    2146             : bool
    2147           0 : DomainParticipantImpl::validate_publisher_qos(DDS::PublisherQos & pub_qos)
    2148             : {
                         // Resolves and validates a publisher QoS in place.
                         // If pub_qos equals PUBLISHER_QOS_DEFAULT it is overwritten with the
                         // participant's default publisher QoS. Returns false when
                         // Qos_Helper::valid() or Qos_Helper::consistent() rejects the result,
                         // true otherwise.
    2149           0 :   if (pub_qos == PUBLISHER_QOS_DEFAULT) {
    2150           0 :     this->get_default_publisher_qos(pub_qos);
    2151             :   }
    2152             : 
                         // NOTE(review): this macro is defined elsewhere; presumably it returns
                         // the second argument (false) for QoS unsupported in the current build
                         // profile. Confirm against its definition.
    2153             :   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(pub_qos, false);
    2154             : 
    2155           0 :   if (!Qos_Helper::valid(pub_qos) || !Qos_Helper::consistent(pub_qos)) {
    2156           0 :     if (DCPS_debug_level > 0) {
    2157           0 :       ACE_ERROR((LM_ERROR,
    2158             :                  ACE_TEXT("(%P|%t) ERROR: ")
    2159             :                  ACE_TEXT("DomainParticipantImpl::validate_publisher_qos, ")
    2160             :                  ACE_TEXT("invalid qos.\n")));
    2161             :     }
    2162           0 :     return false;
    2163             :   }
    2164             : 
    2165           0 :   return true;
    2166             : }
    2167             : 
    2168             : bool
    2169           0 : DomainParticipantImpl::validate_subscriber_qos(DDS::SubscriberQos & subscriber_qos)
    2170             : {
                         // Resolves and validates a subscriber QoS in place.
                         // If subscriber_qos equals SUBSCRIBER_QOS_DEFAULT it is overwritten with
                         // the participant's default subscriber QoS. Returns false when
                         // Qos_Helper::valid() or Qos_Helper::consistent() rejects the result,
                         // true otherwise.
    2171           0 :   if (subscriber_qos == SUBSCRIBER_QOS_DEFAULT) {
    2172           0 :     this->get_default_subscriber_qos(subscriber_qos);
    2173             :   }
    2174             : 
                         // NOTE(review): this macro is defined elsewhere; presumably it returns
                         // the second argument (false) for QoS unsupported in the current build
                         // profile. Confirm against its definition.
    2175             :   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(subscriber_qos, false);
    2176             : 
    2177           0 :   if (!Qos_Helper::valid(subscriber_qos) || !Qos_Helper::consistent(subscriber_qos)) {
    2178           0 :     if (DCPS_debug_level > 0) {
    2179           0 :       ACE_ERROR((LM_ERROR,
    2180             :                  ACE_TEXT("(%P|%t) ERROR: ")
    2181             :                  ACE_TEXT("DomainParticipantImpl::validate_subscriber_qos, ")
    2182             :                  ACE_TEXT("invalid qos.\n")));
    2183             :     }
    2184           0 :     return false;
    2185             :   }
    2186             : 
    2187             : 
    2188           0 :   return true;
    2189             : }
    2190             : 
    2191             : Recorder_ptr
    2192           0 : DomainParticipantImpl::create_recorder(DDS::Topic_ptr a_topic,
    2193             :                                        const DDS::SubscriberQos& subscriber_qos,
    2194             :                                        const DDS::DataReaderQos& datareader_qos,
    2195             :                                        const RecorderListener_rch& a_listener,
    2196             :                                        DDS::StatusMask mask)
    2197             : {
                         // Creates a RecorderImpl for a_topic, initializes it with the validated
                         // subscriber and data reader QoS, optionally enables it, stores it in
                         // recorders_ and returns it to the caller.
                         // Returns 0 when a_topic is nil or either QoS fails validation.
    2198           0 :   if (CORBA::is_nil(a_topic)) {
    2199           0 :     if (DCPS_debug_level > 0) {
    2200           0 :       ACE_ERROR((LM_ERROR,
    2201             :                  ACE_TEXT("(%P|%t) ERROR: ")
    2202             :                  ACE_TEXT("DomainParticipantImpl::create_recorder, ")
    2203             :                  ACE_TEXT("topic desc is nil.\n")));
    2204             :     }
    2205           0 :     return 0;
    2206             :   }
    2207             : 
                         // Work on a copy because validate_subscriber_qos() may overwrite its argument.
    2208           0 :   DDS::SubscriberQos sub_qos = subscriber_qos;
    2209           0 :   DDS::DataReaderQos dr_qos;
    2210             : 
    2211           0 :   if (! this->validate_subscriber_qos(sub_qos) ||
    2212           0 :       ! SubscriberImpl::validate_datareader_qos(datareader_qos,
    2213             :                                                 TheServiceParticipant->initial_DataReaderQos(),
    2214             :                                                 a_topic,
    2215             :                                                 dr_qos, false) ) {
    2216           0 :     return 0;
    2217             :   }
    2218             : 
    2219           0 :   RecorderImpl* recorder(new RecorderImpl);
    2220           0 :   Recorder_var result(recorder);
    2221             : 
                         // NOTE(review): the dynamic_cast result is passed on unchecked; it is
                         // null if a_topic is not a TopicDescriptionImpl.
    2222           0 :   recorder->init(dynamic_cast<TopicDescriptionImpl*>(a_topic),
    2223             :     dr_qos, a_listener,
    2224             :     mask, this, sub_qos);
    2225             : 
    2226           0 :   if (enabled_ && qos_.entity_factory.autoenable_created_entities) {
                           // NOTE(review): the result of enable() is discarded here, whereas
                           // create_replayer() returns 0 when enable() fails.
    2227           0 :     recorder->enable();
    2228             :   }
    2229             : 
    2230           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(recorders_protector_);
    2231           0 :   recorders_.insert(result);
    2232             : 
    2233           0 :   return result._retn();
    2234           0 : }
    2235             : 
    2236             : Replayer_ptr
    2237           0 : DomainParticipantImpl::create_replayer(DDS::Topic_ptr a_topic,
    2238             :                                        const DDS::PublisherQos& publisher_qos,
    2239             :                                        const DDS::DataWriterQos& datawriter_qos,
    2240             :                                        const ReplayerListener_rch& a_listener,
    2241             :                                        DDS::StatusMask mask)
    2242             : {
                         // Creates a ReplayerImpl for a_topic, initializes it with the validated
                         // publisher and data writer QoS, optionally enables it, stores it in
                         // replayers_ and returns it to the caller.
                         // Returns 0 when a_topic is nil, either QoS fails validation, or
                         // enable() does not return RETCODE_OK.
    2243           0 :   if (CORBA::is_nil(a_topic)) {
    2244           0 :     if (DCPS_debug_level > 0) {
    2245           0 :       ACE_ERROR((LM_ERROR,
    2246             :                  ACE_TEXT("(%P|%t) ERROR: ")
    2247             :                  ACE_TEXT("DomainParticipantImpl::create_replayer, ")
    2248             :                  ACE_TEXT("topic desc is nil.\n")));
    2249             :     }
    2250           0 :     return 0;
    2251             :   }
    2252             : 
                         // Work on a copy because validate_publisher_qos() may overwrite its argument.
    2253           0 :   DDS::PublisherQos pub_qos = publisher_qos;
    2254           0 :   DDS::DataWriterQos dw_qos;
    2255             : 
    2256           0 :   if (! this->validate_publisher_qos(pub_qos) ||
    2257           0 :       ! PublisherImpl::validate_datawriter_qos(datawriter_qos,
    2258             :                                                TheServiceParticipant->initial_DataWriterQos(),
    2259             :                                                a_topic,
    2260             :                                                dw_qos)) {
    2261           0 :     return 0;
    2262             :   }
    2263             : 
                         // NOTE(review): topic_servant is null if a_topic is not a TopicImpl; it
                         // is passed to init() unchecked.
    2264           0 :   TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic);
    2265             : 
    2266           0 :   ReplayerImpl* replayer(new ReplayerImpl);
    2267           0 :   Replayer_var result(replayer);
    2268             : 
    2269           0 :   replayer->init(a_topic, topic_servant, dw_qos, a_listener, mask, this, pub_qos);
    2270             : 
    2271           0 :   if (enabled_ && qos_.entity_factory.autoenable_created_entities) {
    2272           0 :     const DDS::ReturnCode_t ret = replayer->enable();
    2273             : 
    2274           0 :     if (ret != DDS::RETCODE_OK) {
    2275           0 :       if (DCPS_debug_level > 0) {
    2276           0 :         ACE_ERROR((LM_ERROR,
    2277             :                    ACE_TEXT("(%P|%t) ERROR: ")
    2278             :                    ACE_TEXT("DomainParticipantImpl::create_replayer, ")
    2279             :                    ACE_TEXT("enable failed.\n")));
    2280             :       }
                             // The replayer is not added to replayers_ on this path; `result`
                             // still holds it when the function returns.
    2281           0 :       return 0;
    2282             :     }
    2283             :   }
    2284             : 
    2285           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(replayers_protector_);
    2286           0 :   replayers_.insert(result);
    2287           0 :   return result._retn();
    2288           0 : }
    2289             : 
    2290             : void
    2291           0 : DomainParticipantImpl::delete_recorder(Recorder_ptr recorder)
    2292             : {
                         // Erases `recorder` from recorders_ under recorders_protector_.
                         // A duplicated reference is taken first, and recvar (declared before
                         // the guard) is destroyed after the lock is released.
    2293           0 :   const Recorder_var recvar(Recorder::_duplicate(recorder));
    2294           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(recorders_protector_);
    2295           0 :   recorders_.erase(recvar);
    2296           0 : }
    2297             : 
    2298             : void
    2299           0 : DomainParticipantImpl::delete_replayer(Replayer_ptr replayer)
    2300             : {
                         // Erases `replayer` from replayers_ under replayers_protector_.
                         // A duplicated reference is taken first, and repvar (declared before
                         // the guard) is destroyed after the lock is released.
    2301           0 :   const Replayer_var repvar(Replayer::_duplicate(replayer));
    2302           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(replayers_protector_);
    2303           0 :   replayers_.erase(repvar);
    2304           0 : }
    2305             : 
    2306             : void
    2307           0 : DomainParticipantImpl::add_adjust_liveliness_timers(DataWriterImpl* writer)
    2308             : {
                         // Passes `writer` to add_adjust() on both the automatic and the
                         // participant liveliness timers.
    2309           0 :   automatic_liveliness_timer_->add_adjust(writer);
    2310           0 :   participant_liveliness_timer_->add_adjust(writer);
    2311           0 : }
    2312             : 
    2313             : void
    2314           0 : DomainParticipantImpl::remove_adjust_liveliness_timers()
    2315             : {
                         // Calls remove_adjust() on both the automatic and the participant
                         // liveliness timers.
    2316           0 :   automatic_liveliness_timer_->remove_adjust();
    2317           0 :   participant_liveliness_timer_->remove_adjust();
    2318           0 : }
    2319             : 
    2320           0 : DomainParticipantImpl::LivelinessTimer::LivelinessTimer(DomainParticipantImpl& impl,
    2321           0 :                                                         DDS::LivelinessQosPolicyKind kind)
    2322           0 :   : impl_(impl)
    2323           0 :   , kind_(kind)
    2324           0 :   , interval_(TimeDuration::max_value)
    2325           0 :   , recalculate_interval_(false)
    2326           0 :   , scheduled_(false)
                       // Stores the owning participant and liveliness kind. The timer starts
                       // unscheduled with interval_ set to TimeDuration::max_value, which
                       // add_adjust() later lowers through std::min.
    2327           0 : { }
    2328             : 
    2329           0 : DomainParticipantImpl::LivelinessTimer::~LivelinessTimer()
    2330             : {
    2331           0 : }
    2332             : 
    2333             : void
    2334           0 : DomainParticipantImpl::LivelinessTimer::add_adjust(OpenDDS::DCPS::DataWriterImpl* writer)
    2335             : {
    2336           0 :   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
    2337             : 
    2338           0 :   const MonotonicTimePoint now = MonotonicTimePoint::now();
    2339             : 
    2340             :   // Calculate the time remaining until the next liveliness check.
    2341           0 :   const TimeDuration remaining = interval_ - (now - last_liveliness_check_);
    2342             : 
    2343             :   // Adopt a smaller interval.
    2344           0 :   interval_ = std::min(interval_, writer->liveliness_check_interval(kind_));
    2345             : 
    2346             :   // Reschedule or schedule a timer if necessary.
    2347           0 :   if (scheduled_ && interval_ < remaining) {
    2348           0 :     cancel();
    2349           0 :     schedule(interval_);
    2350           0 :   } else if (!scheduled_) {
    2351           0 :     schedule(interval_);
    2352           0 :     scheduled_ = true;
    2353           0 :     last_liveliness_check_ = now;
    2354             :   }
    2355           0 : }
    2356             : 
    2357             : void
    2358           0 : DomainParticipantImpl::LivelinessTimer::remove_adjust()
    2359             : {
    2360           0 :   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
    2361             : 
    2362           0 :   recalculate_interval_ = true;
    2363           0 : }
    2364             : 
    2365           0 : void DomainParticipantImpl::LivelinessTimer::execute(const MonotonicTimePoint& now)
    2366             : {
    2367           0 :   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
    2368             : 
    2369           0 :   if (recalculate_interval_) {
    2370           0 :     ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
    2371           0 :     TimeDuration interval;
    2372           0 :     while (recalculate_interval_) {
    2373           0 :       recalculate_interval_ = false;
    2374           0 :       ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rev_guard, rev_lock);
    2375           0 :       interval = impl_.liveliness_check_interval(kind_);
    2376           0 :     }
    2377           0 :     interval_ = interval;
    2378           0 :   }
    2379             : 
    2380           0 :   scheduled_ = false;
    2381             : 
    2382           0 :   if (!interval_.is_max()) {
    2383           0 :     dispatch(now);
    2384           0 :     last_liveliness_check_ = now;
    2385           0 :     schedule(interval_);
    2386           0 :     scheduled_ = true;
    2387             :   }
    2388           0 : }
    2389             : 
    2390           0 : DomainParticipantImpl::AutomaticLivelinessTimer::AutomaticLivelinessTimer(DomainParticipantImpl& impl)
    2391           0 :   : LivelinessTimer (impl, DDS::AUTOMATIC_LIVELINESS_QOS)
    2392           0 : { }
    2393             : 
    2394             : void
    2395           0 : DomainParticipantImpl::AutomaticLivelinessTimer::dispatch(const MonotonicTimePoint& /* tv */)
    2396             : {
    2397           0 :   impl_.signal_liveliness(kind_);
    2398           0 : }
    2399             : 
    2400           0 : DomainParticipantImpl::ParticipantLivelinessTimer::ParticipantLivelinessTimer(DomainParticipantImpl& impl)
    2401           0 :   : LivelinessTimer(impl, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS)
    2402           0 : { }
    2403             : 
    2404             : void
    2405           0 : DomainParticipantImpl::ParticipantLivelinessTimer::dispatch(const MonotonicTimePoint& tv)
    2406             : {
    2407           0 :   if (impl_.participant_liveliness_activity_after (tv - interval())) {
    2408           0 :     impl_.signal_liveliness(kind_);
    2409             :   }
    2410           0 : }
    2411             : 
    2412             : TimeDuration
    2413           0 : DomainParticipantImpl::liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
    2414             : {
    2415           0 :   TimeDuration tv(TimeDuration::max_value);
    2416             : 
    2417           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    2418             :                    tao_mon,
    2419             :                    publishers_protector_,
    2420             :                    tv);
    2421             : 
    2422           0 :   for (PublisherSet::iterator it = publishers_.begin(); it != publishers_.end(); ++it) {
    2423           0 :     tv = std::min(tv, it->svt_->liveliness_check_interval(kind));
    2424             :   }
    2425             : 
    2426           0 :   return tv;
    2427           0 : }
    2428             : 
    2429             : bool
    2430           0 : DomainParticipantImpl::participant_liveliness_activity_after(const MonotonicTimePoint& tv)
    2431             : {
    2432           0 :   if (last_liveliness_activity_ > tv) {
    2433           0 :     return true;
    2434             :   }
    2435             : 
    2436           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, tao_mon, this->publishers_protector_, !tv.is_zero());
    2437             : 
    2438           0 :   for (PublisherSet::iterator it(publishers_.begin());
    2439           0 :        it != publishers_.end(); ++it) {
    2440           0 :     if (it->svt_->participant_liveliness_activity_after(tv)) {
    2441           0 :       return true;
    2442             :     }
    2443             :   }
    2444             : 
    2445           0 :   return false;
    2446           0 : }
    2447             : 
    2448             : void
    2449           0 : DomainParticipantImpl::signal_liveliness (DDS::LivelinessQosPolicyKind kind)
    2450             : {
    2451           0 :   TheServiceParticipant->get_discovery(domain_id_)->signal_liveliness (domain_id_, get_id(), kind);
    2452           0 : }
    2453             : 
    2454             : int
    2455           0 : DomainParticipantImpl::handle_exception(ACE_HANDLE /*fd*/)
    2456             : {
    2457           0 :   ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
    2458             : 
    2459           0 :   DDS::ReturnCode_t ret = DDS::RETCODE_OK;
    2460             : 
    2461           0 :   automatic_liveliness_timer_->cancel();
    2462           0 :   participant_liveliness_timer_->cancel();
    2463             : 
    2464             :   // delete publishers
    2465             :   {
    2466           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    2467             :                      tao_mon,
    2468             :                      this->publishers_protector_,
    2469             :                      DDS::RETCODE_ERROR);
    2470             : 
    2471           0 :     PublisherSet::iterator pubIter = publishers_.begin();
    2472             :     DDS::Publisher_ptr pubPtr;
    2473           0 :     size_t pubsize = publishers_.size();
    2474             : 
    2475           0 :     while (pubsize > 0) {
    2476           0 :       pubPtr = (*pubIter).obj_.in();
    2477           0 :       ++pubIter;
    2478             : 
    2479           0 :       DDS::ReturnCode_t result = pubPtr->delete_contained_entities();
    2480           0 :       if (result != DDS::RETCODE_OK) {
    2481           0 :         ret = result;
    2482             :       }
    2483             : 
    2484           0 :       result = delete_publisher(pubPtr);
    2485             : 
    2486           0 :       if (result != DDS::RETCODE_OK) {
    2487           0 :         ret = result;
    2488             :       }
    2489             : 
    2490           0 :       --pubsize;
    2491             :     }
    2492             : 
    2493           0 :   }
    2494             : 
    2495             :   // delete subscribers
    2496             :   {
    2497           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    2498             :                      tao_mon,
    2499             :                      this->subscribers_protector_,
    2500             :                      DDS::RETCODE_ERROR);
    2501             : 
    2502           0 :     SubscriberSet::iterator subIter = subscribers_.begin();
    2503             :     DDS::Subscriber_ptr subPtr;
    2504           0 :     size_t subsize = subscribers_.size();
    2505             : 
    2506           0 :     while (subsize > 0) {
    2507           0 :       subPtr = (*subIter).obj_.in();
    2508           0 :       ++subIter;
    2509             : 
    2510           0 :       DDS::ReturnCode_t result = subPtr->delete_contained_entities();
    2511             : 
    2512           0 :       if (result != DDS::RETCODE_OK) {
    2513           0 :         ret = result;
    2514             :       }
    2515             : 
    2516           0 :       result = delete_subscriber(subPtr);
    2517             : 
    2518           0 :       if (result != DDS::RETCODE_OK) {
    2519           0 :         ret = result;
    2520             :       }
    2521             : 
    2522           0 :       --subsize;
    2523             :     }
    2524           0 :   }
    2525             : 
    2526             :   {
    2527           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    2528             :                      tao_mon,
    2529             :                      this->recorders_protector_,
    2530             :                      DDS::RETCODE_ERROR);
    2531             : 
    2532           0 :     RecorderSet::iterator it = recorders_.begin();
    2533           0 :     for (; it != recorders_.end(); ++it ){
    2534           0 :       RecorderImpl* impl = dynamic_cast<RecorderImpl* >(it->in());
    2535           0 :       DDS::ReturnCode_t result = DDS::RETCODE_ERROR;
    2536           0 :       if (impl) result = impl->cleanup();
    2537           0 :       if (result != DDS::RETCODE_OK) ret = result;
    2538             :     }
    2539           0 :     recorders_.clear();
    2540           0 :   }
    2541             : 
    2542             :   {
    2543           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    2544             :                      tao_mon,
    2545             :                      this->replayers_protector_,
    2546             :                      DDS::RETCODE_ERROR);
    2547             : 
    2548           0 :     ReplayerSet::iterator it = replayers_.begin();
    2549           0 :     for (; it != replayers_.end(); ++it ){
    2550           0 :       ReplayerImpl* impl = static_cast<ReplayerImpl* >(it->in());
    2551           0 :       DDS::ReturnCode_t result = DDS::RETCODE_ERROR;
    2552           0 :       if (impl) result = impl->cleanup();
    2553           0 :       if (result != DDS::RETCODE_OK) ret = result;
    2554             : 
    2555             :     }
    2556             : 
    2557           0 :     replayers_.clear();
    2558           0 :   }
    2559             : 
    2560             :   // delete topics
    2561             :   {
    2562           0 :     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
    2563             :                      tao_mon,
    2564             :                      this->topics_protector_,
    2565             :                      DDS::RETCODE_ERROR);
    2566             : 
    2567           0 :     TopicMap::iterator topicIter = topics_.begin();
    2568             :     DDS::Topic_ptr topicPtr;
    2569           0 :     size_t topicsize = topics_.size();
    2570             : 
    2571           0 :     while (topicsize > 0) {
    2572           0 :       topicPtr = topicIter->second.pair_.obj_.in();
    2573           0 :       ++topicIter;
    2574             : 
    2575             :       // Delete the topic (the `true` flag presumably forces removal regardless of the reference count).
    2576           0 :       const DDS::ReturnCode_t result = this->delete_topic_i(topicPtr, true);
    2577             : 
    2578           0 :       if (result != DDS::RETCODE_OK) {
    2579           0 :         ret = result;
    2580             :       }
    2581           0 :       --topicsize;
    2582             :     }
    2583           0 :   }
    2584             : 
    2585           0 :   shutdown_mutex_.acquire();
    2586           0 :   shutdown_result_ = ret;
    2587           0 :   shutdown_complete_ = true;
    2588           0 :   shutdown_condition_.notify_all();
    2589           0 :   shutdown_mutex_.release();
    2590             : 
    2591           0 :   return 0;
    2592           0 : }
    2593             : 
    2594           0 : bool DomainParticipantImpl::prepare_to_delete_datawriters()
    2595             : {
    2596           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, publishers_protector_, false);
    2597           0 :   bool result = true;
    2598           0 :   const PublisherSet::iterator end = publishers_.end();
    2599           0 :   for (PublisherSet::iterator i = publishers_.begin(); i != end; ++i) {
    2600           0 :     result &= i->svt_->prepare_to_delete_datawriters();
    2601             :   }
    2602           0 :   return result;
    2603           0 : }
    2604             : 
    2605           0 : bool DomainParticipantImpl::set_wait_pending_deadline(const MonotonicTimePoint& deadline)
    2606             : {
    2607           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, publishers_protector_, false);
    2608           0 :   bool result = true;
    2609           0 :   const PublisherSet::iterator end = publishers_.end();
    2610           0 :   for (PublisherSet::iterator i = publishers_.begin(); i != end; ++i) {
    2611           0 :     result &= i->svt_->set_wait_pending_deadline(deadline);
    2612             :   }
    2613           0 :   return result;
    2614           0 : }
    2615             : 
    2616             : #ifndef OPENDDS_SAFETY_PROFILE
    2617           0 : DDS::ReturnCode_t DomainParticipantImpl::get_dynamic_type(
    2618             :   DDS::DynamicType_var& type, const DDS::BuiltinTopicKey_t& key)
    2619             : {
    2620           0 :   if (!type_lookup_service_) {
    2621           0 :     if (log_level >= LogLevel::Notice) {
    2622           0 :       ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::get_dynamic_type: "
    2623             :         "Can't get a DynamicType, no type lookup service\n"));
    2624             :     }
    2625           0 :     return DDS::RETCODE_UNSUPPORTED;
    2626             :   }
    2627             : 
    2628           0 :   XTypes::TypeInformation ti = type_lookup_service_->get_type_info(key);
    2629           0 :   if (ti.complete.typeid_with_size.typeobject_serialized_size == 0) {
    2630           0 :     if (log_level >= LogLevel::Notice) {
    2631           0 :       ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::get_dynamic_type: "
    2632             :         "Can't get a DynamicType, type info is missing complete\n"));
    2633             :     }
    2634           0 :     return DDS::RETCODE_NO_DATA;
    2635             :   }
    2636             : 
    2637           0 :   const XTypes::TypeIdentifier& ctid = ti.complete.typeid_with_size.type_id;
    2638           0 :   const GUID_t entity = bit_key_to_guid(key);
    2639           0 :   if (!type_lookup_service_->has_complete(ctid)) {
    2640             :     // We don't have it, so try asking the remote for the complete
    2641             :     // TypeObjects.
    2642           0 :     if (DCPS_debug_level >= 4) {
    2643           0 :       ACE_DEBUG((LM_DEBUG, "(%P|%t) DomainParticipantImpl::get_dynamic_type: "
    2644             :         "requesting remote complete TypeObject from %C\n", LogGuid(entity).c_str()));
    2645             :     }
    2646           0 :     Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
    2647           0 :     TypeObjReqCond cond;
    2648           0 :     disco->request_remote_complete_type_objects(domain_id_, dp_id_, entity, ti, cond);
    2649           0 :     const DDS::ReturnCode_t rc = cond.wait();
    2650           0 :     if (rc != DDS::RETCODE_OK) {
    2651           0 :       if (log_level >= LogLevel::Notice) {
    2652           0 :         ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::get_dynamic_type: "
    2653             :           "Couldn't get remote complete type object: %C\n", retcode_to_string(rc)));
    2654             :       }
    2655           0 :       return rc;
    2656             :     }
    2657             : 
    2658           0 :     if (!type_lookup_service_->has_complete(ctid)) {
    2659           0 :       if (log_level >= LogLevel::Notice) {
    2660           0 :         ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::get_dynamic_type: "
    2661             :           "request_remote_complete_type_objects succeeded, but type lookup service still says it "
    2662             :           "doesn't have the complete TypeObject?\n"));
    2663             :       }
    2664           0 :       return DDS::RETCODE_ERROR;
    2665             :     }
    2666           0 :   }
    2667             : 
    2668           0 :   DDS::DynamicType_var got_type = type_lookup_service_->type_identifier_to_dynamic(ctid, entity);
    2669           0 :   if (!XTypes::dynamic_type_is_valid(got_type)) {
    2670           0 :     if (log_level >= LogLevel::Notice) {
    2671           0 :       ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DomainParticipantImpl::get_dynamic_type: "
    2672             :         "Got an invalid DynamicType\n"));
    2673             :     }
    2674           0 :     return DDS::RETCODE_ERROR;
    2675             :   }
    2676           0 :   type = got_type;
    2677             : 
    2678           0 :   XTypes::DynamicTypeImpl* impl = dynamic_cast<XTypes::DynamicTypeImpl*>(type.in());
    2679           0 :   impl->set_complete_type_identifier(ctid);
    2680           0 :   impl->set_minimal_type_identifier(ti.minimal.typeid_with_size.type_id);
    2681           0 :   impl->set_preset_type_info(ti);
    2682             : 
    2683           0 :   return DDS::RETCODE_OK;
    2684           0 : }
    2685             : #endif
    2686             : 
    2687             : } // namespace DCPS
    2688             : } // namespace OpenDDS
    2689             : 
    2690             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16