DomainParticipantImpl.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "DomainParticipantImpl.h"
00010 #include "FeatureDisabledQosCheck.h"
00011 #include "Service_Participant.h"
00012 #include "Qos_Helper.h"
00013 #include "GuidConverter.h"
00014 #include "PublisherImpl.h"
00015 #include "SubscriberImpl.h"
00016 #include "DataWriterImpl.h"
00017 #include "Marked_Default_Qos.h"
00018 #include "Registered_Data_Types.h"
00019 #include "Transient_Kludge.h"
00020 #include "DomainParticipantFactoryImpl.h"
00021 #include "Util.h"
00022 #include "MonitorFactory.h"
00023 #include "dds/DdsDcpsGuidC.h"
00024 #include "BitPubListenerImpl.h"
00025 #include "ContentFilteredTopicImpl.h"
00026 #include "MultiTopicImpl.h"
00027 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00028 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00029 
00030 #include "RecorderImpl.h"
00031 #include "ReplayerImpl.h"
00032 
00033 #if !defined (DDS_HAS_MINIMUM_BIT)
00034 #include "BuiltInTopicUtils.h"
00035 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00036 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00037 
00038 #include "tao/debug.h"
00039 #include "ace/Reactor.h"
00040 
00041 namespace Util {
00042 
00043 template <typename Key>
00044 int find(
00045   OpenDDS::DCPS::DomainParticipantImpl::TopicMap& c,
00046   const Key& key,
00047   OpenDDS::DCPS::DomainParticipantImpl::TopicMap::mapped_type*& value)
00048 {
00049   OpenDDS::DCPS::DomainParticipantImpl::TopicMap::iterator iter =
00050     c.find(key);
00051 
00052   if (iter == c.end()) {
00053     return -1;
00054   }
00055 
00056   value = &iter->second;
00057   return 0;
00058 }
00059 
00060 } // namespace Util
00061 
00062 namespace OpenDDS {
00063 namespace DCPS {
00064 
00065 //TBD - add check for enabled in most methods.
00066 //      Currently this is not needed because auto_enable_created_entities
00067 //      cannot be false.
00068 
00069 // Implementation skeleton constructor
00070 DomainParticipantImpl::DomainParticipantImpl(DomainParticipantFactoryImpl *       factory,
00071                                              const DDS::DomainId_t&             domain_id,
00072                                              const RepoId&                        dp_id,
00073                                              const DDS::DomainParticipantQos &  qos,
00074                                              DDS::DomainParticipantListener_ptr a_listener,
00075                                              const DDS::StatusMask &            mask,
00076                                              bool                                 federated)
00077   : factory_(factory),
00078     default_topic_qos_(TheServiceParticipant->initial_TopicQos()),
00079     default_publisher_qos_(TheServiceParticipant->initial_PublisherQos()),
00080     default_subscriber_qos_(TheServiceParticipant->initial_SubscriberQos()),
00081     qos_(qos),
00082     domain_id_(domain_id),
00083     dp_id_(dp_id),
00084     federated_(federated),
00085     shutdown_condition_(shutdown_mutex_),
00086     shutdown_complete_(false),
00087     monitor_(0),
00088     pub_id_gen_(dp_id_),
00089     automatic_liveliness_timer_ (*this),
00090     participant_liveliness_timer_ (*this)
00091 {
00092   (void) this->set_listener(a_listener, mask);
00093   monitor_ = TheServiceParticipant->monitor_factory_->create_dp_monitor(this);
00094 }
00095 
// Destructor.  The body is intentionally empty: no explicit cleanup is
// performed here.
DomainParticipantImpl::~DomainParticipantImpl()
{
}
00100 
00101 DDS::Publisher_ptr
00102 DomainParticipantImpl::create_publisher(
00103   const DDS::PublisherQos & qos,
00104   DDS::PublisherListener_ptr a_listener,
00105   DDS::StatusMask mask)
00106 {
00107   ACE_UNUSED_ARG(mask);
00108 
00109   DDS::PublisherQos pub_qos = qos;
00110 
00111   if (! this->validate_publisher_qos(pub_qos))
00112     return DDS::Publisher::_nil();
00113 
00114   PublisherImpl* pub = 0;
00115   ACE_NEW_RETURN(pub,
00116                  PublisherImpl(participant_handles_.next(),
00117                                pub_id_gen_.next(),
00118                                pub_qos,
00119                                a_listener,
00120                                mask,
00121                                this),
00122                  DDS::Publisher::_nil());
00123 
00124   if ((enabled_ == true) && (qos_.entity_factory.autoenable_created_entities == 1)) {
00125     pub->enable();
00126   }
00127 
00128   DDS::Publisher_ptr pub_obj(pub);
00129 
00130   // this object will also act as the guard for leaking Publisher Impl
00131   Publisher_Pair pair(pub, pub_obj, NO_DUP);
00132 
00133   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00134                    tao_mon,
00135                    this->publishers_protector_,
00136                    DDS::Publisher::_nil());
00137 
00138   if (OpenDDS::DCPS::insert(publishers_, pair) == -1) {
00139     ACE_ERROR((LM_ERROR,
00140                ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_publisher, ")
00141                ACE_TEXT("%p\n"),
00142                ACE_TEXT("insert")));
00143     return DDS::Publisher::_nil();
00144   }
00145 
00146   return DDS::Publisher::_duplicate(pub_obj);
00147 }
00148 
00149 DDS::ReturnCode_t
00150 DomainParticipantImpl::delete_publisher(
00151   DDS::Publisher_ptr p)
00152 {
00153   // The servant's ref count should be 2 at this point,
00154   // one referenced by poa, one referenced by the subscriber
00155   // set.
00156   PublisherImpl* the_servant = dynamic_cast<PublisherImpl*>(p);
00157 
00158   if (the_servant->is_clean() == 0) {
00159     ACE_ERROR((LM_ERROR,
00160                ACE_TEXT("(%P|%t) ERROR: ")
00161                ACE_TEXT("DomainParticipantImpl::delete_publisher, ")
00162                ACE_TEXT("The publisher is not empty.\n")));
00163     return DDS::RETCODE_PRECONDITION_NOT_MET;
00164   }
00165 
00166   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00167                    tao_mon,
00168                    this->publishers_protector_,
00169                    DDS::RETCODE_ERROR);
00170 
00171   Publisher_Pair pair(the_servant, p, DUP);
00172 
00173   if (OpenDDS::DCPS::remove(publishers_, pair) == -1) {
00174     ACE_ERROR((LM_ERROR,
00175                ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_publisher, ")
00176                ACE_TEXT("%p\n"),
00177                ACE_TEXT("remove")));
00178     return DDS::RETCODE_ERROR;
00179 
00180   } else {
00181     return DDS::RETCODE_OK;
00182   }
00183 }
00184 
00185 DDS::Subscriber_ptr
00186 DomainParticipantImpl::create_subscriber(
00187   const DDS::SubscriberQos & qos,
00188   DDS::SubscriberListener_ptr a_listener,
00189   DDS::StatusMask mask)
00190 {
00191   DDS::SubscriberQos sub_qos = qos;
00192 
00193   if (! this->validate_subscriber_qos(sub_qos)) {
00194     return DDS::Subscriber::_nil();
00195   }
00196 
00197   SubscriberImpl* sub = 0 ;
00198   ACE_NEW_RETURN(sub,
00199                  SubscriberImpl(participant_handles_.next(),
00200                                 sub_qos,
00201                                 a_listener,
00202                                 mask,
00203                                 this),
00204                  DDS::Subscriber::_nil());
00205 
00206   if ((enabled_ == true) && (qos_.entity_factory.autoenable_created_entities == 1)) {
00207     sub->enable();
00208   }
00209 
00210   DDS::Subscriber_ptr sub_obj(sub);
00211 
00212   Subscriber_Pair pair(sub, sub_obj, NO_DUP);
00213 
00214   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00215                    tao_mon,
00216                    this->subscribers_protector_,
00217                    DDS::Subscriber::_nil());
00218 
00219   if (OpenDDS::DCPS::insert(subscribers_, pair) == -1) {
00220     ACE_ERROR((LM_ERROR,
00221                ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_subscriber, ")
00222                ACE_TEXT("%p\n"),
00223                ACE_TEXT("insert")));
00224     return DDS::Subscriber::_nil();
00225   }
00226 
00227   return DDS::Subscriber::_duplicate(sub_obj);
00228 }
00229 
00230 DDS::ReturnCode_t
00231 DomainParticipantImpl::delete_subscriber(
00232   DDS::Subscriber_ptr s)
00233 {
00234   // The servant's ref count should be 2 at this point,
00235   // one referenced by poa, one referenced by the subscriber
00236   // set.
00237   SubscriberImpl* the_servant = dynamic_cast<SubscriberImpl*>(s);
00238 
00239   if (the_servant->is_clean() == 0) {
00240     ACE_ERROR((LM_ERROR,
00241                ACE_TEXT("(%P|%t) ERROR: ")
00242                ACE_TEXT("DomainParticipantImpl::delete_subscriber, ")
00243                ACE_TEXT("The subscriber is not empty.\n")));
00244     return DDS::RETCODE_PRECONDITION_NOT_MET;
00245   }
00246 
00247   DDS::ReturnCode_t ret
00248   = the_servant->delete_contained_entities();
00249 
00250   if (ret != DDS::RETCODE_OK) {
00251     ACE_ERROR((LM_ERROR,
00252                ACE_TEXT("(%P|%t) ERROR: ")
00253                ACE_TEXT("DomainParticipantImpl::delete_subscriber, ")
00254                ACE_TEXT("Failed to delete contained entities.\n")));
00255     return DDS::RETCODE_ERROR;
00256   }
00257 
00258   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00259                    tao_mon,
00260                    this->subscribers_protector_,
00261                    DDS::RETCODE_ERROR);
00262 
00263   Subscriber_Pair pair(the_servant, s, DUP);
00264 
00265   if (OpenDDS::DCPS::remove(subscribers_, pair) == -1) {
00266     ACE_ERROR((LM_ERROR,
00267                ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_subscriber, ")
00268                ACE_TEXT("%p\n"),
00269                ACE_TEXT("remove")));
00270     return DDS::RETCODE_ERROR;
00271 
00272   } else {
00273     return DDS::RETCODE_OK;
00274   }
00275 }
00276 
00277 DDS::Subscriber_ptr
00278 DomainParticipantImpl::get_builtin_subscriber()
00279 {
00280   return DDS::Subscriber::_duplicate(bit_subscriber_.in());
00281 }
00282 
00283 DDS::Topic_ptr
00284 DomainParticipantImpl::create_topic(
00285   const char * topic_name,
00286   const char * type_name,
00287   const DDS::TopicQos & qos,
00288   DDS::TopicListener_ptr a_listener,
00289   DDS::StatusMask mask)
00290 {
00291   return create_topic_i(topic_name,
00292                         type_name,
00293                         qos,
00294                         a_listener,
00295                         mask,
00296                         0);
00297 }
00298 
00299 DDS::Topic_ptr
00300 DomainParticipantImpl::create_typeless_topic(
00301   const char * topic_name,
00302   const char * type_name,
00303   bool type_has_keys,
00304   const DDS::TopicQos & qos,
00305   DDS::TopicListener_ptr a_listener,
00306   DDS::StatusMask mask)
00307 {
00308   int topic_mask = (type_has_keys ? TOPIC_TYPE_HAS_KEYS : 0 ) | TOPIC_TYPELESS;
00309 
00310   return create_topic_i(topic_name,
00311                         type_name,
00312                         qos,
00313                         a_listener,
00314                         mask,
00315                         topic_mask);
00316 }
00317 
00318 
00319 DDS::Topic_ptr
00320 DomainParticipantImpl::create_topic_i(
00321   const char * topic_name,
00322   const char * type_name,
00323   const DDS::TopicQos & qos,
00324   DDS::TopicListener_ptr a_listener,
00325   DDS::StatusMask mask,
00326   int topic_mask)
00327 {
00328   DDS::TopicQos topic_qos;
00329 
00330   if (qos == TOPIC_QOS_DEFAULT) {
00331     this->get_default_topic_qos(topic_qos);
00332 
00333   } else {
00334     topic_qos = qos;
00335   }
00336 
00337   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
00338   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
00339   OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
00340   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::Topic::_nil());
00341 
00342   if (!Qos_Helper::valid(topic_qos)) {
00343     ACE_ERROR((LM_ERROR,
00344                ACE_TEXT("(%P|%t) ERROR: ")
00345                ACE_TEXT("DomainParticipantImpl::create_topic, ")
00346                ACE_TEXT("invalid qos.\n")));
00347     return DDS::Topic::_nil();
00348   }
00349 
00350   if (!Qos_Helper::consistent(topic_qos)) {
00351     ACE_ERROR((LM_ERROR,
00352                ACE_TEXT("(%P|%t) ERROR: ")
00353                ACE_TEXT("DomainParticipantImpl::create_topic, ")
00354                ACE_TEXT("inconsistent qos.\n")));
00355     return DDS::Topic::_nil();
00356   }
00357 
00358   TopicMap::mapped_type* entry = 0;
00359   bool found = false;
00360   {
00361     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00362                      tao_mon,
00363                      this->topics_protector_,
00364                      DDS::Topic::_nil());
00365 
00366 #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)
00367     if (topic_descrs_.count(topic_name)) {
00368       if (DCPS_debug_level > 3) {
00369         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00370           ACE_TEXT("DomainParticipantImpl::create_topic, ")
00371           ACE_TEXT("can't create a Topic due to name \"%C\" already in use ")
00372           ACE_TEXT("by a TopicDescription.\n"), topic_name));
00373       }
00374       return 0;
00375     }
00376 #endif
00377 
00378     if (Util::find(topics_, topic_name, entry) == 0) {
00379       found = true;
00380     }
00381   }
00382 
00383   if (found) {
00384     CORBA::String_var found_type
00385     = entry->pair_.svt_->get_type_name();
00386 
00387     if (ACE_OS::strcmp(type_name, found_type) == 0) {
00388       DDS::TopicQos found_qos;
00389       entry->pair_.svt_->get_qos(found_qos);
00390 
00391       if (topic_qos == found_qos) { // match type name, qos
00392         {
00393           ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00394                            tao_mon,
00395                            this->topics_protector_,
00396                            DDS::Topic::_nil());
00397           entry->client_refs_ ++;
00398         }
00399         return DDS::Topic::_duplicate(entry->pair_.obj_.in());
00400 
00401       } else {
00402         if (DCPS_debug_level >= 1) {
00403           ACE_DEBUG((LM_DEBUG,
00404                      ACE_TEXT("(%P|%t) DomainParticipantImpl::create_topic, ")
00405                      ACE_TEXT("qos not match: topic_name=%C type_name=%C\n"),
00406                      topic_name, type_name));
00407         }
00408 
00409         return DDS::Topic::_nil();
00410       }
00411 
00412     } else { // no match
00413       if (DCPS_debug_level >= 1) {
00414         ACE_DEBUG((LM_DEBUG,
00415                    ACE_TEXT("(%P|%t) DomainParticipantImpl::create_topic, ")
00416                    ACE_TEXT(" not match: topic_name=%C type_name=%C\n"),
00417                    topic_name, type_name));
00418       }
00419 
00420       return DDS::Topic::_nil();
00421     }
00422 
00423   } else {
00424 
00425     OpenDDS::DCPS::TypeSupport_var type_support;
00426     bool has_keys = (topic_mask & TOPIC_TYPE_HAS_KEYS);
00427 
00428     if (0 == topic_mask) {
00429        // creating a topic with compile time type
00430       type_support = Registered_Data_Types->lookup(this, type_name);
00431       has_keys = type_support->has_dcps_key();
00432     }
00433     RepoId topic_id;
00434 
00435     Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00436     TopicStatus status = disco->assert_topic(topic_id,
00437                                              domain_id_,
00438                                              dp_id_,
00439                                              topic_name,
00440                                              type_name,
00441                                              topic_qos,
00442                                              has_keys);
00443 
00444     if (status == CREATED || status == FOUND) {
00445       DDS::Topic_ptr new_topic = create_new_topic(topic_id,
00446                                                 topic_name,
00447                                                 type_name,
00448                                                 topic_qos,
00449                                                 a_listener,
00450                                                 mask,
00451                                                 type_support);
00452       if (this->monitor_) {
00453         this->monitor_->report();
00454       }
00455       return new_topic;
00456 
00457     } else {
00458       ACE_ERROR((LM_ERROR,
00459                  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_topic, ")
00460                  ACE_TEXT("assert_topic failed.\n")));
00461       return DDS::Topic::_nil();
00462     }
00463   }
00464 }
00465 
00466 DDS::ReturnCode_t
00467 DomainParticipantImpl::delete_topic(
00468   DDS::Topic_ptr a_topic)
00469 {
00470   return delete_topic_i(a_topic, false);
00471 }
00472 
00473 DDS::ReturnCode_t
00474 DomainParticipantImpl::delete_topic_i(
00475   DDS::Topic_ptr a_topic,
00476   bool             remove_objref)
00477 {
00478 
00479   DDS::ReturnCode_t ret = DDS::RETCODE_OK;
00480 
00481   try {
00482     // The servant's ref count should be greater than 2 at this point,
00483     // one referenced by poa, one referenced by the topic map and
00484     // others referenced by the datareader/datawriter.
00485     TopicImpl* the_topic_servant = dynamic_cast<TopicImpl*>(a_topic);
00486 
00487     CORBA::String_var topic_name = the_topic_servant->get_name();
00488 
00489     DDS::DomainParticipant_var dp = the_topic_servant->get_participant();
00490 
00491     DomainParticipantImpl* the_dp_servant =
00492       dynamic_cast<DomainParticipantImpl*>(dp.in());
00493 
00494     if (the_dp_servant != this ||
00495         (!remove_objref && the_topic_servant->entity_refs())) {
00496       // If entity_refs is true (nonzero), then some reader or writer is using
00497       // this topic and the spec requires delete_topic() to fail with the error:
00498       return DDS::RETCODE_PRECONDITION_NOT_MET;
00499     }
00500 
00501     {
00502 
00503       ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00504                        tao_mon,
00505                        this->topics_protector_,
00506                        DDS::RETCODE_ERROR);
00507 
00508       TopicMap::mapped_type* entry = 0;
00509 
00510       if (Util::find(topics_, topic_name.in(), entry) == -1) {
00511         ACE_ERROR_RETURN((LM_ERROR,
00512                           ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_topic_i, ")
00513                           ACE_TEXT("%p\n"),
00514                           ACE_TEXT("find")),
00515                          DDS::RETCODE_ERROR);
00516       }
00517 
00518       entry->client_refs_ --;
00519 
00520       if (remove_objref == true ||
00521           0 == entry->client_refs_) {
00522         //TBD - mark the TopicImpl as deleted and make it
00523         //      reject calls to the TopicImpl.
00524         Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00525         TopicStatus status
00526         = disco->remove_topic(the_dp_servant->get_domain_id(),
00527                               the_dp_servant->get_id(),
00528                               the_topic_servant->get_id());
00529 
00530         if (status != REMOVED) {
00531           ACE_ERROR_RETURN((LM_ERROR,
00532                             ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_topic_i, ")
00533                             ACE_TEXT("remove_topic failed\n")),
00534                            DDS::RETCODE_ERROR);
00535         }
00536 
00537         // note: this will destroy the TopicImpl if there are no
00538         // client object reference to it.
00539         if (topics_.erase(topic_name.in()) == 0) {
00540           ACE_ERROR_RETURN((LM_ERROR,
00541                             ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_topic_i, ")
00542                             ACE_TEXT("%p \n"),
00543                             ACE_TEXT("unbind")),
00544                            DDS::RETCODE_ERROR);
00545 
00546         } else
00547           return DDS::RETCODE_OK;
00548 
00549       }
00550     }
00551 
00552   } catch (...) {
00553     ACE_ERROR((LM_ERROR,
00554                ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::delete_topic_i, ")
00555                ACE_TEXT(" Caught Unknown Exception \n")));
00556     ret = DDS::RETCODE_ERROR;
00557   }
00558 
00559   return ret;
00560 }
00561 
00562 //Note: caller should NOT assign to Topic_var (without _duplicate'ing)
00563 //      because it will steal the framework's reference.
00564 DDS::Topic_ptr
00565 DomainParticipantImpl::find_topic(
00566   const char * topic_name,
00567   const DDS::Duration_t & timeout)
00568 {
00569   ACE_Time_Value timeout_tv
00570   = ACE_OS::gettimeofday() + ACE_Time_Value(timeout.sec, timeout.nanosec/1000);
00571 
00572   int first_time = 1;
00573 
00574   while (first_time || ACE_OS::gettimeofday() < timeout_tv) {
00575     if (first_time) {
00576       first_time = 0;
00577     }
00578 
00579     TopicMap::mapped_type* entry = 0;
00580     {
00581       ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00582                        tao_mon,
00583                        this->topics_protector_,
00584                        DDS::Topic::_nil());
00585 
00586       if (Util::find(topics_, topic_name, entry) == 0) {
00587         entry->client_refs_ ++;
00588         return DDS::Topic::_duplicate(entry->pair_.obj_.in());
00589       }
00590     }
00591 
00592     RepoId topic_id;
00593     CORBA::String_var type_name;
00594     DDS::TopicQos_var qos;
00595 
00596     Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00597     TopicStatus status = disco->find_topic(domain_id_,
00598                                            topic_name,
00599                                            type_name.out(),
00600                                            qos.out(),
00601                                            topic_id);
00602 
00603 
00604     if (status == FOUND) {
00605       OpenDDS::DCPS::TypeSupport_var type_support =
00606         Registered_Data_Types->lookup(this, type_name.in());
00607       if (CORBA::is_nil(type_support)) {
00608         if (DCPS_debug_level) {
00609             ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00610                        ACE_TEXT("DomainParticipantImpl::find_topic, ")
00611                        ACE_TEXT("can't create a Topic: type_name \"%C\"")
00612                        ACE_TEXT("is not registered.\n"), type_name.in()));
00613         }
00614 
00615         return DDS::Topic::_nil();
00616       }
00617 
00618       DDS::Topic_ptr new_topic = create_new_topic(topic_id,
00619                                                   topic_name,
00620                                                   type_name,
00621                                                   qos,
00622                                                   DDS::TopicListener::_nil(),
00623                                                   OpenDDS::DCPS::DEFAULT_STATUS_MASK,
00624                                                   type_support);
00625       return new_topic;
00626 
00627     } else if (status == INTERNAL_ERROR) {
00628       ACE_ERROR((LM_ERROR,
00629                  ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::find_topic - ")
00630                  ACE_TEXT("topic not found, discovery returned INTERNAL_ERROR!\n")));
00631       return DDS::Topic::_nil();
00632     } else {
00633       ACE_Time_Value now = ACE_OS::gettimeofday();
00634 
00635       if (now < timeout_tv) {
00636         ACE_Time_Value remaining = timeout_tv - now;
00637 
00638         if (remaining.sec() >= 1) {
00639           ACE_OS::sleep(1);
00640 
00641         } else {
00642           ACE_OS::sleep(remaining);
00643         }
00644       }
00645     }
00646   }
00647 
00648   if (DCPS_debug_level >= 1) {
00649     // timed out
00650     ACE_DEBUG((LM_DEBUG,
00651                ACE_TEXT("(%P|%t) DomainParticipantImpl::find_topic, ")
00652                ACE_TEXT("timed out. \n")));
00653   }
00654 
00655   return DDS::Topic::_nil();
00656 }
00657 
00658 DDS::TopicDescription_ptr
00659 DomainParticipantImpl::lookup_topicdescription(const char* name)
00660 {
00661   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00662                    tao_mon,
00663                    this->topics_protector_,
00664                    DDS::Topic::_nil());
00665 
00666   TopicMap::mapped_type* entry = 0;
00667 
00668   if (Util::find(topics_, name, entry) == -1) {
00669 #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)
00670     TopicDescriptionMap::iterator iter = topic_descrs_.find(name);
00671     if (iter != topic_descrs_.end()) {
00672       return DDS::TopicDescription::_duplicate(iter->second);
00673     }
00674 #endif
00675     return DDS::TopicDescription::_nil();
00676 
00677   } else {
00678     return DDS::TopicDescription::_duplicate(entry->pair_.obj_.in());
00679   }
00680 }
00681 
00682 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00683 
00684 DDS::ContentFilteredTopic_ptr
00685 DomainParticipantImpl::create_contentfilteredtopic(
00686   const char* name,
00687   DDS::Topic_ptr related_topic,
00688   const char* filter_expression,
00689   const DDS::StringSeq& expression_parameters)
00690 {
00691   if (CORBA::is_nil(related_topic)) {
00692     if (DCPS_debug_level > 3) {
00693       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00694         ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
00695         ACE_TEXT("can't create a content-filtered topic due to null related ")
00696         ACE_TEXT("topic.\n")));
00697     }
00698     return 0;
00699   }
00700 
00701   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_, 0);
00702 
00703   if (topics_.count(name)) {
00704     if (DCPS_debug_level > 3) {
00705       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00706         ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
00707         ACE_TEXT("can't create a content-filtered topic due to name \"%C\" ")
00708         ACE_TEXT("already in use by a Topic.\n"), name));
00709     }
00710     return 0;
00711   }
00712 
00713   if (topic_descrs_.count(name)) {
00714     if (DCPS_debug_level > 3) {
00715       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00716         ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
00717         ACE_TEXT("can't create a content-filtered topic due to name \"%C\" ")
00718         ACE_TEXT("already in use by a TopicDescription.\n"), name));
00719     }
00720     return 0;
00721   }
00722 
00723   DDS::ContentFilteredTopic_var cft;
00724   try {
00725     cft = new ContentFilteredTopicImpl(name,
00726       related_topic, filter_expression, expression_parameters, this);
00727   } catch (const std::exception& e) {
00728     if (DCPS_debug_level) {
00729       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00730         ACE_TEXT("DomainParticipantImpl::create_contentfilteredtopic, ")
00731         ACE_TEXT("can't create a content-filtered topic due to runtime error: ")
00732         ACE_TEXT("%C.\n"), e.what()));
00733     }
00734     return 0;
00735   }
00736   DDS::TopicDescription_var td = DDS::TopicDescription::_duplicate(cft);
00737   topic_descrs_[name] = td;
00738   return cft._retn();
00739 }
00740 
00741 DDS::ReturnCode_t DomainParticipantImpl::delete_contentfilteredtopic(
00742   DDS::ContentFilteredTopic_ptr a_contentfilteredtopic)
00743 {
00744   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_,
00745                    DDS::RETCODE_OUT_OF_RESOURCES);
00746   DDS::ContentFilteredTopic_var cft =
00747     DDS::ContentFilteredTopic::_duplicate(a_contentfilteredtopic);
00748   CORBA::String_var name = cft->get_name();
00749   TopicDescriptionMap::iterator iter = topic_descrs_.find(name.in());
00750   if (iter == topic_descrs_.end()) {
00751     if (DCPS_debug_level > 3) {
00752       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00753         ACE_TEXT("DomainParticipantImpl::delete_contentfilteredtopic, ")
00754         ACE_TEXT("can't delete a content-filtered topic \"%C\" ")
00755         ACE_TEXT("because it is not in the set.\n"), name.in ()));
00756     }
00757     return DDS::RETCODE_PRECONDITION_NOT_MET;
00758   }
00759   if (dynamic_cast<TopicDescriptionImpl*>(iter->second.in())->has_reader()) {
00760     if (DCPS_debug_level > 3) {
00761       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00762         ACE_TEXT("DomainParticipantImpl::delete_contentfilteredtopic, ")
00763         ACE_TEXT("can't delete a content-filtered topic \"%C\" ")
00764         ACE_TEXT("because it still is used by a reader.\n"), name.in ()));
00765     }
00766     return DDS::RETCODE_PRECONDITION_NOT_MET;
00767   }
00768   topic_descrs_.erase(iter);
00769   return DDS::RETCODE_OK;
00770 }
00771 
00772 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
00773 
00774 #ifndef OPENDDS_NO_MULTI_TOPIC
00775 
00776 DDS::MultiTopic_ptr DomainParticipantImpl::create_multitopic(
00777   const char* name, const char* type_name,
00778   const char* subscription_expression,
00779   const DDS::StringSeq& expression_parameters)
00780 {
00781   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_, 0);
00782 
00783   if (topics_.count(name)) {
00784     if (DCPS_debug_level > 3) {
00785       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00786         ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
00787         ACE_TEXT("can't create a multi topic due to name \"%C\" ")
00788         ACE_TEXT("already in use by a Topic.\n"), name));
00789     }
00790     return 0;
00791   }
00792 
00793   if (topic_descrs_.count(name)) {
00794     if (DCPS_debug_level > 3) {
00795       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00796         ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
00797         ACE_TEXT("can't create a multi topic due to name \"%C\" ")
00798         ACE_TEXT("already in use by a TopicDescription.\n"), name));
00799     }
00800     return 0;
00801   }
00802 
00803   DDS::MultiTopic_var mt;
00804   try {
00805     mt = new MultiTopicImpl(name, type_name, subscription_expression,
00806       expression_parameters, this);
00807   } catch (const std::exception& e) {
00808     if (DCPS_debug_level) {
00809       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00810         ACE_TEXT("DomainParticipantImpl::create_multitopic, ")
00811         ACE_TEXT("can't create a multi topic due to runtime error: ")
00812         ACE_TEXT("%C.\n"), e.what()));
00813     }
00814     return 0;
00815   }
00816   DDS::TopicDescription_var td = DDS::TopicDescription::_duplicate(mt);
00817   topic_descrs_[name] = td;
00818   return mt._retn();
00819 }
00820 
00821 DDS::ReturnCode_t DomainParticipantImpl::delete_multitopic(
00822   DDS::MultiTopic_ptr a_multitopic)
00823 {
00824   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, topics_protector_,
00825                    DDS::RETCODE_OUT_OF_RESOURCES);
00826   DDS::MultiTopic_var mt = DDS::MultiTopic::_duplicate(a_multitopic);
00827   CORBA::String_var mt_name = mt->get_name();
00828   TopicDescriptionMap::iterator iter = topic_descrs_.find(mt_name.in());
00829   if (iter == topic_descrs_.end()) {
00830     return DDS::RETCODE_PRECONDITION_NOT_MET;
00831   }
00832   if (dynamic_cast<TopicDescriptionImpl*>(iter->second.in())->has_reader()) {
00833     return DDS::RETCODE_PRECONDITION_NOT_MET;
00834   }
00835   topic_descrs_.erase(iter);
00836   return DDS::RETCODE_OK;
00837 }
00838 
00839 #endif // OPENDDS_NO_MULTI_TOPIC
00840 
00841 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00842 
00843 RcHandle<FilterEvaluator>
00844 DomainParticipantImpl::get_filter_eval(const char* filter)
00845 {
00846   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, filter_cache_lock_,
00847                    RcHandle<FilterEvaluator>());
00848   typedef std::map<OPENDDS_STRING, RcHandle<FilterEvaluator> > Map;
00849   Map::iterator iter = filter_cache_.find(filter);
00850   if (iter == filter_cache_.end()) {
00851     return filter_cache_[filter] = new FilterEvaluator(filter, false);
00852   }
00853   return iter->second;
00854 }
00855 
00856 void
00857 DomainParticipantImpl::deref_filter_eval(const char* filter)
00858 {
00859   ACE_GUARD(ACE_Thread_Mutex, guard, filter_cache_lock_);
00860   typedef std::map<OPENDDS_STRING, RcHandle<FilterEvaluator> > Map;
00861   Map::iterator iter = filter_cache_.find(filter);
00862   if (iter != filter_cache_.end()) {
00863     if (iter->second->ref_count() == 1) {
00864       filter_cache_.erase(iter);
00865     }
00866   }
00867 }
00868 
00869 #endif
00870 
00871 DDS::ReturnCode_t
00872 DomainParticipantImpl::delete_contained_entities()
00873 {
00874   // mark that the entity is being deleted
00875   set_deleted(true);
00876 
00877   // BIT subscriber and data readers will be deleted with the
00878   // rest of the entities, so need to report to discovery that
00879   // BIT is no longer available
00880   Discovery_rch disc = TheServiceParticipant->get_discovery(this->domain_id_);
00881   disc->fini_bit(this);
00882 
00883   TheServiceParticipant->reactor()->notify(this);
00884 
00885   shutdown_mutex_.acquire();
00886   while (!shutdown_complete_) {
00887     shutdown_condition_.wait();
00888   }
00889   shutdown_complete_ = false;
00890   shutdown_mutex_.release();
00891 
00892   bit_subscriber_ = DDS::Subscriber::_nil();
00893 
00894   OpenDDS::DCPS::Registered_Data_Types->unregister_participant(this);
00895 
00896   participant_objref_ = DDS::DomainParticipant::_nil();
00897 
00898   // the participant can now start creating new contained entities
00899   set_deleted(false);
00900   return shutdown_result_;
00901 }
00902 
00903 CORBA::Boolean
00904 DomainParticipantImpl::contains_entity(DDS::InstanceHandle_t a_handle)
00905 {
00906   /// Check top-level containers for Topic, Subscriber,
00907   /// and Publisher instances.
00908   {
00909     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00910                      guard,
00911                      this->topics_protector_,
00912                      false);
00913 
00914     for (TopicMap::iterator it(topics_.begin());
00915          it != topics_.end(); ++it) {
00916       if (a_handle == it->second.pair_.svt_->get_instance_handle())
00917         return true;
00918     }
00919   }
00920 
00921   {
00922     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00923                      guard,
00924                      this->subscribers_protector_,
00925                      false);
00926 
00927     for (SubscriberSet::iterator it(subscribers_.begin());
00928          it != subscribers_.end(); ++it) {
00929       if (a_handle == it->svt_->get_instance_handle())
00930         return true;
00931     }
00932   }
00933 
00934   {
00935     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00936                      guard,
00937                      this->publishers_protector_,
00938                      false);
00939 
00940     for (PublisherSet::iterator it(publishers_.begin());
00941          it != publishers_.end(); ++it) {
00942       if (a_handle == it->svt_->get_instance_handle())
00943         return true;
00944     }
00945   }
00946 
00947   /// Recurse into SubscriberImpl and PublisherImpl for
00948   /// DataReader and DataWriter instances respectively.
00949   for (SubscriberSet::iterator it(subscribers_.begin());
00950        it != subscribers_.end(); ++it) {
00951     if (it->svt_->contains_reader(a_handle))
00952       return true;
00953   }
00954 
00955   for (PublisherSet::iterator it(publishers_.begin());
00956        it != publishers_.end(); ++it) {
00957     if (it->svt_->contains_writer(a_handle))
00958       return true;
00959   }
00960 
00961   return false;
00962 }
00963 
00964 DDS::ReturnCode_t
00965 DomainParticipantImpl::set_qos(
00966   const DDS::DomainParticipantQos & qos)
00967 {
00968   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00969     if (qos_ == qos)
00970       return DDS::RETCODE_OK;
00971 
00972     // for the not changeable qos, it can be changed before enable
00973     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00974       return DDS::RETCODE_IMMUTABLE_POLICY;
00975 
00976     } else {
00977       qos_ = qos;
00978 
00979       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00980       const bool status =
00981         disco->update_domain_participant_qos(domain_id_,
00982                                              dp_id_,
00983                                              qos_);
00984 
00985       if (!status) {
00986         ACE_ERROR_RETURN((LM_ERROR,
00987                           ACE_TEXT("(%P|%t) DomainParticipantImpl::set_qos, ")
00988                           ACE_TEXT("failed on compatiblity check. \n")),
00989                          DDS::RETCODE_ERROR);
00990       }
00991     }
00992 
00993     return DDS::RETCODE_OK;
00994 
00995   } else {
00996     return DDS::RETCODE_INCONSISTENT_POLICY;
00997   }
00998 }
00999 
01000 DDS::ReturnCode_t
01001 DomainParticipantImpl::get_qos(
01002   DDS::DomainParticipantQos & qos)
01003 {
01004   qos = qos_;
01005   return DDS::RETCODE_OK;
01006 }
01007 
01008 DDS::ReturnCode_t
01009 DomainParticipantImpl::set_listener(
01010   DDS::DomainParticipantListener_ptr a_listener,
01011   DDS::StatusMask mask)
01012 {
01013   listener_mask_ = mask;
01014   //note: OK to duplicate  a nil object ref
01015   listener_ = DDS::DomainParticipantListener::_duplicate(a_listener);
01016   return DDS::RETCODE_OK;
01017 }
01018 
01019 DDS::DomainParticipantListener_ptr
01020 DomainParticipantImpl::get_listener()
01021 {
01022   return DDS::DomainParticipantListener::_duplicate(listener_.in());
01023 }
01024 
01025 DDS::ReturnCode_t
01026 DomainParticipantImpl::ignore_participant(
01027   DDS::InstanceHandle_t handle)
01028 {
01029 #if !defined (DDS_HAS_MINIMUM_BIT)
01030 
01031   if (enabled_ == false) {
01032     ACE_ERROR_RETURN((LM_ERROR,
01033                       ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_participant, ")
01034                       ACE_TEXT(" Entity is not enabled. \n")),
01035                      DDS::RETCODE_NOT_ENABLED);
01036   }
01037 
01038   RepoId ignoreId = get_repoid(handle);
01039   HandleMap::const_iterator location = this->ignored_participants_.find(ignoreId);
01040 
01041   if (location == this->ignored_participants_.end()) {
01042     this->ignored_participants_[ ignoreId] = handle;
01043   }
01044   else {// ignore same participant again, just return ok.
01045     return DDS::RETCODE_OK;
01046   }
01047 
01048   if (DCPS_debug_level >= 4) {
01049     GuidConverter converter(dp_id_);
01050     ACE_DEBUG((LM_DEBUG,
01051                ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_participant: ")
01052                ACE_TEXT("%C ignoring handle %x.\n"),
01053                OPENDDS_STRING(converter).c_str(),
01054                handle));
01055   }
01056 
01057   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01058   if (!disco->ignore_domain_participant(domain_id_,
01059                                         dp_id_,
01060                                         ignoreId)) {
01061     ACE_ERROR_RETURN((LM_ERROR,
01062                       ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_participant, ")
01063                       ACE_TEXT(" Could not ignore domain participant.\n")),
01064                      DDS::RETCODE_NOT_ENABLED);
01065     return DDS::RETCODE_ERROR;
01066   }
01067 
01068 
01069   if (DCPS_debug_level >= 4) {
01070     GuidConverter converter(dp_id_);
01071     ACE_DEBUG((LM_DEBUG,
01072                ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_participant: ")
01073                ACE_TEXT("%C repo call returned.\n"),
01074                OPENDDS_STRING(converter).c_str()));
01075   }
01076 
01077   return DDS::RETCODE_OK;
01078 #else
01079   ACE_UNUSED_ARG(handle);
01080   return DDS::RETCODE_UNSUPPORTED;
01081 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01082 }
01083 
01084 DDS::ReturnCode_t
01085 DomainParticipantImpl::ignore_topic(
01086   DDS::InstanceHandle_t handle)
01087 {
01088 #if !defined (DDS_HAS_MINIMUM_BIT)
01089 
01090   if (enabled_ == false) {
01091     ACE_ERROR_RETURN((LM_ERROR,
01092                       ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_topic, ")
01093                       ACE_TEXT(" Entity is not enabled. \n")),
01094                      DDS::RETCODE_NOT_ENABLED);
01095   }
01096 
01097   RepoId ignoreId = get_repoid(handle);
01098   HandleMap::const_iterator location = this->ignored_topics_.find(ignoreId);
01099 
01100   if (location == this->ignored_topics_.end()) {
01101     this->ignored_topics_[ ignoreId] = handle;
01102   }
01103   else { // ignore same topic again, just return ok.
01104     return DDS::RETCODE_OK;
01105   }
01106 
01107   if (DCPS_debug_level >= 4) {
01108     GuidConverter converter(dp_id_);
01109     ACE_DEBUG((LM_DEBUG,
01110                ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_topic: ")
01111                ACE_TEXT("%C ignoring handle %x.\n"),
01112                OPENDDS_STRING(converter).c_str(),
01113                handle));
01114   }
01115 
01116   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01117   if (!disco->ignore_topic(domain_id_,
01118                            dp_id_,
01119                            ignoreId)) {
01120     ACE_ERROR((LM_ERROR,
01121                ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_topic, ")
01122                ACE_TEXT(" Could not ignore topic.\n")));
01123   }
01124 
01125   return DDS::RETCODE_OK;
01126 #else
01127   ACE_UNUSED_ARG(handle);
01128   return DDS::RETCODE_UNSUPPORTED;
01129 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01130 }
01131 
01132 DDS::ReturnCode_t
01133 DomainParticipantImpl::ignore_publication(
01134   DDS::InstanceHandle_t handle)
01135 {
01136 #if !defined (DDS_HAS_MINIMUM_BIT)
01137 
01138   if (enabled_ == false) {
01139     ACE_ERROR_RETURN((LM_ERROR,
01140                       ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_publication, ")
01141                       ACE_TEXT(" Entity is not enabled. \n")),
01142                      DDS::RETCODE_NOT_ENABLED);
01143   }
01144 
01145   if (DCPS_debug_level >= 4) {
01146     GuidConverter converter(dp_id_);
01147     ACE_DEBUG((LM_DEBUG,
01148                ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_publication: ")
01149                ACE_TEXT("%C ignoring handle %x.\n"),
01150                OPENDDS_STRING(converter).c_str(),
01151                handle));
01152   }
01153 
01154   RepoId ignoreId = get_repoid(handle);
01155   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01156   if (!disco->ignore_publication(domain_id_,
01157                                  dp_id_,
01158                                  ignoreId)) {
01159     ACE_ERROR_RETURN((LM_ERROR,
01160                       ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_publication, ")
01161                       ACE_TEXT(" could not ignore publication in discovery. \n")),
01162                      DDS::RETCODE_ERROR);
01163   }
01164 
01165   return DDS::RETCODE_OK;
01166 #else
01167   ACE_UNUSED_ARG(handle);
01168   return DDS::RETCODE_UNSUPPORTED;
01169 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01170 }
01171 
01172 DDS::ReturnCode_t
01173 DomainParticipantImpl::ignore_subscription(
01174   DDS::InstanceHandle_t handle)
01175 {
01176 #if !defined (DDS_HAS_MINIMUM_BIT)
01177 
01178   if (enabled_ == false) {
01179     ACE_ERROR_RETURN((LM_ERROR,
01180                       ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_subscription, ")
01181                       ACE_TEXT(" Entity is not enabled. \n")),
01182                      DDS::RETCODE_NOT_ENABLED);
01183   }
01184 
01185   if (DCPS_debug_level >= 4) {
01186     GuidConverter converter(dp_id_);
01187     ACE_DEBUG((LM_DEBUG,
01188                ACE_TEXT("(%P|%t) DomainParticipantImpl::ignore_subscription: ")
01189                ACE_TEXT("%C ignoring handle %d.\n"),
01190                OPENDDS_STRING(converter).c_str(),
01191                handle));
01192   }
01193 
01194 
01195   RepoId ignoreId = get_repoid(handle);
01196   Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
01197   if (!disco->ignore_subscription(domain_id_,
01198                                   dp_id_,
01199                                   ignoreId)) {
01200     ACE_ERROR_RETURN((LM_ERROR,
01201                       ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::ignore_subscription, ")
01202                       ACE_TEXT(" could not ignore subscription in discovery. \n")),
01203                      DDS::RETCODE_ERROR);
01204   }
01205 
01206   return DDS::RETCODE_OK;
01207 #else
01208   ACE_UNUSED_ARG(handle);
01209   return DDS::RETCODE_UNSUPPORTED;
01210 #endif // !defined (DDS_HAS_MINIMUM_BIT)
01211 }
01212 
01213 DDS::DomainId_t
01214 DomainParticipantImpl::get_domain_id()
01215 {
01216   return domain_id_;
01217 }
01218 
01219 DDS::ReturnCode_t
01220 DomainParticipantImpl::assert_liveliness()
01221 {
01222   // This operation needs to only be used if the DomainParticipant contains
01223   // DataWriter entities with the LIVELINESS set to MANUAL_BY_PARTICIPANT and
01224   // it only affects the liveliness of those DataWriter entities. Otherwise,
01225   // it has no effect.
01226   // This will do nothing in current implementation since we only
01227   // support the AUTOMATIC liveliness qos for datawriter.
01228   // Add implementation here.
01229 
01230   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01231                    tao_mon,
01232                    this->publishers_protector_,
01233                    DDS::RETCODE_ERROR);
01234 
01235   for (PublisherSet::iterator it(publishers_.begin());
01236        it != publishers_.end(); ++it) {
01237     it->svt_->assert_liveliness_by_participant();
01238   }
01239 
01240   last_liveliness_activity_ = ACE_OS::gettimeofday();
01241 
01242   return DDS::RETCODE_OK;
01243 }
01244 
01245 DDS::ReturnCode_t
01246 DomainParticipantImpl::set_default_publisher_qos(
01247   const DDS::PublisherQos & qos)
01248 {
01249   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
01250     default_publisher_qos_ = qos;
01251     return DDS::RETCODE_OK;
01252 
01253   } else {
01254     return DDS::RETCODE_INCONSISTENT_POLICY;
01255   }
01256 }
01257 
01258 DDS::ReturnCode_t
01259 DomainParticipantImpl::get_default_publisher_qos(
01260   DDS::PublisherQos & qos)
01261 {
01262   qos = default_publisher_qos_;
01263   return DDS::RETCODE_OK;
01264 }
01265 
01266 DDS::ReturnCode_t
01267 DomainParticipantImpl::set_default_subscriber_qos(
01268   const DDS::SubscriberQos & qos)
01269 {
01270   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
01271     default_subscriber_qos_ = qos;
01272     return DDS::RETCODE_OK;
01273 
01274   } else {
01275     return DDS::RETCODE_INCONSISTENT_POLICY;
01276   }
01277 }
01278 
01279 DDS::ReturnCode_t
01280 DomainParticipantImpl::get_default_subscriber_qos(
01281   DDS::SubscriberQos & qos)
01282 {
01283   qos = default_subscriber_qos_;
01284   return DDS::RETCODE_OK;
01285 }
01286 
01287 DDS::ReturnCode_t
01288 DomainParticipantImpl::set_default_topic_qos(
01289   const DDS::TopicQos & qos)
01290 {
01291   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
01292     default_topic_qos_ = qos;
01293     return DDS::RETCODE_OK;
01294 
01295   } else {
01296     return DDS::RETCODE_INCONSISTENT_POLICY;
01297   }
01298 }
01299 
01300 DDS::ReturnCode_t
01301 DomainParticipantImpl::get_default_topic_qos(
01302   DDS::TopicQos & qos)
01303 {
01304   qos = default_topic_qos_;
01305   return DDS::RETCODE_OK;
01306 }
01307 
01308 DDS::ReturnCode_t
01309 DomainParticipantImpl::get_current_time(
01310   DDS::Time_t & current_time)
01311 {
01312   current_time
01313   = OpenDDS::DCPS::time_value_to_time(
01314       ACE_OS::gettimeofday());
01315   return DDS::RETCODE_OK;
01316 }
01317 
01318 #if !defined (DDS_HAS_MINIMUM_BIT)
01319 
01320 DDS::ReturnCode_t
01321 DomainParticipantImpl::get_discovered_participants(
01322   DDS::InstanceHandleSeq & participant_handles)
01323 {
01324   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01325                    guard,
01326                    this->handle_protector_,
01327                    DDS::RETCODE_ERROR);
01328 
01329   HandleMap::const_iterator itEnd = this->handles_.end();
01330 
01331   for (HandleMap::const_iterator iter = this->handles_.begin();
01332        iter != itEnd; ++iter) {
01333     GuidConverter converter(iter->first);
01334 
01335     if (converter.entityKind() == KIND_PARTICIPANT)
01336     {
01337       // skip itself and the ignored participant
01338       if (iter->first == this->dp_id_
01339       || (this->ignored_participants_.find(iter->first)
01340         != this->ignored_participants_.end ())) {
01341         continue;
01342       }
01343 
01344       push_back(participant_handles, iter->second);
01345     }
01346   }
01347 
01348   return DDS::RETCODE_OK;
01349 }
01350 
01351 DDS::ReturnCode_t
01352 DomainParticipantImpl::get_discovered_participant_data(
01353   DDS::ParticipantBuiltinTopicData & participant_data,
01354   DDS::InstanceHandle_t participant_handle)
01355 {
01356   {
01357     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01358                      guard,
01359                      this->handle_protector_,
01360                      DDS::RETCODE_ERROR);
01361 
01362     bool found = false;
01363     HandleMap::const_iterator itEnd = this->handles_.end();
01364 
01365     for (HandleMap::const_iterator iter = this->handles_.begin();
01366          iter != itEnd; ++iter) {
01367       GuidConverter converter(iter->first);
01368 
01369       if (participant_handle == iter->second
01370           && converter.entityKind() == KIND_PARTICIPANT) {
01371         found = true;
01372         break;
01373       }
01374     }
01375 
01376     if (!found)
01377       return DDS::RETCODE_PRECONDITION_NOT_MET;
01378   }
01379 
01380   DDS::SampleInfoSeq info;
01381   DDS::ParticipantBuiltinTopicDataSeq data;
01382   DDS::DataReader_var dr =
01383     this->bit_subscriber_->lookup_datareader(BUILT_IN_PARTICIPANT_TOPIC);
01384   DDS::ParticipantBuiltinTopicDataDataReader_var bit_part_dr =
01385     DDS::ParticipantBuiltinTopicDataDataReader::_narrow(dr);
01386   DDS::ReturnCode_t ret = bit_part_dr->read_instance(data,
01387                                                      info,
01388                                                      1,
01389                                                      participant_handle,
01390                                                      DDS::ANY_SAMPLE_STATE,
01391                                                      DDS::ANY_VIEW_STATE,
01392                                                      DDS::ANY_INSTANCE_STATE);
01393 
01394   if (ret == DDS::RETCODE_OK) {
01395     if (info[0].valid_data)
01396       participant_data = data[0];
01397 
01398     else
01399       return DDS::RETCODE_NO_DATA;
01400   }
01401 
01402   return ret;
01403 }
01404 
01405 DDS::ReturnCode_t
01406 DomainParticipantImpl::get_discovered_topics(
01407   DDS::InstanceHandleSeq & topic_handles)
01408 {
01409   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01410                    guard,
01411                    this->handle_protector_,
01412                    DDS::RETCODE_ERROR);
01413 
01414   HandleMap::const_iterator itEnd = this->handles_.end();
01415 
01416   for (HandleMap::const_iterator iter = this->handles_.begin();
01417        iter != itEnd; ++iter) {
01418     GuidConverter converter(iter->first);
01419 
01420     if (converter.entityKind() == KIND_TOPIC) {
01421 
01422       // skip the ignored topic
01423       if (this->ignored_topics_.find(iter->first)
01424           != this->ignored_topics_.end ()) {
01425         continue;
01426       }
01427 
01428       push_back(topic_handles, iter->second);
01429     }
01430   }
01431 
01432   return DDS::RETCODE_OK;
01433 }
01434 
01435 DDS::ReturnCode_t
01436 DomainParticipantImpl::get_discovered_topic_data(
01437   DDS::TopicBuiltinTopicData & topic_data,
01438   DDS::InstanceHandle_t topic_handle)
01439 {
01440   {
01441     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01442                      guard,
01443                      this->handle_protector_,
01444                      DDS::RETCODE_ERROR);
01445 
01446     bool found = false;
01447     HandleMap::const_iterator itEnd = this->handles_.end();
01448 
01449     for (HandleMap::const_iterator iter = this->handles_.begin();
01450          iter != itEnd; ++iter) {
01451       GuidConverter converter(iter->first);
01452 
01453       if (topic_handle == iter->second
01454           && converter.entityKind() == KIND_TOPIC) {
01455         found = true;
01456         break;
01457       }
01458     }
01459 
01460     if (!found)
01461       return DDS::RETCODE_PRECONDITION_NOT_MET;
01462   }
01463 
01464   DDS::DataReader_var dr =
01465     bit_subscriber_->lookup_datareader(BUILT_IN_TOPIC_TOPIC);
01466   DDS::TopicBuiltinTopicDataDataReader_var bit_topic_dr =
01467     DDS::TopicBuiltinTopicDataDataReader::_narrow(dr);
01468 
01469   DDS::SampleInfoSeq info;
01470   DDS::TopicBuiltinTopicDataSeq data;
01471   DDS::ReturnCode_t ret =
01472     bit_topic_dr->read_instance(data,
01473                                 info,
01474                                 1,
01475                                 topic_handle,
01476                                 DDS::ANY_SAMPLE_STATE,
01477                                 DDS::ANY_VIEW_STATE,
01478                                 DDS::ANY_INSTANCE_STATE);
01479 
01480   if (ret == DDS::RETCODE_OK) {
01481     if (info[0].valid_data)
01482       topic_data = data[0];
01483 
01484     else
01485       return DDS::RETCODE_NO_DATA;
01486   }
01487 
01488   return ret;
01489 }
01490 
01491 #endif
01492 
01493 DDS::ReturnCode_t
01494 DomainParticipantImpl::enable()
01495 {
01496   //According spec:
01497   // - Calling enable on an already enabled Entity returns OK and has no
01498   // effect.
01499   // - Calling enable on an Entity whose factory is not enabled will fail
01500   // and return PRECONDITION_NOT_MET.
01501 
01502   if (this->is_enabled()) {
01503     return DDS::RETCODE_OK;
01504   }
01505 
01506   DDS::DomainParticipantFactoryQos qos;
01507 
01508   if (this->factory_->get_qos(qos) != DDS::RETCODE_OK) {
01509     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t)DomainParticipantImpl::enable failed to")
01510                ACE_TEXT(" get factory qos\n")));
01511     return DDS::RETCODE_ERROR;
01512   }
01513 
01514   if (qos.entity_factory.autoenable_created_entities == 0) {
01515     return DDS::RETCODE_PRECONDITION_NOT_MET;
01516   }
01517 
01518   DDS::ReturnCode_t ret = this->set_enabled();
01519 
01520   if (monitor_) {
01521     monitor_->report();
01522   }
01523   if (TheServiceParticipant->monitor_) {
01524     TheServiceParticipant->monitor_->report();
01525   }
01526 
01527   if (ret == DDS::RETCODE_OK && !TheTransientKludge->is_enabled()) {
01528     Discovery_rch disc = TheServiceParticipant->get_discovery(this->domain_id_);
01529     this->bit_subscriber_ = disc->init_bit(this);
01530   }
01531 
01532   return ret;
01533 }
01534 
01535 RepoId
01536 DomainParticipantImpl::get_id()
01537 {
01538   return dp_id_;
01539 }
01540 
01541 OPENDDS_STRING
01542 DomainParticipantImpl::get_unique_id()
01543 {
01544   return GuidConverter(dp_id_).uniqueId();
01545 }
01546 
01547 
01548 DDS::InstanceHandle_t
01549 DomainParticipantImpl::get_instance_handle()
01550 {
01551   return this->id_to_handle(this->dp_id_);
01552 }
01553 
01554 DDS::InstanceHandle_t
01555 DomainParticipantImpl::id_to_handle(const RepoId& id)
01556 {
01557   if (id == GUID_UNKNOWN) {
01558     return this->participant_handles_.next();
01559   }
01560 
01561   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01562                    guard,
01563                    this->handle_protector_,
01564                    HANDLE_UNKNOWN);
01565 
01566   HandleMap::const_iterator location = this->handles_.find(id);
01567   DDS::InstanceHandle_t result;
01568 
01569   if (location == this->handles_.end()) {
01570     // Map new handle in both directions
01571     result = this->participant_handles_.next();
01572     this->handles_[id] = result;
01573     this->repoIds_[result] = id;
01574   } else {
01575     result = location->second;
01576   }
01577 
01578   return result;
01579 }
01580 
01581 RepoId
01582 DomainParticipantImpl::get_repoid(const DDS::InstanceHandle_t& handle)
01583 {
01584   RepoId result = GUID_UNKNOWN;
01585   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01586                    guard,
01587                    this->handle_protector_,
01588                    GUID_UNKNOWN);
01589   RepoIdMap::const_iterator location = this->repoIds_.find(handle);
01590   if (location != this->repoIds_.end()) {
01591     result = location->second;
01592   }
01593   return result;
01594 }
01595 
01596 DDS::Topic_ptr
01597 DomainParticipantImpl::create_new_topic(
01598   const RepoId topic_id,
01599   const char * topic_name,
01600   const char * type_name,
01601   const DDS::TopicQos & qos,
01602   DDS::TopicListener_ptr a_listener,
01603   const DDS::StatusMask & mask,
01604   OpenDDS::DCPS::TypeSupport_ptr type_support)
01605 {
01606   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01607                    tao_mon,
01608                    this->topics_protector_,
01609                    DDS::Topic::_nil());
01610 
01611   /*
01612   TopicMap::mapped_type* entry = 0;
01613 
01614   if (Util::find(topics_, topic_name, entry) == 0) {
01615     entry->client_refs_ ++;
01616     return DDS::Topic::_duplicate(entry->pair_.obj_.in());
01617   }
01618   */
01619 
01620   TopicImpl* topic_servant = 0;
01621 
01622   ACE_NEW_RETURN(topic_servant,
01623                  TopicImpl(topic_id,
01624                            topic_name,
01625                            type_name,
01626                            type_support,
01627                            qos,
01628                            a_listener,
01629                            mask,
01630                            this),
01631                  DDS::Topic::_nil());
01632 
01633   if ((enabled_ == true)
01634       && (qos_.entity_factory.autoenable_created_entities == 1)) {
01635     topic_servant->enable();
01636   }
01637 
01638   DDS::Topic_ptr obj(topic_servant);
01639 
01640   // this object will also act as a guard against leaking the new TopicImpl
01641   RefCounted_Topic refCounted_topic(Topic_Pair(topic_servant, obj, NO_DUP));
01642 
01643   if (OpenDDS::DCPS::bind(topics_, topic_name, refCounted_topic) == -1) {
01644     ACE_ERROR((LM_ERROR,
01645                ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::create_topic, ")
01646                ACE_TEXT("%p \n"),
01647                ACE_TEXT("bind")));
01648     return DDS::Topic::_nil();
01649   }
01650 
01651   if (this->monitor_) {
01652     this->monitor_->report();
01653   }
01654 
01655   // the topics_ map has one reference and we duplicate to give
01656   // the caller another reference.
01657   return DDS::Topic::_duplicate(refCounted_topic.pair_.obj_.in());
01658 }
01659 
01660 int
01661 DomainParticipantImpl::is_clean() const
01662 {
01663   int sub_is_clean = subscribers_.empty();
01664   int topics_is_clean = topics_.size() == 0;
01665 
01666   if (!TheTransientKludge->is_enabled()) {
01667     // There are four topics and builtin topic subscribers
01668     // left.
01669 
01670     sub_is_clean = sub_is_clean == 0 ? subscribers_.size() == 1 : 1;
01671     topics_is_clean = topics_is_clean == 0 ? topics_.size() == 4 : 1;
01672   }
01673   return (publishers_.empty()
01674           && sub_is_clean == 1
01675           && topics_is_clean == 1);
01676 }
01677 
01678 void
01679 DomainParticipantImpl::set_object_reference(const DDS::DomainParticipant_ptr& dp)
01680 {
01681   if (!CORBA::is_nil(participant_objref_.in())) {
01682     ACE_ERROR((LM_ERROR,
01683                ACE_TEXT("(%P|%t) ERROR: DomainParticipantImpl::set_object_reference, ")
01684                ACE_TEXT("This participant is already activated. \n")));
01685     return;
01686   }
01687 
01688   participant_objref_ = DDS::DomainParticipant::_duplicate(dp);
01689 }
01690 
01691 DDS::DomainParticipantListener_ptr
01692 DomainParticipantImpl::listener_for(DDS::StatusKind kind)
01693 {
01694   if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
01695     return DDS::DomainParticipantListener::_nil ();
01696   } else {
01697     return DDS::DomainParticipantListener::_duplicate(listener_.in());
01698   }
01699 }
01700 
01701 void
01702 DomainParticipantImpl::get_topic_ids(TopicIdVec& topics)
01703 {
01704   ACE_GUARD(ACE_Recursive_Thread_Mutex,
01705             guard,
01706             this->topics_protector_);
01707 
01708   topics.reserve(topics_.size());
01709   for (TopicMap::iterator it(topics_.begin());
01710        it != topics_.end(); ++it) {
01711     topics.push_back(it->second.pair_.svt_->get_id());
01712   }
01713 }
01714 
01715 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01716 
01717 OwnershipManager*
01718 DomainParticipantImpl::ownership_manager()
01719 {
01720 #if !defined (DDS_HAS_MINIMUM_BIT)
01721 
01722   DDS::DataReader_var dr =
01723     bit_subscriber_->lookup_datareader(BUILT_IN_PUBLICATION_TOPIC);
01724   DDS::PublicationBuiltinTopicDataDataReader_var bit_pub_dr =
01725     DDS::PublicationBuiltinTopicDataDataReader::_narrow(dr);
01726 
01727   if (!CORBA::is_nil(bit_pub_dr.in())) {
01728     DDS::DataReaderListener_var listener = bit_pub_dr->get_listener();
01729     if (CORBA::is_nil(listener.in())) {
01730       DDS::DataReaderListener_var bit_pub_listener =
01731         new BitPubListenerImpl(this);
01732       bit_pub_dr->set_listener(bit_pub_listener, DDS::DATA_AVAILABLE_STATUS);
01733       // Must call on_data_available when attaching a listener late - samples may be waiting
01734       bit_pub_listener->on_data_available(bit_pub_dr.in());
01735     }
01736   }
01737 
01738 #endif
01739   return &this->owner_man_;
01740 }
01741 
01742 void
01743 DomainParticipantImpl::update_ownership_strength (const PublicationId& pub_id,
01744                                                   const CORBA::Long& ownership_strength)
01745 {
01746   ACE_GUARD(ACE_Recursive_Thread_Mutex,
01747             tao_mon,
01748             this->subscribers_protector_);
01749 
01750   if (this->get_deleted ())
01751     return;
01752 
01753   for (SubscriberSet::iterator it(this->subscribers_.begin());
01754       it != this->subscribers_.end(); ++it) {
01755     it->svt_->update_ownership_strength(pub_id, ownership_strength);
01756   }
01757 }
01758 
01759 #endif // OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
01760 
01761 DomainParticipantImpl::RepoIdSequence::RepoIdSequence(RepoId& base) :
01762   base_(base),
01763   serial_(0),
01764   builder_(base_)
01765 {
01766 }
01767 
01768 RepoId
01769 DomainParticipantImpl::RepoIdSequence::next()
01770 {
01771   builder_.entityKey(++serial_);
01772   return builder_;
01773 }
01774 
01775 
01776 ////////////////////////////////////////////////////////////////
01777 
01778 
01779 bool
01780 DomainParticipantImpl::validate_publisher_qos(DDS::PublisherQos & pub_qos)
01781 {
01782   if (pub_qos == PUBLISHER_QOS_DEFAULT) {
01783     this->get_default_publisher_qos(pub_qos);
01784   }
01785 
01786   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(pub_qos, false);
01787 
01788   if (!Qos_Helper::valid(pub_qos) || !Qos_Helper::consistent(pub_qos)) {
01789     ACE_ERROR((LM_ERROR,
01790                ACE_TEXT("(%P|%t) ERROR: ")
01791                ACE_TEXT("DomainParticipantImpl::validate_publisher_qos, ")
01792                ACE_TEXT("invalid qos.\n")));
01793     return false;
01794   }
01795 
01796   return true;
01797 }
01798 
01799 bool
01800 DomainParticipantImpl::validate_subscriber_qos(DDS::SubscriberQos & subscriber_qos)
01801 {
01802   if (subscriber_qos == SUBSCRIBER_QOS_DEFAULT) {
01803     this->get_default_subscriber_qos(subscriber_qos);
01804   }
01805 
01806   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(subscriber_qos, false);
01807 
01808   if (!Qos_Helper::valid(subscriber_qos) || !Qos_Helper::consistent(subscriber_qos)) {
01809     ACE_ERROR((LM_ERROR,
01810                ACE_TEXT("(%P|%t) ERROR: ")
01811                ACE_TEXT("DomainParticipantImpl::validate_subscriber_qos, ")
01812                ACE_TEXT("invalid qos.\n")));
01813     return false;
01814   }
01815 
01816 
01817   return true;
01818 }
01819 
01820 Recorder_ptr
01821 DomainParticipantImpl::create_recorder(DDS::Topic_ptr a_topic,
01822                                        const DDS::SubscriberQos& subscriber_qos,
01823                                        const DDS::DataReaderQos& datareader_qos,
01824                                        const RecorderListener_rch& a_listener,
01825                                        DDS::StatusMask mask)
01826 {
01827   if (CORBA::is_nil(a_topic)) {
01828     ACE_ERROR((LM_ERROR,
01829                ACE_TEXT("(%P|%t) ERROR: ")
01830                ACE_TEXT("SubscriberImpl::create_datareader, ")
01831                ACE_TEXT("topic desc is nil.\n")));
01832     return 0;
01833   }
01834 
01835   DDS::SubscriberQos sub_qos = subscriber_qos;
01836   DDS::DataReaderQos dr_qos;
01837 
01838   if (! this->validate_subscriber_qos(sub_qos) ||
01839       ! SubscriberImpl::validate_datareader_qos(datareader_qos,
01840                                                 TheServiceParticipant->initial_DataReaderQos(),
01841                                                 a_topic,
01842                                                 dr_qos, false) ) {
01843     return 0;
01844   }
01845 
01846   RecorderImpl* recorder(new RecorderImpl);
01847   Recorder_var result(recorder);
01848 
01849   recorder->init(dynamic_cast<TopicDescriptionImpl*>(a_topic),
01850     dr_qos, a_listener,
01851     mask, this, subscriber_qos);
01852 
01853   if ((enabled_ == true) && (qos_.entity_factory.autoenable_created_entities == 1)) {
01854     recorder->enable();
01855   }
01856 
01857   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(recorders_protector_);
01858   recorders_.insert(result);
01859 
01860   return result._retn();
01861 }
01862 
01863 Replayer_ptr
01864 DomainParticipantImpl::create_replayer(DDS::Topic_ptr a_topic,
01865                                        const DDS::PublisherQos& publisher_qos,
01866                                        const DDS::DataWriterQos& datawriter_qos,
01867                                        const ReplayerListener_rch& a_listener,
01868                                        DDS::StatusMask mask)
01869 {
01870   if (CORBA::is_nil(a_topic)) {
01871     ACE_ERROR((LM_ERROR,
01872                ACE_TEXT("(%P|%t) ERROR: ")
01873                ACE_TEXT("SubscriberImpl::create_datareader, ")
01874                ACE_TEXT("topic desc is nil.\n")));
01875     return 0;
01876   }
01877 
01878   DDS::PublisherQos pub_qos = publisher_qos;
01879   DDS::DataWriterQos dw_qos;
01880 
01881   if (! this->validate_publisher_qos(pub_qos) ||
01882       ! PublisherImpl::validate_datawriter_qos(datawriter_qos,
01883                                                TheServiceParticipant->initial_DataWriterQos(),
01884                                                a_topic,
01885                                                dw_qos)) {
01886     return 0;
01887   }
01888 
01889   TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic);
01890 
01891   ReplayerImpl* replayer(new ReplayerImpl);
01892   Replayer_var result(replayer);
01893 
01894   replayer->init(a_topic, topic_servant, dw_qos, a_listener, mask, this, pub_qos);
01895 
01896   if (this->enabled_ == true
01897       && qos_.entity_factory.autoenable_created_entities == 1) {
01898 
01899     DDS::ReturnCode_t ret = replayer->enable();
01900 
01901     if (ret != DDS::RETCODE_OK) {
01902       ACE_ERROR((LM_ERROR,
01903                  ACE_TEXT("(%P|%t) ERROR: ")
01904                  ACE_TEXT("DomainParticipantImpl::create_replayer, ")
01905                  ACE_TEXT("enable failed.\n")));
01906       return 0;
01907     }
01908   }
01909 
01910   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(replayers_protector_);
01911   replayers_.insert(result);
01912   return result._retn();
01913 }
01914 
01915 void
01916 DomainParticipantImpl::delete_recorder(Recorder_ptr recorder)
01917 {
01918   const Recorder_var recvar(Recorder::_duplicate(recorder));
01919   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(recorders_protector_);
01920   recorders_.erase(recvar);
01921 }
01922 
01923 void
01924 DomainParticipantImpl::delete_replayer(Replayer_ptr replayer)
01925 {
01926   const Replayer_var repvar(Replayer::_duplicate(replayer));
01927   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(replayers_protector_);
01928   replayers_.erase(repvar);
01929 }
01930 
01931 void
01932 DomainParticipantImpl::add_adjust_liveliness_timers(DataWriterImpl* writer)
01933 {
01934   automatic_liveliness_timer_.add_adjust(writer);
01935   participant_liveliness_timer_.add_adjust(writer);
01936 }
01937 
01938 void
01939 DomainParticipantImpl::remove_adjust_liveliness_timers()
01940 {
01941   automatic_liveliness_timer_.remove_adjust();
01942   participant_liveliness_timer_.remove_adjust();
01943 }
01944 
01945 DomainParticipantImpl::LivelinessTimer::LivelinessTimer(DomainParticipantImpl& impl,
01946                                                         DDS::LivelinessQosPolicyKind kind)
01947   : impl_(impl)
01948   , kind_ (kind)
01949   , interval_ (ACE_Time_Value::max_time)
01950   , recalculate_interval_ (false)
01951   , scheduled_ (false)
01952 { }
01953 
01954 DomainParticipantImpl::LivelinessTimer::~LivelinessTimer()
01955 {
01956   if (scheduled_) {
01957     TheServiceParticipant->timer()->cancel_timer(this);
01958   }
01959 }
01960 
01961 void
01962 DomainParticipantImpl::LivelinessTimer::add_adjust(OpenDDS::DCPS::DataWriterImpl* writer)
01963 {
01964   ACE_GUARD(ACE_Thread_Mutex,
01965             guard,
01966             this->lock_);
01967 
01968   const ACE_Time_Value now = ACE_OS::gettimeofday();
01969 
01970   // Calculate the time remaining to liveliness check.
01971   const ACE_Time_Value remaining = interval_ - (now - last_liveliness_check_);
01972 
01973   // Adopt a smaller interval.
01974   const ACE_Time_Value i = writer->liveliness_check_interval(kind_);
01975   if (i < interval_) {
01976     interval_ = i;
01977   }
01978 
01979   // Reschedule or schedule a timer if necessary.
01980   if (scheduled_ && interval_ < remaining) {
01981     TheServiceParticipant->timer()->cancel_timer(this);
01982     TheServiceParticipant->timer()->schedule_timer(this, 0, interval_);
01983   } else if (!scheduled_) {
01984     TheServiceParticipant->timer()->schedule_timer(this, 0, interval_);
01985     scheduled_ = true;
01986     last_liveliness_check_ = now;
01987   }
01988 }
01989 
01990 void
01991 DomainParticipantImpl::LivelinessTimer::remove_adjust()
01992 {
01993   ACE_GUARD(ACE_Thread_Mutex,
01994             guard,
01995             this->lock_);
01996 
01997   recalculate_interval_ = true;
01998 }
01999 
02000 int
02001 DomainParticipantImpl::LivelinessTimer::handle_timeout(const ACE_Time_Value & tv, const void* /* arg */)
02002 {
02003   ACE_GUARD_RETURN(ACE_Thread_Mutex,
02004                    guard,
02005                    this->lock_,
02006                    0);
02007 
02008   scheduled_ = false;
02009 
02010   if (recalculate_interval_) {
02011     interval_ = impl_.liveliness_check_interval(kind_);
02012     recalculate_interval_ = false;
02013   }
02014 
02015   if (interval_ != ACE_Time_Value::max_time) {
02016     dispatch(tv);
02017     last_liveliness_check_ = tv;
02018     TheServiceParticipant->timer()->schedule_timer(this, 0, interval_);
02019     scheduled_ = true;
02020   }
02021 
02022   return 0;
02023 }
02024 
02025 DomainParticipantImpl::AutomaticLivelinessTimer::AutomaticLivelinessTimer(DomainParticipantImpl& impl)
02026   : LivelinessTimer (impl, DDS::AUTOMATIC_LIVELINESS_QOS)
02027 { }
02028 
02029 void
02030 DomainParticipantImpl::AutomaticLivelinessTimer::dispatch(const ACE_Time_Value& /* tv */)
02031 {
02032   impl_.signal_liveliness (kind_);
02033 }
02034 
02035 DomainParticipantImpl::ParticipantLivelinessTimer::ParticipantLivelinessTimer(DomainParticipantImpl& impl)
02036   : LivelinessTimer (impl, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS)
02037 { }
02038 
02039 void
02040 DomainParticipantImpl::ParticipantLivelinessTimer::dispatch(const ACE_Time_Value& tv)
02041 {
02042   if (impl_.participant_liveliness_activity_after (tv - interval())) {
02043     impl_.signal_liveliness (kind_);
02044   }
02045 }
02046 
02047 ACE_Time_Value
02048 DomainParticipantImpl::liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
02049 {
02050   ACE_Time_Value tv = ACE_Time_Value::max_time;
02051 
02052   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
02053                     tao_mon,
02054                     this->publishers_protector_,
02055                     tv);
02056 
02057   for (PublisherSet::iterator it(publishers_.begin());
02058        it != publishers_.end(); ++it) {
02059     tv = std::min (tv, it->svt_->liveliness_check_interval(kind));
02060   }
02061 
02062   return tv;
02063 }
02064 
02065 bool
02066 DomainParticipantImpl::participant_liveliness_activity_after(const ACE_Time_Value& tv)
02067 {
02068   if (last_liveliness_activity_ > tv) {
02069     return true;
02070   }
02071 
02072   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
02073                     tao_mon,
02074                     this->publishers_protector_,
02075                     tv);
02076 
02077   for (PublisherSet::iterator it(publishers_.begin());
02078        it != publishers_.end(); ++it) {
02079     if (it->svt_->participant_liveliness_activity_after(tv)) {
02080       return true;
02081     }
02082   }
02083 
02084   return false;
02085 }
02086 
02087 void
02088 DomainParticipantImpl::signal_liveliness (DDS::LivelinessQosPolicyKind kind)
02089 {
02090   TheServiceParticipant->get_discovery(domain_id_)->signal_liveliness (domain_id_, get_id(), kind);
02091 }
02092 
02093 int
02094 DomainParticipantImpl::handle_exception(ACE_HANDLE /*fd*/)
02095 {
02096   // delete publishers
02097   {
02098     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02099                      tao_mon,
02100                      this->publishers_protector_,
02101                      DDS::RETCODE_ERROR);
02102 
02103     PublisherSet::iterator pubIter = publishers_.begin();
02104     DDS::Publisher_ptr pubPtr;
02105     size_t pubsize = publishers_.size();
02106 
02107     while (pubsize > 0) {
02108       pubPtr = (*pubIter).obj_.in();
02109       ++pubIter;
02110 
02111       DDS::ReturnCode_t result
02112       = pubPtr->delete_contained_entities();
02113 
02114       if (result != DDS::RETCODE_OK) {
02115         return result;
02116       }
02117 
02118       result = delete_publisher(pubPtr);
02119 
02120       if (result != DDS::RETCODE_OK) {
02121         return result;
02122       }
02123 
02124       pubsize--;
02125     }
02126 
02127   }
02128 
02129   // delete subscribers
02130   {
02131     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02132                      tao_mon,
02133                      this->subscribers_protector_,
02134                      DDS::RETCODE_ERROR);
02135 
02136     SubscriberSet::iterator subIter = subscribers_.begin();
02137     DDS::Subscriber_ptr subPtr;
02138     size_t subsize = subscribers_.size();
02139 
02140     while (subsize > 0) {
02141       subPtr = (*subIter).obj_.in();
02142       ++subIter;
02143 
02144       DDS::ReturnCode_t result = subPtr->delete_contained_entities();
02145 
02146       if (result != DDS::RETCODE_OK) {
02147         return result;
02148       }
02149 
02150       result = delete_subscriber(subPtr);
02151 
02152       if (result != DDS::RETCODE_OK) {
02153         return result;
02154       }
02155 
02156       subsize--;
02157     }
02158   }
02159 
02160   DDS::ReturnCode_t ret = DDS::RETCODE_OK;
02161   // delete topics
02162   {
02163     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02164                      tao_mon,
02165                      this->topics_protector_,
02166                      DDS::RETCODE_ERROR);
02167 
02168     TopicMap::iterator topicIter = topics_.begin();
02169     DDS::Topic_ptr topicPtr;
02170     size_t topicsize = topics_.size();
02171 
02172     while (topicsize > 0) {
02173       topicPtr = topicIter->second.pair_.obj_.in();
02174       ++topicIter;
02175 
02176       // Delete the topic the reference count.
02177       DDS::ReturnCode_t result = this->delete_topic_i(topicPtr, true);
02178 
02179       if (result != DDS::RETCODE_OK) {
02180         return result;
02181       }
02182       topicsize--;
02183     }
02184   }
02185 
02186   {
02187     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02188                      tao_mon,
02189                      this->recorders_protector_,
02190                      DDS::RETCODE_ERROR);
02191 
02192     RecorderSet::iterator it = recorders_.begin();
02193     for (; it != recorders_.end(); ++it ){
02194       RecorderImpl* impl = static_cast<RecorderImpl* >(it->in());
02195       DDS::ReturnCode_t result = DDS::RETCODE_ERROR;
02196       if (impl) result = impl->cleanup();
02197       if (result != DDS::RETCODE_OK) ret = result;
02198     }
02199     recorders_.clear();
02200   }
02201 
02202   {
02203     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02204                      tao_mon,
02205                      this->replayers_protector_,
02206                      DDS::RETCODE_ERROR);
02207 
02208     ReplayerSet::iterator it = replayers_.begin();
02209     for (; it != replayers_.end(); ++it ){
02210       ReplayerImpl* impl = static_cast<ReplayerImpl* >(it->in());
02211       DDS::ReturnCode_t result = DDS::RETCODE_ERROR;
02212       if (impl) result = impl->cleanup();
02213       if (result != DDS::RETCODE_OK) ret = result;
02214 
02215     }
02216 
02217     replayers_.clear();
02218   }
02219 
02220   shutdown_mutex_.acquire();
02221   shutdown_result_ = ret;
02222   shutdown_complete_ = true;
02223   shutdown_condition_.signal();
02224   shutdown_mutex_.release();
02225 
02226   return 0;
02227 }
02228 
02229 } // namespace DCPS
02230 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:22 2016 for OpenDDS by  doxygen 1.4.7