DomainParticipantImpl.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "DomainParticipantImpl.h"
00010 #include "dds/DdsDcpsGuidC.h"
00011 #include "FeatureDisabledQosCheck.h"
00012 #include "Service_Participant.h"
00013 #include "Qos_Helper.h"
00014 #include "GuidConverter.h"
00015 #include "PublisherImpl.h"
00016 #include "SubscriberImpl.h"
00017 #include "DataWriterImpl.h"
00018 #include "Marked_Default_Qos.h"
00019 #include "Registered_Data_Types.h"
00020 #include "Transient_Kludge.h"
00021 #include "DomainParticipantFactoryImpl.h"
00022 #include "Util.h"
00023 #include "MonitorFactory.h"
00024 #include "BitPubListenerImpl.h"
00025 #include "ContentFilteredTopicImpl.h"
00026 #include "MultiTopicImpl.h"
00027 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00028 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00029 
00030 #if defined(OPENDDS_SECURITY)
00031 #include "dds/DCPS/security/framework/SecurityRegistry.h"
00032 #endif
00033 
00034 #include "RecorderImpl.h"
00035 #include "ReplayerImpl.h"
00036 
00037 #if !defined (DDS_HAS_MINIMUM_BIT)
00038 #include "BuiltInTopicUtils.h"
00039 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00040 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00041 
00042 #include "tao/debug.h"
00043 #include "ace/Reactor.h"
00044 #include "ace/OS_NS_unistd.h"
00045 
00046 namespace Util {
00047 
00048   template <typename Key>
00049   int find(
00050     OpenDDS::DCPS::DomainParticipantImpl::TopicMap& c,
00051     const Key& key,
00052     OpenDDS::DCPS::DomainParticipantImpl::TopicMap::mapped_type*& value)
00053   {
00054     OpenDDS::DCPS::DomainParticipantImpl::TopicMap::iterator iter =
00055       c.find(key);
00056 
00057     if (iter == c.end()) {
00058       return -1;
00059     }
00060 
00061     value = &iter->second;
00062     return 0;
00063   }
00064 
00065   DDS::PropertySeq filter_properties(const DDS::PropertySeq& properties, const std::string& prefix)
00066   {
00067     DDS::PropertySeq result(properties.length());
00068     result.length(properties.length());
00069     size_t count = 0;
00070     for (size_t i = 0, len = properties.length(); i < len; ++i) {
00071       if (std::string(properties[i].name.in()).find(prefix) == 0) {
00072         result[count++] = properties[i];
00073       }
00074     }
00075     result.length(count);
00076     return result;
00077   }
00078 
00079 } // namespace Util
00080 
00081 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00082 
00083 namespace OpenDDS {
00084 namespace DCPS {
00085 
00086 //TBD - add check for enabled in most methods.
00087 //      Currently this is not needed because auto_enable_created_entities
00088 //      cannot be false.
00089 
00090 // Implementation skeleton constructor
00091 DomainParticipantImpl::DomainParticipantImpl(DomainParticipantFactoryImpl *     factory,
00092                                              const DDS::DomainId_t&             domain_id,
00093                                              const DDS::DomainParticipantQos &  qos,
00094                                              DDS::DomainParticipantListener_ptr a_listener,
00095                                              const DDS::StatusMask &            mask)
00096   : factory_(factory),
00097     default_topic_qos_(TheServiceParticipant->initial_TopicQos()),
00098     default_publisher_qos_(TheServiceParticipant->initial_PublisherQos()),
00099     default_subscriber_qos_(TheServiceParticipant->initial_SubscriberQos()),
00100     qos_(qos),
00101     domain_id_(domain_id),
00102     dp_id_(GUID_UNKNOWN),
00103     federated_(false),
00104     shutdown_condition_(shutdown_mutex_),
00105     shutdown_complete_(false),
00106     monitor_(0),
00107     pub_id_gen_(dp_id_),
00108     automatic_liveliness_timer_ (*this),
00109     participant_liveliness_timer_ (*this)
00110 {
00111   (void) this->set_listener(a_listener, mask);
00112   monitor_ = TheServiceParticipant->monitor_factory_->create_dp_monitor(this);
00113 }
00114 
00115 DomainParticipantImpl::~DomainParticipantImpl()
00116 {
00117 }
00118 
00119 DDS::Publisher_ptr
00120 DomainParticipantImpl::create_publisher(
00121   const DDS::PublisherQos & qos,
00122   DDS::PublisherListener_ptr a_listener,
00123   DDS::StatusMask mask)
00124 {
00125   DDS::PublisherQos pub_qos = qos;
00126 
00127   if (! this->validate_publisher_qos(pub_qos))
00128     return DDS::Publisher::_nil();
00129 
00130   PublisherImpl* pub = 0;
00131   ACE_NEW_RETURN(pub,
00132                  PublisherImpl(participant_handles_.next(),
00133                                pub_id_gen_.next(),
00134                                pub_qos,
00135                                a_listener,
00136                                mask,
00137                                this),
00138                  DDS::Publisher::_nil());
00139 
00140   if ((enabled_ == true) && (qos_.entity_factory.autoenable_created_entities)) {
00141     pub->enable();
00142   }
00143 
00144   DDS::Publisher_ptr pub_obj(pub);
00145 
00146   // this object will also act as the guard for leaking Publisher Impl
00147   Publisher_Pair pair(pub, pub_obj, NO_DUP);
00148 
00149   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00150                    tao_mon,
00151                    this->publishers_protector_,
00152                    DDS::Publisher::_nil());
00153 
00154   if (OpenDDS::DCPS::insert(publishers_, pair) == -1) {
00155     ACE_ERROR((LM_ERROR,
00156                ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_publisher, ")
00157                ACE_TEXT("%p\n"),
00158                ACE_TEXT("insert")));
00159     return DDS::Publisher::_nil();
00160   }
00161 
00162   return DDS::Publisher::_duplicate(pub_obj);
00163 }
00164 
00165 DDS::ReturnCode_t
00166 DomainParticipantImpl::delete_publisher(
00167   DDS::Publisher_ptr p)
00168 {
00169   // The servant's ref count should be 2 at this point,
00170   // one referenced by poa, one referenced by the subscriber
00171   // set.
00172   PublisherImpl* the_servant = dynamic_cast<PublisherImpl*>(p);
00173 
00174   if (!the_servant) {
00175     ACE_ERROR((LM_ERROR,
00176       ACE_TEXT("(%P|%t) ERROR: ")
00177       ACE_TEXT("DomainParticipantImpl::delete_publisher, ")
00178       ACE_TEXT("Failed to obtain PublisherImpl.\n")));
00179     return DDS::RETCODE_ERROR;
00180   }
00181 
00182   if (!the_servant->is_clean()) {
00183     ACE_ERROR((LM_ERROR,
00184                ACE_TEXT("(%P|%t) ERROR: ")
00185                ACE_TEXT("DomainParticipantImpl::delete_publisher, ")
00186                ACE_TEXT("The publisher is not empty.\n")));
00187     return DDS::RETCODE_PRECONDITION_NOT_MET;
00188   }
00189 
00190   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00191                    tao_mon,
00192                    this->publishers_protector_,
00193                    DDS::RETCODE_ERROR);
00194 
00195   Publisher_Pair pair(the_servant, p, DUP);
00196 
00197   if (OpenDDS::DCPS::remove(publishers_, pair) == -1) {
00198     ACE_ERROR((LM_ERROR,
00199                ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_publisher, ")
00200                ACE_TEXT("%p\n"),
00201                ACE_TEXT("remove")));
00202     return DDS::RETCODE_ERROR;
00203 
00204   } else {
00205     return DDS::RETCODE_OK;
00206   }
00207 }
00208 
00209 DDS::Subscriber_ptr
00210 DomainParticipantImpl::create_subscriber(
00211   const DDS::SubscriberQos & qos,
00212   DDS::SubscriberListener_ptr a_listener,
00213   DDS::StatusMask mask)
00214 {
00215   DDS::SubscriberQos sub_qos = qos;
00216 
00217   if (! this->validate_subscriber_qos(sub_qos)) {
00218     return DDS::Subscriber::_nil();
00219   }
00220 
00221   SubscriberImpl* sub = 0 ;
00222   ACE_NEW_RETURN(sub,
00223                  SubscriberImpl(participant_handles_.next(),
00224                                 sub_qos,
00225                                 a_listener,
00226                                 mask,
00227                                 this),
00228                  DDS::Subscriber::_nil());
00229 
00230   if ((enabled_ == true) && (qos_.entity_factory.autoenable_created_entities)) {
00231     sub->enable();
00232   }
00233 
00234   DDS::Subscriber_ptr sub_obj(sub);
00235 
00236   Subscriber_Pair pair(sub, sub_obj, NO_DUP);
00237 
00238   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00239                    tao_mon,
00240                    this->subscribers_protector_,
00241                    DDS::Subscriber::_nil());
00242 
00243   if (OpenDDS::DCPS::insert(subscribers_, pair) == -1) {
00244     ACE_ERROR((LM_ERROR,
00245                ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_subscriber, ")
00246                ACE_TEXT("%p\n"),
00247                ACE_TEXT("insert")));
00248     return DDS::Subscriber::_nil();
00249   }
00250 
00251   return DDS::Subscriber::_duplicate(sub_obj);
00252 }
00253 
00254 DDS::ReturnCode_t
00255 DomainParticipantImpl::delete_subscriber(
00256   DDS::Subscriber_ptr s)
00257 {
00258   // The servant's ref count should be 2 at this point,
00259   // one referenced by poa, one referenced by the subscriber
00260   // set.
00261   SubscriberImpl* the_servant = dynamic_cast<SubscriberImpl*>(s);
00262 
00263   if (!the_servant) {
00264     ACE_ERROR((LM_ERROR,
00265       ACE_TEXT("(%P|%t) ERROR: ")
00266       ACE_TEXT("DomainParticipantImpl::delete_subscriber, ")
00267       ACE_TEXT("Failed to obtain SubscriberImpl.\n")));
00268     return DDS::RETCODE_ERROR;
00269   }
00270 
00271   if (!the_servant->is_clean()) {
00272     ACE_ERROR((LM_ERROR,
00273                ACE_TEXT("(%P|%t) ERROR: ")
00274                ACE_TEXT("DomainParticipantImpl::delete_subscriber, ")
00275                ACE_TEXT("The subscriber is not empty.\n")));
00276     return DDS::RETCODE_PRECONDITION_NOT_MET;
00277   }
00278 
00279   DDS::ReturnCode_t ret
00280   = the_servant->delete_contained_entities();
00281 
00282   if (ret != DDS::RETCODE_OK) {
00283     ACE_ERROR((LM_ERROR,
00284                ACE_TEXT("(%P|%t) ERROR: ")
00285                ACE_TEXT("DomainParticipantImpl::delete_subscriber, ")
00286                ACE_TEXT("Failed to delete contained entities.\n")));
00287     return DDS::RETCODE_ERROR;
00288   }
00289 
00290   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00291                    tao_mon,
00292                    this->subscribers_protector_,
00293                    DDS::RETCODE_ERROR);
00294 
00295   Subscriber_Pair pair(the_servant, s, DUP);
00296 
00297   if (OpenDDS::DCPS::remove(subscribers_, pair) == -1) {
00298     ACE_ERROR((LM_ERROR,
00299                ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_subscriber, ")
00300                ACE_TEXT("%p\n"),
00301                ACE_TEXT("remove")));
00302     return DDS::RETCODE_ERROR;
00303 
00304   } else {
00305     return DDS::RETCODE_OK;
00306   }
00307 }
00308 
00309 DDS::Subscriber_ptr
00310 DomainParticipantImpl::get_builtin_subscriber()
00311 {
00312   return DDS::Subscriber::_duplicate(bit_subscriber_.in());
00313 }
00314 
00315 DDS::Topic_ptr
00316 DomainParticipantImpl::create_topic(
00317   const char * topic_name,
00318   const char * type_name,
00319   const DDS::TopicQos & qos,
00320   DDS::TopicListener_ptr a_listener,
00321   DDS::StatusMask mask)
00322 {
00323   return create_topic_i(topic_name,
00324                         type_name,
00325                         qos,
00326                         a_listener,
00327                         mask,
00328                         0);
00329 }
00330 
00331 DDS::Topic_ptr
00332 DomainParticipantImpl::create_typeless_topic(
00333   const char * topic_name,
00334   const char * type_name,
00335   bool type_has_keys,
00336   const DDS::TopicQos & qos,
00337   DDS::TopicListener_ptr a_listener,
00338   DDS::StatusMask mask)
00339 {
00340   int topic_mask = (type_has_keys ? TOPIC_TYPE_HAS_KEYS : 0 ) | TOPIC_TYPELESS;
00341 
00342   return create_topic_i(topic_name,
00343                         type_name,
00344                         qos,
00345                         a_listener,
00346                         mask,
00347                         topic_mask);
00348 }
00349 
00350 
00351 DDS::Topic_ptr
00352 DomainParticipantImpl::create_topic_i(
00353   const char * topic_name,
00354   const char * type_name,
00355   const DDS::TopicQos & qos,
00356   DDS::TopicListener_ptr a_listener,
00357   DDS::StatusMask mask,
00358   int topic_mask)
00359 {
00360   DDS::TopicQos topic_qos;
00361 
00362   if (qos == TOPIC_QOS_DEFAULT) {
00363     this->get_default_topic_qos(topic_qos);
00364 
00365   } else {
00366     topic_qos = qos;
00367   }
00368 
00369   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
00370   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
00371   OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
00372   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
00373 
00374   if (!Qos_Helper::valid(topic_qos)) {
00375     ACE_ERROR((LM_ERROR,
00376                ACE_TEXT("(%P|%t) ERROR: ")
00377                ACE_TEXT("DomainParticipantImpl::create_topic, ")
00378                ACE_TEXT("invalid qos.\n")));
00379     return DDS::Topic::_nil();
00380   }
00381 
00382   if (!Qos_Helper::consistent(topic_qos)) {
00383     ACE_ERROR((LM_ERROR,
00384                ACE_TEXT("(%P|%t) ERROR: ")
00385                ACE_TEXT("DomainParticipantImpl::create_topic, ")
00386                ACE_TEXT("inconsistent qos.\n")));
00387     return DDS::Topic::_nil();
00388   }
00389 
00390   TopicMap::mapped_type* entry = 0;
00391   bool found = false;
00392   {
00393     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00394                      tao_mon,
00395                      this->topics_protector_,
00396                      DDS::Topic::_nil());
00397 
00398 #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)
00399     if (topic_descrs_.count(topic_name)) {
00400       if (DCPS_debug_level > 3) {
00401         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00402           ACE_TEXT("DomainParticipantImpl::create_topic, ")
00403           ACE_TEXT("can't create a Topic due to name \"%C\" already in use ")
00404           ACE_TEXT("by a TopicDescription.\n"), topic_name));
00405       }
00406       return 0;
00407     }
00408 #endif
00409 
00410     if (Util::find(topics_, topic_name, entry) == 0) {
00411       found = true;
00412     }
00413   }
00414 
00415   if (found) {
00416     CORBA::String_var found_type
00417     = entry->pair_.svt_->get_type_name();
00418 
00419     if (ACE_OS::strcmp(type_name, found_type) == 0) {
00420       DDS::TopicQos found_qos;
00421       entry->pair_.svt_->get_qos(found_qos);
00422 
00423       if (topic_qos == found_qos) { // match type name, qos
00424         {
00425           ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00426                            tao_mon,
00427                            this->topics_protector_,
00428                            DDS::Topic::_nil());
00429           ++entry->client_refs_;
00430         }
00431         return DDS::Topic::_duplicate(entry->pair_.obj_.in());
00432 
00433       } else {
00434         if (DCPS_debug_level >= 1) {
00435           ACE_DEBUG((LM_DEBUG,
00436                      ACE_TEXT("(%P|%t) DomainParticipantImpl::create_topic, ")
00437                      ACE_TEXT("qos not match: topic_name=%C type_name=%C\n"),
00438                      topic_name, type_name));
00439         }
00440 
00441         return DDS::Topic::_nil();
00442       }
00443 
00444     } else { // no match
00445       if (DCPS_debug_level >= 1) {
00446         ACE_DEBUG((LM_DEBUG,
00447                    ACE_TEXT("(%P|%t) DomainParticipantImpl::create_topic, ")
00448                    ACE_TEXT(" not match: topic_name=%C type_name=%C\n"),
00449                    topic_name, type_name));
00450       }
00451 
00452       return DDS::Topic::_nil();
00453     }
00454 
00455   } else {
00456 
00457     OpenDDS::DCPS::TypeSupport_var type_support;
00458     bool has_keys = (topic_mask & TOPIC_TYPE_HAS_KEYS);
00459 
00460     if (0 == topic_mask) {
00461        // creating a topic with compile time type
00462       type_support = Registered_Data_Types->lookup(this, type_name);
00463       if (CORBA::is_nil(type_support)) {
00464         if (DCPS_debug_level >= 1) {
00465             ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00466                        ACE_TEXT("DomainParticipantImpl::create_topic, ")
00467                        ACE_TEXT("can't create a topic=%C type_name=%C ")
00468                        ACE_TEXT("is not registered.\n"),
00469                        topic_name, type_name));
00470         }
00471         return DDS::Topic::_nil();
00472       }
00473       has_keys = type_support->has_dcps_key();
00474     }
00475 
00476     RepoId topic_id = GUID_UNKNOWN;
00477     TopicStatus status = TOPIC_DISABLED;
00478 
00479     if (is_enabled()) {
00480       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00481       status = disco->assert_topic(topic_id,
00482                                    domain_id_,
00483                                    dp_id_,
00484                                    topic_name,
00485                                    type_name,
00486                                    topic_qos,
00487                                    has_keys);
00488     }
00489 
00490     if (status == CREATED || status == FOUND || status == TOPIC_DISABLED) {
00491       DDS::Topic_ptr new_topic = create_new_topic(topic_id,
00492                                                   topic_name,
00493                                                   type_name,
00494                                                   topic_qos,
00495                                                   a_listener,
00496                                                   mask,
00497                                                   type_support);
00498       if (this->monitor_) {
00499         this->monitor_->report();
00500       }
00501       return new_topic;
00502 
00503     } else {
00504       ACE_ERROR((LM_ERROR,
00505                  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_topic, ")
00506                  ACE_TEXT("assert_topic failed with return value %d.\n"), status));
00507       return DDS::Topic::_nil();
00508     }
00509   }
00510 }
00511 
00512 DDS::ReturnCode_t
00513 DomainParticipantImpl::delete_topic(
00514   DDS::Topic_ptr a_topic)
00515 {
00516   return delete_topic_i(a_topic, false);
00517 }
00518 
00519 DDS::ReturnCode_t
00520 DomainParticipantImpl::delete_topic_i(
00521   DDS::Topic_ptr a_topic,
00522   bool             remove_objref)
00523 {
00524 
00525   DDS::ReturnCode_t ret = DDS::RETCODE_OK;
00526 
00527   try {
00528     // The servant's ref count should be greater than 2 at this point,
00529     // one referenced by poa, one referenced by the topic map and
00530     // others referenced by the datareader/datawriter.
00531     TopicImpl* the_topic_servant = dynamic_cast<TopicImpl*>(a_topic);
00532 
00533     if (!the_topic_servant) {
00534       ACE_ERROR_RETURN((LM_ERROR,
00535         ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_topic_i, ")
00536         ACE_TEXT("%p\n"),
00537         ACE_TEXT("failed to obtain TopicImpl.")),
00538         DDS::RETCODE_ERROR);
00539     }
00540 
00541     CORBA::String_var topic_name = the_topic_servant->get_name();
00542 
00543     DDS::DomainParticipant_var dp = the_topic_servant->get_participant();
00544 
00545     DomainParticipantImpl* the_dp_servant =
00546       dynamic_cast<DomainParticipantImpl*>(dp.in());
00547 
00548     if (the_dp_servant != this ||
00549         (!remove_objref && the_topic_servant->has_entity_refs())) {
00550       // If entity_refs is true (nonzero), then some reader or writer is using
00551       // this topic and the spec requires delete_topic() to fail with the error:
00552       return DDS::RETCODE_PRECONDITION_NOT_MET;
00553     }
00554 
00555     {
00556       ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00557                        tao_mon,
00558                        this->topics_protector_,
00559                        DDS::RETCODE_ERROR);
00560 
00561       TopicMap::mapped_type* entry = 0;
00562 
00563       if (Util::find(topics_, topic_name.in(), entry) == -1) {
00564         ACE_ERROR_RETURN((LM_ERROR,
00565                           ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_topic_i, ")
00566                           ACE_TEXT("%p\n"),
00567                           ACE_TEXT("find")),
00568                          DDS::RETCODE_ERROR);
00569       }
00570 
00571       --entry->client_refs_;
00572 
00573       if (remove_objref == true ||
00574           0 == entry->client_refs_) {
00575         //TBD - mark the TopicImpl as deleted and make it
00576         //      reject calls to the TopicImpl.
00577         Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00578         TopicStatus status
00579         = disco->remove_topic(the_dp_servant->get_domain_id(),
00580                               the_dp_servant->get_id(),
00581                               the_topic_servant->get_id());
00582 
00583         if (status != REMOVED) {
00584           ACE_ERROR_RETURN((LM_ERROR,
00585                             ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_topic_i, ")
00586                             ACE_TEXT("remove_topic failed with return value %d\n"), status),
00587                            DDS::RETCODE_ERROR);
00588         }
00589 
00590         // note: this will destroy the TopicImpl if there are no
00591         // client object reference to it.
00592         if (topics_.erase(topic_name.in()) == 0) {
00593           ACE_ERROR_RETURN((LM_ERROR,
00594                             ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_topic_i, ")
00595                             ACE_TEXT("%p \n"),
00596                             ACE_TEXT("unbind")),
00597                            DDS::RETCODE_ERROR);
00598 
00599         } else
00600           return DDS::RETCODE_OK;
00601 
00602       }
00603     }
00604 
00605   } catch (...) {
00606     ACE_ERROR((LM_ERROR,
00607                ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_topic_i, ")
00608                ACE_TEXT(" Caught Unknown Exception \n")));
00609     ret = DDS::RETCODE_ERROR;
00610   }
00611 
00612   return ret;
00613 }
00614 
00615 //Note: caller should NOT assign to Topic_var (without _duplicate'ing)
00616 //      because it will steal the framework's reference.
00617 DDS::Topic_ptr
00618 DomainParticipantImpl::find_topic(
00619   const char * topic_name,
00620   const DDS::Duration_t & timeout)
00621 {
00622   ACE_Time_Value timeout_tv
00623   = ACE_OS::gettimeofday() + ACE_Time_Value(timeout.sec, timeout.nanosec/1000);
00624 
00625   bool first_time = true;
00626 
00627   while (first_time || ACE_OS::gettimeofday() < timeout_tv) {
00628     if (first_time) {
00629       first_time = false;
00630     }
00631 
00632     TopicMap::mapped_type* entry = 0;
00633     {
00634       ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00635                        tao_mon,
00636                        this->topics_protector_,
00637                        DDS::Topic::_nil());
00638 
00639       if (Util::find(topics_, topic_name, entry) == 0) {
00640         ++entry->client_refs_;
00641         return DDS::Topic::_duplicate(entry->pair_.obj_.in());
00642       }
00643     }
00644 
00645     RepoId topic_id;
00646     CORBA::String_var type_name;
00647     DDS::TopicQos_var qos;
00648 
00649     Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00650     TopicStatus status = disco->find_topic(domain_id_,
00651                                            topic_name,
00652                                            type_name.out(),
00653                                            qos.out(),
00654                                            topic_id);
00655 
00656 
00657     if (status == FOUND) {
00658       OpenDDS::DCPS::TypeSupport_var type_support =
00659         Registered_Data_Types->lookup(this, type_name.in());
00660       if (CORBA::is_nil(type_support)) {
00661         if (DCPS_debug_level) {
00662             ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00663                        ACE_TEXT("DomainParticipantImpl::find_topic, ")
00664                        ACE_TEXT("can't create a Topic: type_name \"%C\" ")
00665                        ACE_TEXT("is not registered.\n"), type_name.in()));
00666         }
00667 
00668         return DDS::Topic::_nil();
00669       }
00670 
00671       DDS::Topic_ptr new_topic = create_new_topic(topic_id,
00672                                                   topic_name,
00673                                                   type_name,
00674                                                   qos,
00675                                                   DDS::TopicListener::_nil(),
00676                                                   OpenDDS::DCPS::DEFAULT_STATUS_MASK,
00677                                                   type_support);
00678       return new_topic;
00679 
00680     } else if (status == INTERNAL_ERROR) {
00681       ACE_ERROR((LM_ERROR,
00682                  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::find_topic - ")
00683                  ACE_TEXT("topic not found, discovery returned INTERNAL_ERROR!\n")));
00684       return DDS::Topic::_nil();
00685     } else {
00686       ACE_Time_Value now = ACE_OS::gettimeofday();
00687 
00688       if (now < timeout_tv) {
00689         ACE_Time_Value remaining = timeout_tv - now;
00690 
00691         if (remaining.sec() >= 1) {
00692           ACE_OS::sleep(1);
00693 
00694         } else {
00695           ACE_OS::sleep(remaining);
00696         }
00697       }
00698     }
00699   }
00700 
00701   if (DCPS_debug_level >= 1) {
00702     // timed out
00703     ACE_DEBUG((LM_DEBUG,
00704                ACE_TEXT("(%P|%t) DomainParticipantImpl::find_topic, ")
00705                ACE_TEXT("timed out. \n")));
00706   }
00707 
00708   return DDS::Topic::_nil();
00709 }
00710 
00711 DDS::TopicDescription_ptr
00712 DomainParticipantImpl::lookup_topicdescription(const char* name)
00713 {
00714   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00715                    tao_mon,
00716                    this->topics_protector_,
00717                    DDS::Topic::_nil());
00718 
00719   TopicMap::mapped_type* entry = 0;
00720 
00721   if (Util::find(topics_, name, entry) == -1) {
00722 #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)
00723     TopicDescriptionMap::iterator iter = topic_descrs_.find(name);
00724     if (iter != topic_descrs_.end()) {
00725       return DDS::TopicDescription::_duplicate(iter->second);
00726     }
00727 #endif
00728     return DDS::TopicDescription::_nil();
00729 
00730   } else {
00731     return DDS::TopicDescription::_duplicate(entry->pair_.obj_.in());
00732   }
00733 }
00734 
00735 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00736 
00737 DDS::ContentFilteredTopic_ptr
00738 DomainParticipantImpl::create_contentfilteredtopic(
00739   const char* name,
00740   DDS::Topic_ptr related_topic,
00741   const char* filter_expression,
00742   const DDS::StringSeq& expression_parameters)
00743 {
00744   if (CORBA::is_nil(related_topic)) {
00745     if (DCPS_debug_level > 3) {
00746       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00747         ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
00748         ACE_TEXT("can't create a content-filtered topic due to null related ")
00749         ACE_TEXT("topic.\n")));
00750     }
00751     return 0;
00752   }
00753 
00754   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_, 0);
00755 
00756   if (topics_.count(name)) {
00757     if (DCPS_debug_level > 3) {
00758       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00759         ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
00760         ACE_TEXT("can't create a content-filtered topic due to name \"%C\" ")
00761         ACE_TEXT("already in use by a Topic.\n"), name));
00762     }
00763     return 0;
00764   }
00765 
00766   if (topic_descrs_.count(name)) {
00767     if (DCPS_debug_level > 3) {
00768       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00769         ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
00770         ACE_TEXT("can't create a content-filtered topic due to name \"%C\" ")
00771         ACE_TEXT("already in use by a TopicDescription.\n"), name));
00772     }
00773     return 0;
00774   }
00775 
00776   DDS::ContentFilteredTopic_var cft;
00777   try {
00778     // Create the cft in two steps so that we only have one place to
00779     // check the expression parameters
00780     cft = new ContentFilteredTopicImpl(name, related_topic, filter_expression, this);
00781     if (cft->set_expression_parameters(expression_parameters) != DDS::RETCODE_OK) {
00782       return 0;
00783     }
00784   } catch (const std::exception& e) {
00785     if (DCPS_debug_level) {
00786       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00787         ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
00788         ACE_TEXT("can't create a content-filtered topic due to runtime error: ")
00789         ACE_TEXT("%C.\n"), e.what()));
00790     }
00791     return 0;
00792   }
00793   DDS::TopicDescription_var td = DDS::TopicDescription::_duplicate(cft);
00794   topic_descrs_[name] = td;
00795   return cft._retn();
00796 }
00797 
00798 DDS::ReturnCode_t DomainParticipantImpl::delete_contentfilteredtopic(
00799   DDS::ContentFilteredTopic_ptr a_contentfilteredtopic)
00800 {
00801   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_,
00802                    DDS::RETCODE_OUT_OF_RESOURCES);
00803   DDS::ContentFilteredTopic_var cft =
00804     DDS::ContentFilteredTopic::_duplicate(a_contentfilteredtopic);
00805   CORBA::String_var name = cft->get_name();
00806   TopicDescriptionMap::iterator iter = topic_descrs_.find(name.in());
00807   if (iter == topic_descrs_.end()) {
00808     if (DCPS_debug_level > 3) {
00809       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00810         ACE_TEXT("DomainParticipantImpl::delete_contentfilteredtopic, ")
00811         ACE_TEXT("can't delete a content-filtered topic \"%C\" ")
00812         ACE_TEXT("because it is not in the set.\n"), name.in ()));
00813     }
00814     return DDS::RETCODE_PRECONDITION_NOT_MET;
00815   }
00816 
00817   TopicDescriptionImpl* tdi = dynamic_cast<TopicDescriptionImpl*>(iter->second.in());
00818 
00819   if (!tdi) {
00820     if (DCPS_debug_level > 3) {
00821       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00822         ACE_TEXT("DomainParticipantImpl::delete_contentfilteredtopic, ")
00823         ACE_TEXT("can't delete a content-filtered topic \"%C\" ")
00824         ACE_TEXT("failed to obtain TopicDescriptionImpl\n"), name.in()));
00825     }
00826     return DDS::RETCODE_ERROR;
00827   }
00828 
00829   if (tdi->has_entity_refs()) {
00830     if (DCPS_debug_level > 3) {
00831       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00832         ACE_TEXT("DomainParticipantImpl::delete_contentfilteredtopic, ")
00833         ACE_TEXT("can't delete a content-filtered topic \"%C\" ")
00834         ACE_TEXT("because it is used by a datareader\n"), name.in ()));
00835     }
00836     return DDS::RETCODE_PRECONDITION_NOT_MET;
00837   }
00838   topic_descrs_.erase(iter);
00839   return DDS::RETCODE_OK;
00840 }
00841 
00842 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
00843 
00844 #ifndef OPENDDS_NO_MULTI_TOPIC
00845 
00846 DDS::MultiTopic_ptr DomainParticipantImpl::create_multitopic(
00847   const char* name, const char* type_name,
00848   const char* subscription_expression,
00849   const DDS::StringSeq& expression_parameters)
00850 {
00851   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_, 0);
00852 
00853   if (topics_.count(name)) {
00854     if (DCPS_debug_level > 3) {
00855       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00856         ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
00857         ACE_TEXT("can't create a multi topic due to name \"%C\" ")
00858         ACE_TEXT("already in use by a Topic.\n"), name));
00859     }
00860     return 0;
00861   }
00862 
00863   if (topic_descrs_.count(name)) {
00864     if (DCPS_debug_level > 3) {
00865       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00866         ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
00867         ACE_TEXT("can't create a multi topic due to name \"%C\" ")
00868         ACE_TEXT("already in use by a TopicDescription.\n"), name));
00869     }
00870     return 0;
00871   }
00872 
00873   DDS::MultiTopic_var mt;
00874   try {
00875     mt = new MultiTopicImpl(name, type_name, subscription_expression,
00876       expression_parameters, this);
00877   } catch (const std::exception& e) {
00878     if (DCPS_debug_level) {
00879       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00880         ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
00881         ACE_TEXT("can't create a multi topic due to runtime error: ")
00882         ACE_TEXT("%C.\n"), e.what()));
00883     }
00884     return 0;
00885   }
00886   DDS::TopicDescription_var td = DDS::TopicDescription::_duplicate(mt);
00887   topic_descrs_[name] = td;
00888   return mt._retn();
00889 }
00890 
00891 DDS::ReturnCode_t DomainParticipantImpl::delete_multitopic(
00892   DDS::MultiTopic_ptr a_multitopic)
00893 {
00894   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_,
00895                    DDS::RETCODE_OUT_OF_RESOURCES);
00896   DDS::MultiTopic_var mt = DDS::MultiTopic::_duplicate(a_multitopic);
00897   CORBA::String_var mt_name = mt->get_name();
00898   TopicDescriptionMap::iterator iter = topic_descrs_.find(mt_name.in());
00899   if (iter == topic_descrs_.end()) {
00900     if (DCPS_debug_level > 3) {
00901       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00902         ACE_TEXT("DomainParticipantImpl::delete_multitopic, ")
00903         ACE_TEXT("can't delete a multitopic \"%C\" ")
00904         ACE_TEXT("because it is not in the set.\n"), mt_name.in ()));
00905     }
00906     return DDS::RETCODE_PRECONDITION_NOT_MET;
00907   }
00908 
00909   TopicDescriptionImpl* tdi = dynamic_cast<TopicDescriptionImpl*>(iter->second.in());
00910 
00911   if (!tdi) {
00912     if (DCPS_debug_level > 3) {
00913       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00914         ACE_TEXT("DomainParticipantImpl::delete_multitopic, ")
00915         ACE_TEXT("can't delete a multitopic topic \"%C\" ")
00916         ACE_TEXT("failed to obtain TopicDescriptionImpl.\n"),
00917         mt_name.in()));
00918     }
00919     return DDS::RETCODE_ERROR;
00920   }
00921 
00922   if (tdi->has_entity_refs()) {
00923     if (DCPS_debug_level > 3) {
00924       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00925         ACE_TEXT("DomainParticipantImpl::delete_multitopic, ")
00926         ACE_TEXT("can't delete a multitopic topic \"%C\" ")
00927         ACE_TEXT("because it is used by a datareader.\n"), mt_name.in ()));
00928     }
00929     return DDS::RETCODE_PRECONDITION_NOT_MET;
00930   }
00931   topic_descrs_.erase(iter);
00932   return DDS::RETCODE_OK;
00933 }
00934 
00935 #endif // OPENDDS_NO_MULTI_TOPIC
00936 
00937 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00938 
00939 RcHandle<FilterEvaluator>
00940 DomainParticipantImpl::get_filter_eval(const char* filter)
00941 {
00942   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, filter_cache_lock_,
00943                    RcHandle<FilterEvaluator>());
00944 
00945   RcHandle<FilterEvaluator>& result = filter_cache_[filter];
00946   if (!result) {
00947     try {
00948       result = make_rch<FilterEvaluator>(filter, false);
00949     } catch (const std::exception& e) {
00950       filter_cache_.erase(filter);
00951       if (DCPS_debug_level) {
00952         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00953                    ACE_TEXT("DomainParticipantImpl::get_filter_eval, ")
00954                    ACE_TEXT("can't create a writer-side content filter due to ")
00955                    ACE_TEXT("runtime error: %C.\n"), e.what()));
00956       }
00957     }
00958   }
00959   return result;
00960 }
00961 
00962 void
00963 DomainParticipantImpl::deref_filter_eval(const char* filter)
00964 {
00965   ACE_GUARD(ACE_Thread_Mutex, guard, filter_cache_lock_);
00966   typedef std::map<OPENDDS_STRING, RcHandle<FilterEvaluator> > Map;
00967   Map::iterator iter = filter_cache_.find(filter);
00968   if (iter != filter_cache_.end()) {
00969     if (iter->second->ref_count() == 1) {
00970       filter_cache_.erase(iter);
00971     }
00972   }
00973 }
00974 
00975 #endif
00976 
00977 DDS::ReturnCode_t
00978 DomainParticipantImpl::delete_contained_entities()
00979 {
00980   // mark that the entity is being deleted
00981   set_deleted(true);
00982 
00983   // BIT subscriber and data readers will be deleted with the
00984   // rest of the entities, so need to report to discovery that
00985   // BIT is no longer available
00986   Discovery_rch disc = TheServiceParticipant->get_discovery(this->domain_id_);
00987   if (disc)
00988     disc->fini_bit(this);
00989 
00990   if (ACE_OS::thr_equal(TheServiceParticipant->reactor_owner(),
00991                         ACE_Thread::self())) {
00992     handle_exception(0);
00993 
00994   } else {
00995     TheServiceParticipant->reactor()->notify(this);
00996 
00997     shutdown_mutex_.acquire();
00998     while (!shutdown_complete_) {
00999       shutdown_condition_.wait();
01000     }
01001     shutdown_complete_ = false;
01002     shutdown_mutex_.release();
01003   }
01004 
01005   bit_subscriber_ = DDS::Subscriber::_nil();
01006 
01007   OpenDDS::DCPS::Registered_Data_Types->unregister_participant(this);
01008 
01009   // the participant can now start creating new contained entities
01010   set_deleted(false);
01011   return shutdown_result_;
01012 }
01013 
01014 CORBA::Boolean
01015 DomainParticipantImpl::contains_entity(DDS::InstanceHandle_t a_handle)
01016 {
01017   /// Check top-level containers for Topic, Subscriber,
01018   /// and Publisher instances.
01019   {
01020     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01021                      guard,
01022                      this->topics_protector_,
01023                      false);
01024 
01025     for (TopicMap::iterator it(topics_.begin());
01026          it != topics_.end(); ++it) {
01027       if (a_handle == it->second.pair_.svt_->get_instance_handle())
01028         return true;
01029     }
01030   }
01031 
01032   {
01033     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01034                      guard,
01035                      this->subscribers_protector_,
01036                      false);
01037 
01038     for (SubscriberSet::iterator it(subscribers_.begin());
01039          it != subscribers_.end(); ++it) {
01040       if (a_handle == it->svt_->get_instance_handle())
01041         return true;
01042     }
01043   }
01044 
01045   {
01046     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01047                      guard,
01048                      this->publishers_protector_,
01049                      false);
01050 
01051     for (PublisherSet::iterator it(publishers_.begin());
01052          it != publishers_.end(); ++it) {
01053       if (a_handle == it->svt_->get_instance_handle())
01054         return true;
01055     }
01056   }
01057 
01058   /// Recurse into SubscriberImpl and PublisherImpl for
01059   /// DataReader and DataWriter instances respectively.
01060   for (SubscriberSet::iterator it(subscribers_.begin());
01061        it != subscribers_.end(); ++it) {
01062     if (it->svt_->contains_reader(a_handle))
01063       return true;
01064   }
01065 
01066   for (PublisherSet::iterator it(publishers_.begin());
01067        it != publishers_.end(); ++it) {
01068     if (it->svt_->contains_writer(a_handle))
01069       return true;
01070   }
01071 
01072   return false;
01073 }
01074 
01075 DDS::ReturnCode_t
01076 DomainParticipantImpl::set_qos(
01077   const DDS::DomainParticipantQos & qos)
01078 {
01079   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
01080     if (qos_ == qos)
01081       return DDS::RETCODE_OK;
01082 
01083     // for the not changeable qos, it can be changed before enable
01084     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
01085       return DDS::RETCODE_IMMUTABLE_POLICY;
01086 
01087     } else {
01088       qos_ = qos;
01089 
01090       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01091       const bool status =
01092         disco->update_domain_participant_qos(domain_id_,
01093                                              dp_id_,
01094                                              qos_);
01095 
01096       if (!status) {
01097         ACE_ERROR_RETURN((LM_ERROR,
01098                           ACE_TEXT("(%P|%t) DomainParticipantImpl::set_qos, ")
01099                           ACE_TEXT("failed on compatibility check. \n")),
01100                          DDS::RETCODE_ERROR);
01101       }
01102     }
01103 
01104     return DDS::RETCODE_OK;
01105 
01106   } else {
01107     return DDS::RETCODE_INCONSISTENT_POLICY;
01108   }
01109 }
01110 
01111 DDS::ReturnCode_t
01112 DomainParticipantImpl::get_qos(
01113   DDS::DomainParticipantQos & qos)
01114 {
01115   qos = qos_;
01116   return DDS::RETCODE_OK;
01117 }
01118 
01119 DDS::ReturnCode_t
01120 DomainParticipantImpl::set_listener(
01121   DDS::DomainParticipantListener_ptr a_listener,
01122   DDS::StatusMask mask)
01123 {
01124   listener_mask_ = mask;
01125   //note: OK to duplicate  a nil object ref
01126   listener_ = DDS::DomainParticipantListener::_duplicate(a_listener);
01127   return DDS::RETCODE_OK;
01128 }
01129 
01130 DDS::DomainParticipantListener_ptr
01131 DomainParticipantImpl::get_listener()
01132 {
01133   return DDS::DomainParticipantListener::_duplicate(listener_.in());
01134 }
01135 
01136 DDS::ReturnCode_t
01137 DomainParticipantImpl::ignore_participant(
01138   DDS::InstanceHandle_t handle)
01139 {
01140 #if !defined (DDS_HAS_MINIMUM_BIT)
01141 
01142   if (enabled_ == false) {
01143     ACE_ERROR_RETURN((LM_ERROR,
01144                       ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_participant, ")
01145                       ACE_TEXT("Entity is not enabled. \n")),
01146                      DDS::RETCODE_NOT_ENABLED);
01147   }
01148 
01149   RepoId ignoreId = get_repoid(handle);
01150   HandleMap::const_iterator location = this->ignored_participants_.find(ignoreId);
01151 
01152   if (location == this->ignored_participants_.end()) {
01153     this->ignored_participants_[ ignoreId] = handle;
01154   }
01155   else {// ignore same participant again, just return ok.
01156     return DDS::RETCODE_OK;
01157   }
01158 
01159   if (DCPS_debug_level >= 4) {
01160     GuidConverter converter(dp_id_);
01161     ACE_DEBUG((LM_DEBUG,
01162                ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_participant: ")
01163                ACE_TEXT("%C ignoring handle %x.\n"),
01164                OPENDDS_STRING(converter).c_str(),
01165                handle));
01166   }
01167 
01168   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01169   if (!disco->ignore_domain_participant(domain_id_,
01170                                         dp_id_,
01171                                         ignoreId)) {
01172     ACE_ERROR_RETURN((LM_ERROR,
01173                       ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_participant, ")
01174                       ACE_TEXT("Could not ignore domain participant.\n")),
01175                      DDS::RETCODE_NOT_ENABLED);
01176     return DDS::RETCODE_ERROR;
01177   }
01178 
01179 
01180   if (DCPS_debug_level >= 4) {
01181     GuidConverter converter(dp_id_);
01182     ACE_DEBUG((LM_DEBUG,
01183                ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_participant: ")
01184                ACE_TEXT("%C repo call returned.\n"),
01185                OPENDDS_STRING(converter).c_str()));
01186   }
01187 
01188   return DDS::RETCODE_OK;
01189 #else
01190   ACE_UNUSED_ARG(handle);
01191   return DDS::RETCODE_UNSUPPORTED;
01192 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01193 }
01194 
01195 DDS::ReturnCode_t
01196 DomainParticipantImpl::ignore_topic(
01197   DDS::InstanceHandle_t handle)
01198 {
01199 #if !defined (DDS_HAS_MINIMUM_BIT)
01200 
01201   if (enabled_ == false) {
01202     ACE_ERROR_RETURN((LM_ERROR,
01203                       ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_topic, ")
01204                       ACE_TEXT(" Entity is not enabled. \n")),
01205                      DDS::RETCODE_NOT_ENABLED);
01206   }
01207 
01208   RepoId ignoreId = get_repoid(handle);
01209   HandleMap::const_iterator location = this->ignored_topics_.find(ignoreId);
01210 
01211   if (location == this->ignored_topics_.end()) {
01212     this->ignored_topics_[ ignoreId] = handle;
01213   }
01214   else { // ignore same topic again, just return ok.
01215     return DDS::RETCODE_OK;
01216   }
01217 
01218   if (DCPS_debug_level >= 4) {
01219     GuidConverter converter(dp_id_);
01220     ACE_DEBUG((LM_DEBUG,
01221                ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_topic: ")
01222                ACE_TEXT("%C ignoring handle %x.\n"),
01223                OPENDDS_STRING(converter).c_str(),
01224                handle));
01225   }
01226 
01227   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01228   if (!disco->ignore_topic(domain_id_,
01229                            dp_id_,
01230                            ignoreId)) {
01231     ACE_ERROR((LM_ERROR,
01232                ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_topic, ")
01233                ACE_TEXT(" Could not ignore topic.\n")));
01234   }
01235 
01236   return DDS::RETCODE_OK;
01237 #else
01238   ACE_UNUSED_ARG(handle);
01239   return DDS::RETCODE_UNSUPPORTED;
01240 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01241 }
01242 
01243 DDS::ReturnCode_t
01244 DomainParticipantImpl::ignore_publication(
01245   DDS::InstanceHandle_t handle)
01246 {
01247 #if !defined (DDS_HAS_MINIMUM_BIT)
01248 
01249   if (enabled_ == false) {
01250     ACE_ERROR_RETURN((LM_ERROR,
01251                       ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_publication, ")
01252                       ACE_TEXT(" Entity is not enabled. \n")),
01253                      DDS::RETCODE_NOT_ENABLED);
01254   }
01255 
01256   if (DCPS_debug_level >= 4) {
01257     GuidConverter converter(dp_id_);
01258     ACE_DEBUG((LM_DEBUG,
01259                ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_publication: ")
01260                ACE_TEXT("%C ignoring handle %x.\n"),
01261                OPENDDS_STRING(converter).c_str(),
01262                handle));
01263   }
01264 
01265   RepoId ignoreId = get_repoid(handle);
01266   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01267   if (!disco->ignore_publication(domain_id_,
01268                                  dp_id_,
01269                                  ignoreId)) {
01270     ACE_ERROR_RETURN((LM_ERROR,
01271                       ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_publication, ")
01272                       ACE_TEXT(" could not ignore publication in discovery. \n")),
01273                      DDS::RETCODE_ERROR);
01274   }
01275 
01276   return DDS::RETCODE_OK;
01277 #else
01278   ACE_UNUSED_ARG(handle);
01279   return DDS::RETCODE_UNSUPPORTED;
01280 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01281 }
01282 
01283 DDS::ReturnCode_t
01284 DomainParticipantImpl::ignore_subscription(
01285   DDS::InstanceHandle_t handle)
01286 {
01287 #if !defined (DDS_HAS_MINIMUM_BIT)
01288 
01289   if (enabled_ == false) {
01290     ACE_ERROR_RETURN((LM_ERROR,
01291                       ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_subscription, ")
01292                       ACE_TEXT(" Entity is not enabled. \n")),
01293                      DDS::RETCODE_NOT_ENABLED);
01294   }
01295 
01296   if (DCPS_debug_level >= 4) {
01297     GuidConverter converter(dp_id_);
01298     ACE_DEBUG((LM_DEBUG,
01299                ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_subscription: ")
01300                ACE_TEXT("%C ignoring handle %d.\n"),
01301                OPENDDS_STRING(converter).c_str(),
01302                handle));
01303   }
01304 
01305 
01306   RepoId ignoreId = get_repoid(handle);
01307   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01308   if (!disco->ignore_subscription(domain_id_,
01309                                   dp_id_,
01310                                   ignoreId)) {
01311     ACE_ERROR_RETURN((LM_ERROR,
01312                       ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_subscription, ")
01313                       ACE_TEXT(" could not ignore subscription in discovery. \n")),
01314                      DDS::RETCODE_ERROR);
01315   }
01316 
01317   return DDS::RETCODE_OK;
01318 #else
01319   ACE_UNUSED_ARG(handle);
01320   return DDS::RETCODE_UNSUPPORTED;
01321 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01322 }
01323 
01324 DDS::DomainId_t
01325 DomainParticipantImpl::get_domain_id()
01326 {
01327   return domain_id_;
01328 }
01329 
01330 DDS::ReturnCode_t
01331 DomainParticipantImpl::assert_liveliness()
01332 {
01333   // This operation needs to only be used if the DomainParticipant contains
01334   // DataWriter entities with the LIVELINESS set to MANUAL_BY_PARTICIPANT and
01335   // it only affects the liveliness of those DataWriter entities. Otherwise,
01336   // it has no effect.
01337   // This will do nothing in current implementation since we only
01338   // support the AUTOMATIC liveliness qos for datawriter.
01339   // Add implementation here.
01340 
01341   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01342                    tao_mon,
01343                    this->publishers_protector_,
01344                    DDS::RETCODE_ERROR);
01345 
01346   for (PublisherSet::iterator it(publishers_.begin());
01347        it != publishers_.end(); ++it) {
01348     it->svt_->assert_liveliness_by_participant();
01349   }
01350 
01351   last_liveliness_activity_ = ACE_OS::gettimeofday();
01352 
01353   return DDS::RETCODE_OK;
01354 }
01355 
01356 DDS::ReturnCode_t
01357 DomainParticipantImpl::set_default_publisher_qos(
01358   const DDS::PublisherQos & qos)
01359 {
01360   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
01361     default_publisher_qos_ = qos;
01362     return DDS::RETCODE_OK;
01363 
01364   } else {
01365     return DDS::RETCODE_INCONSISTENT_POLICY;
01366   }
01367 }
01368 
01369 DDS::ReturnCode_t
01370 DomainParticipantImpl::get_default_publisher_qos(
01371   DDS::PublisherQos & qos)
01372 {
01373   qos = default_publisher_qos_;
01374   return DDS::RETCODE_OK;
01375 }
01376 
01377 DDS::ReturnCode_t
01378 DomainParticipantImpl::set_default_subscriber_qos(
01379   const DDS::SubscriberQos & qos)
01380 {
01381   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
01382     default_subscriber_qos_ = qos;
01383     return DDS::RETCODE_OK;
01384 
01385   } else {
01386     return DDS::RETCODE_INCONSISTENT_POLICY;
01387   }
01388 }
01389 
01390 DDS::ReturnCode_t
01391 DomainParticipantImpl::get_default_subscriber_qos(
01392   DDS::SubscriberQos & qos)
01393 {
01394   qos = default_subscriber_qos_;
01395   return DDS::RETCODE_OK;
01396 }
01397 
01398 DDS::ReturnCode_t
01399 DomainParticipantImpl::set_default_topic_qos(
01400   const DDS::TopicQos & qos)
01401 {
01402   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
01403     default_topic_qos_ = qos;
01404     return DDS::RETCODE_OK;
01405 
01406   } else {
01407     return DDS::RETCODE_INCONSISTENT_POLICY;
01408   }
01409 }
01410 
01411 DDS::ReturnCode_t
01412 DomainParticipantImpl::get_default_topic_qos(
01413   DDS::TopicQos & qos)
01414 {
01415   qos = default_topic_qos_;
01416   return DDS::RETCODE_OK;
01417 }
01418 
01419 DDS::ReturnCode_t
01420 DomainParticipantImpl::get_current_time(
01421   DDS::Time_t & current_time)
01422 {
01423   current_time
01424   = OpenDDS::DCPS::time_value_to_time(
01425       ACE_OS::gettimeofday());
01426   return DDS::RETCODE_OK;
01427 }
01428 
01429 #if !defined (DDS_HAS_MINIMUM_BIT)
01430 
01431 DDS::ReturnCode_t
01432 DomainParticipantImpl::get_discovered_participants(
01433   DDS::InstanceHandleSeq & participant_handles)
01434 {
01435   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01436                    guard,
01437                    this->handle_protector_,
01438                    DDS::RETCODE_ERROR);
01439 
01440   HandleMap::const_iterator itEnd = this->handles_.end();
01441 
01442   for (HandleMap::const_iterator iter = this->handles_.begin();
01443        iter != itEnd; ++iter) {
01444     GuidConverter converter(iter->first);
01445 
01446     if (converter.entityKind() == KIND_PARTICIPANT)
01447     {
01448       // skip itself and the ignored participant
01449       if (iter->first == this->dp_id_
01450       || (this->ignored_participants_.find(iter->first)
01451         != this->ignored_participants_.end ())) {
01452         continue;
01453       }
01454 
01455       push_back(participant_handles, iter->second);
01456     }
01457   }
01458 
01459   return DDS::RETCODE_OK;
01460 }
01461 
01462 DDS::ReturnCode_t
01463 DomainParticipantImpl::get_discovered_participant_data(
01464   DDS::ParticipantBuiltinTopicData & participant_data,
01465   DDS::InstanceHandle_t participant_handle)
01466 {
01467   {
01468     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01469                      guard,
01470                      this->handle_protector_,
01471                      DDS::RETCODE_ERROR);
01472 
01473     bool found = false;
01474     HandleMap::const_iterator itEnd = this->handles_.end();
01475 
01476     for (HandleMap::const_iterator iter = this->handles_.begin();
01477          iter != itEnd; ++iter) {
01478       GuidConverter converter(iter->first);
01479 
01480       if (participant_handle == iter->second
01481           && converter.entityKind() == KIND_PARTICIPANT) {
01482         found = true;
01483         break;
01484       }
01485     }
01486 
01487     if (!found)
01488       return DDS::RETCODE_PRECONDITION_NOT_MET;
01489   }
01490 
01491   DDS::SampleInfoSeq info;
01492   DDS::ParticipantBuiltinTopicDataSeq data;
01493   DDS::DataReader_var dr =
01494     this->bit_subscriber_->lookup_datareader(BUILT_IN_PARTICIPANT_TOPIC);
01495   DDS::ParticipantBuiltinTopicDataDataReader_var bit_part_dr =
01496     DDS::ParticipantBuiltinTopicDataDataReader::_narrow(dr);
01497   DDS::ReturnCode_t ret = bit_part_dr->read_instance(data,
01498                                                      info,
01499                                                      1,
01500                                                      participant_handle,
01501                                                      DDS::ANY_SAMPLE_STATE,
01502                                                      DDS::ANY_VIEW_STATE,
01503                                                      DDS::ANY_INSTANCE_STATE);
01504 
01505   if (ret == DDS::RETCODE_OK) {
01506     if (info[0].valid_data)
01507       participant_data = data[0];
01508 
01509     else
01510       return DDS::RETCODE_NO_DATA;
01511   }
01512 
01513   return ret;
01514 }
01515 
01516 DDS::ReturnCode_t
01517 DomainParticipantImpl::get_discovered_topics(
01518   DDS::InstanceHandleSeq & topic_handles)
01519 {
01520   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01521                    guard,
01522                    this->handle_protector_,
01523                    DDS::RETCODE_ERROR);
01524 
01525   HandleMap::const_iterator itEnd = this->handles_.end();
01526 
01527   for (HandleMap::const_iterator iter = this->handles_.begin();
01528        iter != itEnd; ++iter) {
01529     GuidConverter converter(iter->first);
01530 
01531     if (converter.isTopic()) {
01532 
01533       // skip the ignored topic
01534       if (this->ignored_topics_.find(iter->first)
01535           != this->ignored_topics_.end ()) {
01536         continue;
01537       }
01538 
01539       push_back(topic_handles, iter->second);
01540     }
01541   }
01542 
01543   return DDS::RETCODE_OK;
01544 }
01545 
01546 DDS::ReturnCode_t
01547 DomainParticipantImpl::get_discovered_topic_data(
01548   DDS::TopicBuiltinTopicData & topic_data,
01549   DDS::InstanceHandle_t topic_handle)
01550 {
01551   {
01552     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01553                      guard,
01554                      this->handle_protector_,
01555                      DDS::RETCODE_ERROR);
01556 
01557     bool found = false;
01558     HandleMap::const_iterator itEnd = this->handles_.end();
01559 
01560     for (HandleMap::const_iterator iter = this->handles_.begin();
01561          iter != itEnd; ++iter) {
01562       GuidConverter converter(iter->first);
01563 
01564       if (topic_handle == iter->second && converter.isTopic()) {
01565         found = true;
01566         break;
01567       }
01568     }
01569 
01570     if (!found)
01571       return DDS::RETCODE_PRECONDITION_NOT_MET;
01572   }
01573 
01574   DDS::DataReader_var dr =
01575     bit_subscriber_->lookup_datareader(BUILT_IN_TOPIC_TOPIC);
01576   DDS::TopicBuiltinTopicDataDataReader_var bit_topic_dr =
01577     DDS::TopicBuiltinTopicDataDataReader::_narrow(dr);
01578 
01579   DDS::SampleInfoSeq info;
01580   DDS::TopicBuiltinTopicDataSeq data;
01581   DDS::ReturnCode_t ret =
01582     bit_topic_dr->read_instance(data,
01583                                 info,
01584                                 1,
01585                                 topic_handle,
01586                                 DDS::ANY_SAMPLE_STATE,
01587                                 DDS::ANY_VIEW_STATE,
01588                                 DDS::ANY_INSTANCE_STATE);
01589 
01590   if (ret == DDS::RETCODE_OK) {
01591     if (info[0].valid_data)
01592       topic_data = data[0];
01593 
01594     else
01595       return DDS::RETCODE_NO_DATA;
01596   }
01597 
01598   return ret;
01599 }
01600 
01601 #endif
01602 
01603 DDS::ReturnCode_t
01604 DomainParticipantImpl::enable()
01605 {
01606   //According spec:
01607   // - Calling enable on an already enabled Entity returns OK and has no
01608   // effect.
01609   // - Calling enable on an Entity whose factory is not enabled will fail
01610   // and return PRECONDITION_NOT_MET.
01611 
01612   if (this->is_enabled()) {
01613     return DDS::RETCODE_OK;
01614   }
01615 
01616   if (monitor_) {
01617     monitor_->report();
01618   }
01619 
01620   if (TheServiceParticipant->monitor_) {
01621     TheServiceParticipant->monitor_->report();
01622   }
01623 
01624 #if defined(OPENDDS_SECURITY)
01625   if (!security_config_ && TheServiceParticipant->get_security()) {
01626     security_config_ = TheSecurityRegistry->default_config();
01627     if (!security_config_) {
01628       security_config_ = TheSecurityRegistry->fix_empty_default();
01629     }
01630   }
01631 #endif
01632 
01633   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01634 
01635   if (disco.is_nil()) {
01636     ACE_ERROR((LM_ERROR,
01637                ACE_TEXT("(%P|%t) ERROR: ")
01638                ACE_TEXT("DomainParticipantImpl::enable, ")
01639                ACE_TEXT("no repository found for domain id: %d.\n"), domain_id_));
01640     return DDS::RETCODE_ERROR;
01641   }
01642 
01643 #if defined(OPENDDS_SECURITY)
01644   if (TheServiceParticipant->get_security() && !security_config_) {
01645     ACE_ERROR((LM_ERROR,
01646                ACE_TEXT("(%P|%t) ERROR: ")
01647                ACE_TEXT("DomainParticipantImpl::enable, ")
01648                ACE_TEXT("DCPSSecurity flag is set, but unable to load security plugin configuration.\n")));
01649     return DDS::RETCODE_ERROR;
01650   }
01651 #endif
01652 
01653   AddDomainStatus value = {GUID_UNKNOWN, false};
01654 
01655 #if defined(OPENDDS_SECURITY)
01656   if (TheServiceParticipant->get_security()) {
01657     Security::Authentication_var auth = security_config_->get_authentication();
01658 
01659     DDS::Security::SecurityException se;
01660     DDS::Security::ValidationResult_t val_res =
01661       auth->validate_local_identity(id_handle_, dp_id_, domain_id_, qos_, disco->generate_participant_guid(), se);
01662 
01663     /* TODO - Handle VALIDATION_PENDING_RETRY */
01664     if (val_res != DDS::Security::VALIDATION_OK) {
01665       ACE_ERROR((LM_ERROR,
01666         ACE_TEXT("(%P|%t) ERROR: ")
01667         ACE_TEXT("DomainParticipantImpl::enable, ")
01668         ACE_TEXT("Unable to validate local identity. SecurityException[%d.%d]: %C\n"),
01669           se.code, se.minor_code, se.message.in()));
01670       return DDS::Security::RETCODE_NOT_ALLOWED_BY_SECURITY;
01671     }
01672 
01673     Security::AccessControl_var access = security_config_->get_access_control();
01674 
01675     perm_handle_ = access->validate_local_permissions(auth, id_handle_, domain_id_, qos_, se);
01676 
01677     if (perm_handle_ == DDS::HANDLE_NIL) {
01678       ACE_ERROR((LM_ERROR,
01679         ACE_TEXT("(%P|%t) ERROR: ")
01680         ACE_TEXT("DomainParticipantImpl::enable, ")
01681         ACE_TEXT("Unable to validate local permissions. SecurityException[%d.%d]: %C\n"),
01682           se.code, se.minor_code, se.message.in()));
01683       return DDS::Security::RETCODE_NOT_ALLOWED_BY_SECURITY;
01684     }
01685 
01686     bool check_create = access->check_create_participant(perm_handle_, domain_id_, qos_, se);
01687     if (!check_create) {
01688       ACE_ERROR((LM_ERROR,
01689         ACE_TEXT("(%P|%t) ERROR: ")
01690         ACE_TEXT("DomainParticipantImpl::enable, ")
01691         ACE_TEXT("Unable to create participant. SecurityException[%d.%d]: %C\n"),
01692           se.code, se.minor_code, se.message.in()));
01693       return DDS::Security::RETCODE_NOT_ALLOWED_BY_SECURITY;
01694     }
01695 
01696     DDS::Security::ParticipantSecurityAttributes part_sec_attr;
01697     bool check_part_sec_attr = access->get_participant_sec_attributes(perm_handle_, part_sec_attr, se);
01698 
01699     if (!check_part_sec_attr) {
01700       ACE_ERROR((LM_ERROR,
01701         ACE_TEXT("(%P|%t) ERROR: ")
01702         ACE_TEXT("DomainParticipantImpl::enable, ")
01703         ACE_TEXT("Unable to get participant security attributes. SecurityException[%d.%d]: %C\n"),
01704           se.code, se.minor_code, se.message.in()));
01705       return DDS::RETCODE_ERROR;
01706     }
01707 
01708     Security::CryptoKeyFactory_var crypto = security_config_->get_crypto_key_factory();
01709 
01710     part_crypto_handle_ = crypto->register_local_participant(id_handle_, perm_handle_,
01711       Util::filter_properties(qos_.property.value, "dds.sec.crypto."), part_sec_attr, se);
01712     if (part_crypto_handle_ == DDS::HANDLE_NIL) {
01713       ACE_ERROR((LM_ERROR,
01714         ACE_TEXT("(%P|%t) ERROR: ")
01715         ACE_TEXT("DomainParticipantImpl::enable, ")
01716         ACE_TEXT("Unable to register local participant. SecurityException[%d.%d]: %C\n"),
01717           se.code, se.minor_code, se.message.in()));
01718       return DDS::RETCODE_ERROR;
01719     }
01720 
01721     value = disco->add_domain_participant_secure(domain_id_, qos_, dp_id_, id_handle_, perm_handle_, part_crypto_handle_);
01722 
01723     if (value.id == GUID_UNKNOWN) {
01724       ACE_ERROR((LM_ERROR,
01725                  ACE_TEXT("(%P|%t) ERROR: ")
01726                  ACE_TEXT("DomainParticipantImpl::enable, ")
01727                  ACE_TEXT("add_domain_participant_secure returned invalid id.\n")));
01728       return DDS::RETCODE_ERROR;
01729     }
01730 
01731   } else {
01732 #endif
01733 
01734     value = disco->add_domain_participant(domain_id_, qos_);
01735 
01736     if (value.id == GUID_UNKNOWN) {
01737       ACE_ERROR((LM_ERROR,
01738                  ACE_TEXT("(%P|%t) ERROR: ")
01739                  ACE_TEXT("DomainParticipantImpl::enable, ")
01740                  ACE_TEXT("add_domain_participant returned invalid id.\n")));
01741       return DDS::RETCODE_ERROR;
01742     }
01743 
01744 #if defined(OPENDDS_SECURITY)
01745   }
01746 #endif
01747 
01748   dp_id_ = value.id;
01749   federated_ = value.federated;
01750 
01751   const DDS::ReturnCode_t ret = this->set_enabled();
01752 
01753   if (DCPS_debug_level > 1) {
01754     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DomainParticipantImpl::enable: ")
01755                ACE_TEXT("enabled participant %C in domain %d\n"),
01756                OPENDDS_STRING(GuidConverter(dp_id_)).c_str(), domain_id_));
01757   }
01758 
01759   if (ret == DDS::RETCODE_OK && !TheTransientKludge->is_enabled()) {
01760     Discovery_rch disc = TheServiceParticipant->get_discovery(this->domain_id_);
01761     this->bit_subscriber_ = disc->init_bit(this);
01762   }
01763 
01764   if (ret != DDS::RETCODE_OK) {
01765     return ret;
01766   }
01767 
01768   if (qos_.entity_factory.autoenable_created_entities) {
01769 
01770     for (TopicMap::iterator it = topics_.begin(); it != topics_.end(); ++it) {
01771       it->second.pair_.svt_->enable();
01772     }
01773 
01774     for (PublisherSet::iterator it = publishers_.begin(); it != publishers_.end(); ++it) {
01775       it->svt_->enable();
01776     }
01777 
01778     for (SubscriberSet::iterator it = subscribers_.begin(); it != subscribers_.end(); ++it) {
01779       it->svt_->enable();
01780     }
01781   }
01782 
01783   return DDS::RETCODE_OK;
01784 }
01785 
01786 RepoId
01787 DomainParticipantImpl::get_id()
01788 {
01789   return dp_id_;
01790 }
01791 
01792 OPENDDS_STRING
01793 DomainParticipantImpl::get_unique_id()
01794 {
01795   return GuidConverter(dp_id_).uniqueId();
01796 }
01797 
01798 
01799 DDS::InstanceHandle_t
01800 DomainParticipantImpl::get_instance_handle()
01801 {
01802   return this->id_to_handle(this->dp_id_);
01803 }
01804 
01805 DDS::InstanceHandle_t
01806 DomainParticipantImpl::id_to_handle(const RepoId& id)
01807 {
01808   if (id == GUID_UNKNOWN) {
01809     return this->participant_handles_.next();
01810   }
01811 
01812   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01813                    guard,
01814                    this->handle_protector_,
01815                    HANDLE_UNKNOWN);
01816 
01817   HandleMap::const_iterator location = this->handles_.find(id);
01818   DDS::InstanceHandle_t result;
01819 
01820   if (location == this->handles_.end()) {
01821     // Map new handle in both directions
01822     result = this->participant_handles_.next();
01823     this->handles_[id] = result;
01824     this->repoIds_[result] = id;
01825   } else {
01826     result = location->second;
01827   }
01828 
01829   return result;
01830 }
01831 
01832 RepoId
01833 DomainParticipantImpl::get_repoid(const DDS::InstanceHandle_t& handle)
01834 {
01835   RepoId result = GUID_UNKNOWN;
01836   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01837                    guard,
01838                    this->handle_protector_,
01839                    GUID_UNKNOWN);
01840   RepoIdMap::const_iterator location = this->repoIds_.find(handle);
01841   if (location != this->repoIds_.end()) {
01842     result = location->second;
01843   }
01844   return result;
01845 }
01846 
01847 #if defined(OPENDDS_SECURITY)
01848 namespace {
01849 
01850   bool
01851   is_bit(const char* topic_name) {
01852     return strcmp(topic_name, BUILT_IN_PARTICIPANT_TOPIC) == 0
01853       || strcmp(topic_name, BUILT_IN_TOPIC_TOPIC) == 0
01854       || strcmp(topic_name, BUILT_IN_PUBLICATION_TOPIC) == 0
01855       || strcmp(topic_name, BUILT_IN_SUBSCRIPTION_TOPIC) == 0;
01856   }
01857 
01858 }
01859 #endif
01860 
01861 DDS::Topic_ptr
01862 DomainParticipantImpl::create_new_topic(
01863   const RepoId topic_id,
01864   const char * topic_name,
01865   const char * type_name,
01866   const DDS::TopicQos & qos,
01867   DDS::TopicListener_ptr a_listener,
01868   const DDS::StatusMask & mask,
01869   OpenDDS::DCPS::TypeSupport_ptr type_support)
01870 {
01871   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01872                    tao_mon,
01873                    this->topics_protector_,
01874                    DDS::Topic::_nil());
01875 
01876 #if defined(OPENDDS_SECURITY)
01877   if (TheServiceParticipant->get_security() && !is_bit(topic_name)) {
01878     Security::AccessControl_var access = security_config_->get_access_control();
01879 
01880     DDS::Security::SecurityException se;
01881 
01882     DDS::Security::TopicSecurityAttributes sec_attr;
01883     if (!access->get_topic_sec_attributes(perm_handle_, topic_name, sec_attr, se)) {
01884       ACE_ERROR((LM_WARNING,
01885         ACE_TEXT("(%P|%t) WARNING: ")
01886         ACE_TEXT("DomainParticipantImpl::create_new_topic, ")
01887         ACE_TEXT("Unable to get security attributes for topic '%C'. SecurityException[%d.%d]: %C\n"),
01888           topic_name, se.code, se.minor_code, se.message.in()));
01889       return DDS::Topic::_nil();
01890     }
01891 
01892     if ((sec_attr.is_write_protected || sec_attr.is_read_protected) &&
01893         !access->check_create_topic(perm_handle_, domain_id_, topic_name, qos, se)) {
01894       ACE_ERROR((LM_WARNING,
01895         ACE_TEXT("(%P|%t) WARNING: ")
01896         ACE_TEXT("DomainParticipantImpl::create_new_topic, ")
01897         ACE_TEXT("Permissions check failed to create new topic '%C'. SecurityException[%d.%d]: %C\n"),
01898           topic_name, se.code, se.minor_code, se.message.in()));
01899       return DDS::Topic::_nil();
01900     }
01901   }
01902 #endif
01903 
01904   TopicImpl* topic_servant = 0;
01905 
01906   ACE_NEW_RETURN(topic_servant,
01907                  TopicImpl(topic_id,
01908                            topic_name,
01909                            type_name,
01910                            type_support,
01911                            qos,
01912                            a_listener,
01913                            mask,
01914                            this),
01915                  DDS::Topic::_nil());
01916 
01917   if ((enabled_ == true)
01918       && (qos_.entity_factory.autoenable_created_entities)) {
01919     topic_servant->enable();
01920   }
01921 
01922   DDS::Topic_ptr obj(topic_servant);
01923 
01924   // this object will also act as a guard against leaking the new TopicImpl
01925   RefCounted_Topic refCounted_topic(Topic_Pair(topic_servant, obj, NO_DUP));
01926 
01927   if (OpenDDS::DCPS::bind(topics_, topic_name, refCounted_topic) == -1) {
01928     ACE_ERROR((LM_ERROR,
01929                ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_new_topic, ")
01930                ACE_TEXT("%p \n"),
01931                ACE_TEXT("bind")));
01932     return DDS::Topic::_nil();
01933   }
01934 
01935   if (this->monitor_) {
01936     this->monitor_->report();
01937   }
01938 
01939   // the topics_ map has one reference and we duplicate to give
01940   // the caller another reference.
01941   return DDS::Topic::_duplicate(refCounted_topic.pair_.obj_.in());
01942 }
01943 
01944 bool
01945 DomainParticipantImpl::is_clean() const
01946 {
01947   bool sub_is_clean = subscribers_.empty();
01948   bool topics_is_clean = topics_.size() == 0;
01949 
01950   if (!TheTransientKludge->is_enabled()) {
01951     // There are four topics and builtin topic subscribers
01952     // left.
01953 
01954     sub_is_clean = !sub_is_clean ? subscribers_.size() == 1 : true;
01955     topics_is_clean = !topics_is_clean ? topics_.size() == 4 : true;
01956   }
01957   return (publishers_.empty()
01958           && sub_is_clean
01959           && topics_is_clean);
01960 }
01961 
01962 DDS::DomainParticipantListener_ptr
01963 DomainParticipantImpl::listener_for(DDS::StatusKind kind)
01964 {
01965   if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
01966     return DDS::DomainParticipantListener::_nil ();
01967   } else {
01968     return DDS::DomainParticipantListener::_duplicate(listener_.in());
01969   }
01970 }
01971 
01972 void
01973 DomainParticipantImpl::get_topic_ids(TopicIdVec& topics)
01974 {
01975   ACE_GUARD(ACE_Recursive_Thread_Mutex,
01976             guard,
01977             this->topics_protector_);
01978 
01979   topics.reserve(topics_.size());
01980   for (TopicMap::iterator it(topics_.begin());
01981        it != topics_.end(); ++it) {
01982     topics.push_back(it->second.pair_.svt_->get_id());
01983   }
01984 }
01985 
01986 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01987 
01988 OwnershipManager*
01989 DomainParticipantImpl::ownership_manager()
01990 {
01991 #if !defined (DDS_HAS_MINIMUM_BIT)
01992 
01993   DDS::DataReader_var dr =
01994     bit_subscriber_->lookup_datareader(BUILT_IN_PUBLICATION_TOPIC);
01995   DDS::PublicationBuiltinTopicDataDataReader_var bit_pub_dr =
01996     DDS::PublicationBuiltinTopicDataDataReader::_narrow(dr);
01997 
01998   if (!CORBA::is_nil(bit_pub_dr.in())) {
01999     DDS::DataReaderListener_var listener = bit_pub_dr->get_listener();
02000     if (CORBA::is_nil(listener.in())) {
02001       DDS::DataReaderListener_var bit_pub_listener =
02002         new BitPubListenerImpl(this);
02003       bit_pub_dr->set_listener(bit_pub_listener, DDS::DATA_AVAILABLE_STATUS);
02004       // Must call on_data_available when attaching a listener late - samples may be waiting
02005       bit_pub_listener->on_data_available(bit_pub_dr.in());
02006     }
02007   }
02008 
02009 #endif
02010   return &this->owner_man_;
02011 }
02012 
02013 void
02014 DomainParticipantImpl::update_ownership_strength (const PublicationId& pub_id,
02015                                                   const CORBA::Long& ownership_strength)
02016 {
02017   ACE_GUARD(ACE_Recursive_Thread_Mutex,
02018             tao_mon,
02019             this->subscribers_protector_);
02020 
02021   if (this->get_deleted ())
02022     return;
02023 
02024   for (SubscriberSet::iterator it(this->subscribers_.begin());
02025       it != this->subscribers_.end(); ++it) {
02026     it->svt_->update_ownership_strength(pub_id, ownership_strength);
02027   }
02028 }
02029 
02030 #endif // OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
02031 
02032 DomainParticipantImpl::RepoIdSequence::RepoIdSequence(const RepoId& base) :
02033   base_(base),
02034   serial_(0),
02035   builder_(base_)
02036 {
02037 }
02038 
02039 RepoId
02040 DomainParticipantImpl::RepoIdSequence::next()
02041 {
02042   builder_.entityKey(++serial_);
02043   return builder_;
02044 }
02045 
02046 
02047 ////////////////////////////////////////////////////////////////
02048 
02049 
02050 bool
02051 DomainParticipantImpl::validate_publisher_qos(DDS::PublisherQos & pub_qos)
02052 {
02053   if (pub_qos == PUBLISHER_QOS_DEFAULT) {
02054     this->get_default_publisher_qos(pub_qos);
02055   }
02056 
02057   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(pub_qos, false);
02058 
02059   if (!Qos_Helper::valid(pub_qos) || !Qos_Helper::consistent(pub_qos)) {
02060     ACE_ERROR((LM_ERROR,
02061                ACE_TEXT("(%P|%t) ERROR: ")
02062                ACE_TEXT("DomainParticipantImpl::validate_publisher_qos, ")
02063                ACE_TEXT("invalid qos.\n")));
02064     return false;
02065   }
02066 
02067   return true;
02068 }
02069 
02070 bool
02071 DomainParticipantImpl::validate_subscriber_qos(DDS::SubscriberQos & subscriber_qos)
02072 {
02073   if (subscriber_qos == SUBSCRIBER_QOS_DEFAULT) {
02074     this->get_default_subscriber_qos(subscriber_qos);
02075   }
02076 
02077   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(subscriber_qos, false);
02078 
02079   if (!Qos_Helper::valid(subscriber_qos) || !Qos_Helper::consistent(subscriber_qos)) {
02080     ACE_ERROR((LM_ERROR,
02081                ACE_TEXT("(%P|%t) ERROR: ")
02082                ACE_TEXT("DomainParticipantImpl::validate_subscriber_qos, ")
02083                ACE_TEXT("invalid qos.\n")));
02084     return false;
02085   }
02086 
02087 
02088   return true;
02089 }
02090 
02091 Recorder_ptr
02092 DomainParticipantImpl::create_recorder(DDS::Topic_ptr a_topic,
02093                                        const DDS::SubscriberQos& subscriber_qos,
02094                                        const DDS::DataReaderQos& datareader_qos,
02095                                        const RecorderListener_rch& a_listener,
02096                                        DDS::StatusMask mask)
02097 {
02098   if (CORBA::is_nil(a_topic)) {
02099     ACE_ERROR((LM_ERROR,
02100                ACE_TEXT("(%P|%t) ERROR: ")
02101                ACE_TEXT("SubscriberImpl::create_datareader, ")
02102                ACE_TEXT("topic desc is nil.\n")));
02103     return 0;
02104   }
02105 
02106   DDS::SubscriberQos sub_qos = subscriber_qos;
02107   DDS::DataReaderQos dr_qos;
02108 
02109   if (! this->validate_subscriber_qos(sub_qos) ||
02110       ! SubscriberImpl::validate_datareader_qos(datareader_qos,
02111                                                 TheServiceParticipant->initial_DataReaderQos(),
02112                                                 a_topic,
02113                                                 dr_qos, false) ) {
02114     return 0;
02115   }
02116 
02117   RecorderImpl* recorder(new RecorderImpl);
02118   Recorder_var result(recorder);
02119 
02120   recorder->init(dynamic_cast<TopicDescriptionImpl*>(a_topic),
02121     dr_qos, a_listener,
02122     mask, this, subscriber_qos);
02123 
02124   if ((enabled_ == true) && (qos_.entity_factory.autoenable_created_entities)) {
02125     recorder->enable();
02126   }
02127 
02128   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(recorders_protector_);
02129   recorders_.insert(result);
02130 
02131   return result._retn();
02132 }
02133 
02134 Replayer_ptr
02135 DomainParticipantImpl::create_replayer(DDS::Topic_ptr a_topic,
02136                                        const DDS::PublisherQos& publisher_qos,
02137                                        const DDS::DataWriterQos& datawriter_qos,
02138                                        const ReplayerListener_rch& a_listener,
02139                                        DDS::StatusMask mask)
02140 {
02141   if (CORBA::is_nil(a_topic)) {
02142     ACE_ERROR((LM_ERROR,
02143                ACE_TEXT("(%P|%t) ERROR: ")
02144                ACE_TEXT("SubscriberImpl::create_datareader, ")
02145                ACE_TEXT("topic desc is nil.\n")));
02146     return 0;
02147   }
02148 
02149   DDS::PublisherQos pub_qos = publisher_qos;
02150   DDS::DataWriterQos dw_qos;
02151 
02152   if (! this->validate_publisher_qos(pub_qos) ||
02153       ! PublisherImpl::validate_datawriter_qos(datawriter_qos,
02154                                                TheServiceParticipant->initial_DataWriterQos(),
02155                                                a_topic,
02156                                                dw_qos)) {
02157     return 0;
02158   }
02159 
02160   TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic);
02161 
02162   ReplayerImpl* replayer(new ReplayerImpl);
02163   Replayer_var result(replayer);
02164 
02165   replayer->init(a_topic, topic_servant, dw_qos, a_listener, mask, this, pub_qos);
02166 
02167   if ((this->enabled_ == true) && (qos_.entity_factory.autoenable_created_entities)) {
02168     const DDS::ReturnCode_t ret = replayer->enable();
02169 
02170     if (ret != DDS::RETCODE_OK) {
02171       ACE_ERROR((LM_ERROR,
02172                  ACE_TEXT("(%P|%t) ERROR: ")
02173                  ACE_TEXT("DomainParticipantImpl::create_replayer, ")
02174                  ACE_TEXT("enable failed.\n")));
02175       return 0;
02176     }
02177   }
02178 
02179   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(replayers_protector_);
02180   replayers_.insert(result);
02181   return result._retn();
02182 }
02183 
02184 void
02185 DomainParticipantImpl::delete_recorder(Recorder_ptr recorder)
02186 {
02187   const Recorder_var recvar(Recorder::_duplicate(recorder));
02188   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(recorders_protector_);
02189   recorders_.erase(recvar);
02190 }
02191 
02192 void
02193 DomainParticipantImpl::delete_replayer(Replayer_ptr replayer)
02194 {
02195   const Replayer_var repvar(Replayer::_duplicate(replayer));
02196   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(replayers_protector_);
02197   replayers_.erase(repvar);
02198 }
02199 
02200 void
02201 DomainParticipantImpl::add_adjust_liveliness_timers(DataWriterImpl* writer)
02202 {
02203   automatic_liveliness_timer_.add_adjust(writer);
02204   participant_liveliness_timer_.add_adjust(writer);
02205 }
02206 
02207 void
02208 DomainParticipantImpl::remove_adjust_liveliness_timers()
02209 {
02210   automatic_liveliness_timer_.remove_adjust();
02211   participant_liveliness_timer_.remove_adjust();
02212 }
02213 
02214 DomainParticipantImpl::LivelinessTimer::LivelinessTimer(DomainParticipantImpl& impl,
02215                                                         DDS::LivelinessQosPolicyKind kind)
02216   : impl_(impl)
02217   , kind_ (kind)
02218   , interval_ (ACE_Time_Value::max_time)
02219   , recalculate_interval_ (false)
02220   , scheduled_ (false)
02221 { }
02222 
02223 DomainParticipantImpl::LivelinessTimer::~LivelinessTimer()
02224 {
02225   if (scheduled_) {
02226     TheServiceParticipant->timer()->cancel_timer(this);
02227   }
02228 }
02229 
02230 void
02231 DomainParticipantImpl::LivelinessTimer::add_adjust(OpenDDS::DCPS::DataWriterImpl* writer)
02232 {
02233   ACE_GUARD(ACE_Thread_Mutex,
02234             guard,
02235             this->lock_);
02236 
02237   const ACE_Time_Value now = ACE_OS::gettimeofday();
02238 
02239   // Calculate the time remaining to liveliness check.
02240   const ACE_Time_Value remaining = interval_ - (now - last_liveliness_check_);
02241 
02242   // Adopt a smaller interval.
02243   const ACE_Time_Value i = writer->liveliness_check_interval(kind_);
02244   if (i < interval_) {
02245     interval_ = i;
02246   }
02247 
02248   // Reschedule or schedule a timer if necessary.
02249   if (scheduled_ && interval_ < remaining) {
02250     TheServiceParticipant->timer()->cancel_timer(this);
02251     TheServiceParticipant->timer()->schedule_timer(this, 0, interval_);
02252   } else if (!scheduled_) {
02253     TheServiceParticipant->timer()->schedule_timer(this, 0, interval_);
02254     scheduled_ = true;
02255     last_liveliness_check_ = now;
02256   }
02257 }
02258 
02259 void
02260 DomainParticipantImpl::LivelinessTimer::remove_adjust()
02261 {
02262   ACE_GUARD(ACE_Thread_Mutex,
02263             guard,
02264             this->lock_);
02265 
02266   recalculate_interval_ = true;
02267 }
02268 
02269 int
02270 DomainParticipantImpl::LivelinessTimer::handle_timeout(const ACE_Time_Value & tv, const void* /* arg */)
02271 {
02272   ACE_GUARD_RETURN(ACE_Thread_Mutex,
02273                    guard,
02274                    this->lock_,
02275                    0);
02276 
02277   scheduled_ = false;
02278 
02279   if (recalculate_interval_) {
02280     interval_ = impl_.liveliness_check_interval(kind_);
02281     recalculate_interval_ = false;
02282   }
02283 
02284   if (interval_ != ACE_Time_Value::max_time) {
02285     dispatch(tv);
02286     last_liveliness_check_ = tv;
02287     TheServiceParticipant->timer()->schedule_timer(this, 0, interval_);
02288     scheduled_ = true;
02289   }
02290 
02291   return 0;
02292 }
02293 
02294 DomainParticipantImpl::AutomaticLivelinessTimer::AutomaticLivelinessTimer(DomainParticipantImpl& impl)
02295   : LivelinessTimer (impl, DDS::AUTOMATIC_LIVELINESS_QOS)
02296 { }
02297 
02298 void
02299 DomainParticipantImpl::AutomaticLivelinessTimer::dispatch(const ACE_Time_Value& /* tv */)
02300 {
02301   impl_.signal_liveliness (kind_);
02302 }
02303 
02304 DomainParticipantImpl::ParticipantLivelinessTimer::ParticipantLivelinessTimer(DomainParticipantImpl& impl)
02305   : LivelinessTimer (impl, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS)
02306 { }
02307 
02308 void
02309 DomainParticipantImpl::ParticipantLivelinessTimer::dispatch(const ACE_Time_Value& tv)
02310 {
02311   if (impl_.participant_liveliness_activity_after (tv - interval())) {
02312     impl_.signal_liveliness (kind_);
02313   }
02314 }
02315 
02316 ACE_Time_Value
02317 DomainParticipantImpl::liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
02318 {
02319   ACE_Time_Value tv = ACE_Time_Value::max_time;
02320 
02321   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
02322                     tao_mon,
02323                     this->publishers_protector_,
02324                     tv);
02325 
02326   for (PublisherSet::iterator it(publishers_.begin());
02327        it != publishers_.end(); ++it) {
02328     tv = std::min (tv, it->svt_->liveliness_check_interval(kind));
02329   }
02330 
02331   return tv;
02332 }
02333 
02334 bool
02335 DomainParticipantImpl::participant_liveliness_activity_after(const ACE_Time_Value& tv)
02336 {
02337   if (last_liveliness_activity_ > tv) {
02338     return true;
02339   }
02340 
02341   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
02342                     tao_mon,
02343                     this->publishers_protector_,
02344                     tv);
02345 
02346   for (PublisherSet::iterator it(publishers_.begin());
02347        it != publishers_.end(); ++it) {
02348     if (it->svt_->participant_liveliness_activity_after(tv)) {
02349       return true;
02350     }
02351   }
02352 
02353   return false;
02354 }
02355 
02356 void
02357 DomainParticipantImpl::signal_liveliness (DDS::LivelinessQosPolicyKind kind)
02358 {
02359   TheServiceParticipant->get_discovery(domain_id_)->signal_liveliness (domain_id_, get_id(), kind);
02360 }
02361 
02362 #if defined(OPENDDS_SECURITY)
02363 void
02364 DomainParticipantImpl::set_security_config(const Security::SecurityConfig_rch& cfg)
02365 {
02366   security_config_ = cfg;
02367 }
02368 #endif
02369 
02370 int
02371 DomainParticipantImpl::handle_exception(ACE_HANDLE /*fd*/)
02372 {
02373   DDS::ReturnCode_t ret = DDS::RETCODE_OK;
02374 
02375   // delete publishers
02376   {
02377     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02378                      tao_mon,
02379                      this->publishers_protector_,
02380                      DDS::RETCODE_ERROR);
02381 
02382     PublisherSet::iterator pubIter = publishers_.begin();
02383     DDS::Publisher_ptr pubPtr;
02384     size_t pubsize = publishers_.size();
02385 
02386     while (pubsize > 0) {
02387       pubPtr = (*pubIter).obj_.in();
02388       ++pubIter;
02389 
02390       DDS::ReturnCode_t result
02391       = pubPtr->delete_contained_entities();
02392 
02393       if (result != DDS::RETCODE_OK) {
02394         ret = result;
02395       }
02396 
02397       result = delete_publisher(pubPtr);
02398 
02399       if (result != DDS::RETCODE_OK) {
02400         ret = result;
02401       }
02402 
02403       --pubsize;
02404     }
02405 
02406   }
02407 
02408   // delete subscribers
02409   {
02410     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02411                      tao_mon,
02412                      this->subscribers_protector_,
02413                      DDS::RETCODE_ERROR);
02414 
02415     SubscriberSet::iterator subIter = subscribers_.begin();
02416     DDS::Subscriber_ptr subPtr;
02417     size_t subsize = subscribers_.size();
02418 
02419     while (subsize > 0) {
02420       subPtr = (*subIter).obj_.in();
02421       ++subIter;
02422 
02423       DDS::ReturnCode_t result = subPtr->delete_contained_entities();
02424 
02425       if (result != DDS::RETCODE_OK) {
02426         ret = result;
02427       }
02428 
02429       result = delete_subscriber(subPtr);
02430 
02431       if (result != DDS::RETCODE_OK) {
02432         ret = result;
02433       }
02434 
02435       --subsize;
02436     }
02437   }
02438 
02439   // delete topics
02440   {
02441     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02442                      tao_mon,
02443                      this->topics_protector_,
02444                      DDS::RETCODE_ERROR);
02445 
02446     TopicMap::iterator topicIter = topics_.begin();
02447     DDS::Topic_ptr topicPtr;
02448     size_t topicsize = topics_.size();
02449 
02450     while (topicsize > 0) {
02451       topicPtr = topicIter->second.pair_.obj_.in();
02452       ++topicIter;
02453 
02454       // Delete the topic the reference count.
02455       const DDS::ReturnCode_t result = this->delete_topic_i(topicPtr, true);
02456 
02457       if (result != DDS::RETCODE_OK) {
02458         ret = result;
02459       }
02460       --topicsize;
02461     }
02462   }
02463 
02464   {
02465     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02466                      tao_mon,
02467                      this->recorders_protector_,
02468                      DDS::RETCODE_ERROR);
02469 
02470     RecorderSet::iterator it = recorders_.begin();
02471     for (; it != recorders_.end(); ++it ){
02472       RecorderImpl* impl = dynamic_cast<RecorderImpl* >(it->in());
02473       DDS::ReturnCode_t result = DDS::RETCODE_ERROR;
02474       if (impl) result = impl->cleanup();
02475       if (result != DDS::RETCODE_OK) ret = result;
02476     }
02477     recorders_.clear();
02478   }
02479 
02480   {
02481     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02482                      tao_mon,
02483                      this->replayers_protector_,
02484                      DDS::RETCODE_ERROR);
02485 
02486     ReplayerSet::iterator it = replayers_.begin();
02487     for (; it != replayers_.end(); ++it ){
02488       ReplayerImpl* impl = static_cast<ReplayerImpl* >(it->in());
02489       DDS::ReturnCode_t result = DDS::RETCODE_ERROR;
02490       if (impl) result = impl->cleanup();
02491       if (result != DDS::RETCODE_OK) ret = result;
02492 
02493     }
02494 
02495     replayers_.clear();
02496   }
02497 
02498   shutdown_mutex_.acquire();
02499   shutdown_result_ = ret;
02500   shutdown_complete_ = true;
02501   shutdown_condition_.signal();
02502   shutdown_mutex_.release();
02503 
02504   return 0;
02505 }
02506 
02507 } // namespace DCPS
02508 } // namespace OpenDDS
02509 
02510 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1