10 #include "DataWriterRemoteC.h" 24 #include "tao/BiDir_GIOP/BiDirGIOP.h" 27 #if !defined (DDS_HAS_MINIMUM_BIT) 36 #include "dds/DdsDcpsCoreTypeSupportImpl.h" 40 const char ROOT_POA[] =
"RootPOA";
41 const char BIDIR_POA[] =
"BiDirPOA";
43 struct DestroyPolicy {
47 ~DestroyPolicy() { p_->destroy(); }
61 return root_poa->find_POA(BIDIR_POA,
false );
68 policy <<= BiDirPolicy::BOTH;
70 orb->
create_policy(BiDirPolicy::BIDIRECTIONAL_POLICY_TYPE, policy);
71 DestroyPolicy destroy(policies[0]);
72 PortableServer::POAManager_var manager = root_poa->the_POAManager();
74 return root_poa->create_POA(BIDIR_POA, manager, policies);
81 return root_poa.
_retn();
88 template <
class T_impl,
class T_ptr>
98 dynamic_cast<T_impl*
>(poa->reference_to_servant(p));
111 typename T::_stub_ptr_type servant_to_remote_reference(T* servant,
CORBA::ORB_ptr orb)
117 typename T::_stub_ptr_type the_obj = T::_stub_type::_narrow(obj.
in());
126 poa->reference_to_id(obj);
127 poa->deactivate_object(oid.in());
138 const std::string& ior)
141 bit_transport_port_(0),
142 use_local_bit_config_(false),
143 orb_from_user_(false)
148 const DCPSInfo_var& info)
166 ACE_TEXT(
"ERROR: InfoRepoDiscovery::~InfoRepoDiscovery - ")
167 ACE_TEXT(
"Exception caught during ORB shutdown: %C.\n"),
196 }
catch (CORBA::INV_OBJREF&) {
199 std::string second_try(
"corbaloc:iiop:");
201 second_try +=
"/DCPSInfoRepo";
206 return DCPSInfo::_narrow(o.
in());
221 int argc = argv->
argc();
230 PortableServer::POAManager_var poa_manager = poa->the_POAManager();
231 poa_manager->activate();
243 ACE_TEXT(
"(%P|%t) ERROR: InfoRepoDiscovery::get_dcps_info: ")
244 ACE_TEXT(
"unable to narrow DCPSInfo (%C) for key %C.\n"),
246 this->
key().c_str()));
247 return DCPSInfo::_nil();
252 return DCPSInfo::_nil();
269 #if !defined (DDS_HAS_MINIMUM_BIT) 273 std::string(
"_BITTransportConfig_") +
key();
277 std::string(
"_BITTCPTransportInst_") +
key();
290 tcp_inst->datalink_release_delay_ = 0;
300 ACE_TEXT(
" - BIT tcp transport %C\n"), tcp_inst->local_address_string().c_str()));
312 #if defined (DDS_HAS_MINIMUM_BIT) 313 ACE_UNUSED_ARG(participant);
324 DDS::Subscriber_var bit_subscriber =
326 DDS::SubscriberListener::_nil(),
334 "exception during transport initialization\n"));
341 bit_subscriber->get_default_datareader_qos(participantReaderQos);
350 DDS::TopicDescription_var bit_part_topic =
353 DDS::DataReader_var dr =
354 bit_subscriber->create_datareader(bit_part_topic,
355 participantReaderQos,
356 DDS::DataReaderListener::_nil(),
360 DDS::ParticipantBuiltinTopicDataDataReader_var pbit_dr =
361 DDS::ParticipantBuiltinTopicDataDataReader::_narrow(dr.in());
369 bit_subscriber->get_default_datareader_qos(dr_qos);
372 DDS::TopicDescription_var bit_topic_topic =
375 dr = bit_subscriber->create_datareader(bit_topic_topic,
377 DDS::DataReaderListener::_nil(),
380 DDS::TopicDescription_var bit_pub_topic =
383 dr = bit_subscriber->create_datareader(bit_pub_topic,
385 DDS::DataReaderListener::_nil(),
388 DDS::TopicDescription_var bit_sub_topic =
391 dr = bit_subscriber->create_datareader(bit_sub_topic,
393 DDS::DataReaderListener::_nil(),
407 "exception during DataReader initialization\n"));
410 return make_rch<BitSubscriber>(bit_subscriber);
436 const GUID_t& participantId)
439 return get_dcps_info()->attach_participant(domainId, participantId);
460 return info->add_domain_participant(domainId, qos);
469 #if defined(OPENDDS_SECURITY) 487 const GUID_t& participantId)
490 get_dcps_info()->remove_domain_participant(domainId, participantId);
500 const GUID_t& myParticipantId,
504 get_dcps_info()->ignore_domain_participant(domainId, myParticipantId, ignoreId);
514 const GUID_t& participant,
518 return get_dcps_info()->update_domain_participant_qos(domainId, participant, qos);
529 const GUID_t& participantId,
const char* topicName,
534 return get_dcps_info()->assert_topic(topicId, domainId, participantId, topicName,
535 dataTypeName, qos, hasDcpsKey);
545 const char* topicName,
547 DDS::TopicQos_out qos,
548 DCPS::GUID_t_out topicId)
551 return get_dcps_info()->find_topic(domainId, topicName, dataTypeName, qos, topicId);
563 return get_dcps_info()->remove_topic(domainId, participantId, topicId);
575 get_dcps_info()->ignore_topic(domainId, myParticipantId, ignoreId);
588 return get_dcps_info()->update_topic_qos(topicId, domainId, participantId, qos);
600 const GUID_t& participantId,
620 OpenDDS::DCPS::DataWriterRemote_var dr_remote_obj =
621 servant_to_remote_reference(writer_remote_impl,
orb_);
626 pubId =
get_dcps_info()->add_publication(domainId, participantId, topicId,
627 dr_remote_obj, qos, transInfo, publisherQos, serializedTypeInfo);
643 const GUID_t& participantId,
644 const GUID_t& publicationId)
650 bool removed =
false;
652 get_dcps_info()->remove_publication(domainId, participantId, publicationId);
663 const GUID_t& participantId,
667 get_dcps_info()->ignore_publication(domainId, participantId, ignoreId);
677 const GUID_t& participantId,
683 return get_dcps_info()->update_publication_qos(domainId, participantId, dwId,
696 const GUID_t& participantId,
702 const char* filterClassName,
703 const char* filterExpr,
719 OpenDDS::DCPS::DataReaderRemote_var dr_remote_obj =
720 servant_to_remote_reference(reader_remote_impl,
orb_);
725 subId =
get_dcps_info()->add_subscription(domainId, participantId, topicId,
726 dr_remote_obj, qos, transInfo, subscriberQos,
727 filterClassName, filterExpr, params,
743 const GUID_t& participantId,
744 const GUID_t& subscriptionId)
750 bool removed =
false;
752 get_dcps_info()->remove_subscription(domainId, participantId, subscriptionId);
763 const GUID_t& participantId,
767 get_dcps_info()->ignore_subscription(domainId, participantId, ignoreId);
777 const GUID_t& participantId,
783 return get_dcps_info()->update_subscription_qos(domainId, participantId,
793 const GUID_t& participantId,
799 return get_dcps_info()->update_subscription_params(domainId, participantId,
813 DataReaderMap::iterator drr =
dataReaderMap_.find(subscriptionId);
816 ACE_TEXT(
"(%P|%t) ERROR: InfoRepoDiscovery::removeDataReaderRemote: ")
817 ACE_TEXT(
" could not find DataReader for subscriptionId.\n")));
823 remote_reference_to_servant<DataReaderRemoteImpl>(drr->second.in(),
orb_);
825 deactivate_remote_object(drr->second.in(),
orb_);
826 }
catch (
const CORBA::BAD_INV_ORDER&) {
829 }
catch (
const CORBA::OBJECT_NOT_EXIST&) {
842 ACE_TEXT(
"(%P|%t) ERROR: InfoRepoDiscovery::removeDataWriterRemote: ")
843 ACE_TEXT(
" could not find DataWriter for publicationId.\n")));
849 remote_reference_to_servant<DataWriterRemoteImpl>(dwr->second.in(),
orb_);
851 deactivate_remote_object(dwr->second.in(),
orb_);
852 }
catch (
const CORBA::BAD_INV_ORDER&) {
855 }
catch (
const CORBA::OBJECT_NOT_EXIST&) {
877 ACE_TEXT(
"(%P|%t) NOTICE: InfoRepoDiscovery::Config::discovery_config ")
878 ACE_TEXT(
"failed to open [%s] section.\n"),
890 ACE_TEXT(
"(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
891 ACE_TEXT(
"repo sections must have a subsection name\n")),
898 ACE_TEXT(
"(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
899 ACE_TEXT(
"too many nesting layers in the [repo] section.\n")),
904 for (KeyList::const_iterator it=keys.begin(); it != keys.end(); ++it) {
905 std::string repo_name = (*it).first;
910 bool repoKeySpecified =
false, bitIpSpecified =
false,
911 bitPortSpecified =
false;
915 for (ValueMap::const_iterator it=values.begin(); it != values.end(); ++it) {
916 std::string
name = (*it).first;
917 if (name ==
"RepositoryKey") {
918 repoKey = (*it).second;
919 repoKeySpecified =
true;
922 ACE_TEXT(
"(%P|%t) [repository/%C]: RepositoryKey == %C\n"),
923 repo_name.c_str(), repoKey.c_str()));
926 }
else if (name ==
"RepositoryIor") {
927 repoIor = (*it).second;
931 ACE_TEXT(
"(%P|%t) [repository/%C]: RepositoryIor == %C\n"),
932 repo_name.c_str(), repoIor.c_str()));
934 }
else if (name ==
"DCPSBitTransportIPAddress") {
935 bitIp = (*it).second;
936 bitIpSpecified =
true;
939 ACE_TEXT(
"(%P|%t) [repository/%C]: DCPSBitTransportIPAddress == %C\n"),
940 repo_name.c_str(), bitIp.c_str()));
942 }
else if (name ==
"DCPSBitTransportPort") {
943 std::string
value = (*it).second;
945 bitPortSpecified =
true;
949 ACE_TEXT(
"(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
950 ACE_TEXT(
"Illegal integer value for DCPSBitTransportPort (%C) in [repository/%C] section.\n"),
951 value.c_str(), repo_name.c_str()),
956 ACE_TEXT(
"(%P|%t) [repository/%C]: DCPSBitTransportPort == %d\n"),
957 repo_name.c_str(), bitPort));
961 ACE_TEXT(
"(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
962 ACE_TEXT(
"Unexpected entry (%C) in [repository/%C] section.\n"),
963 name.c_str(), repo_name.c_str()),
968 if (values.find(
"RepositoryIor") == values.end()) {
970 ACE_TEXT(
"(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
971 ACE_TEXT(
"Repository section [repository/%C] section is missing RepositoryIor value.\n"),
976 if (!repoKeySpecified) {
982 make_rch<InfoRepoDiscovery>(repoKey, repoIor.c_str()));
983 if (bitPortSpecified) discovery->bit_transport_port(bitPort);
984 if (bitIpSpecified) discovery->bit_transport_ip(bitIp);
986 DCPS::static_rchandle_cast<Discovery>(discovery));
1025 if (
orb_->orb_core()->has_shutdown() ==
false) {
1033 "ERROR: InfoRepoDiscovery::OrbRunner");
1037 "ERROR: InfoRepoDiscovery::OrbRunner");
1041 "ERROR: InfoRepoDiscovery::OrbRunner");
1044 if (
orb_->orb_core()->has_shutdown()) {
1048 orb_->orb_core()->reactor()->reset_reactor_event_loop();
1057 const char*
name() {
return "repository"; }
1085 ACE_TEXT(
"OpenDDS_InfoRepoDiscovery"),
virtual bool ignore_topic(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &myParticipantId, const OpenDDS::DCPS::GUID_t &ignoreId)
#define TheTransportRegistry
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
CORBA::Object_ptr resolve_initial_references(const char *name, ACE_Time_Value *timeout=0)
CORBA::Object_ptr string_to_object(const char *str)
Implements the OpenDDS::DCPS::ReaderRemote interface that is used to add and remove associations...
const char * c_str(void) const
static const char * DEFAULT_REPO
Key value for the default repository IOR.
const LogLevel::Value value
int bit_transport_port_
The builtin topic transport port number.
virtual int init(int argc, ACE_TCHAR *argv[])
CORBA::Policy_ptr create_policy(CORBA::PolicyType type, const CORBA::Any &val)
ACE_FACTORY_DEFINE(OpenDDS_InfoRepoDiscovery, IRDiscoveryLoader)
virtual bool attach_participant(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId)
virtual bool update_subscription_qos(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &partId, const OpenDDS::DCPS::GUID_t &drId, const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
DDS::ReturnCode_t create_bit_topics(DomainParticipantImpl *participant)
DCPSInfo_var get_dcps_info()
Implements the OpenDDS::DCPS::DataWriterRemote interface.
& ACE_SVC_NAME(TAO_AV_TCP_Factory)
CORBA::OctetSeq_var ObjectId_var
static const ACE_TCHAR REPO_SECTION_NAME[]
std::string get_stringified_dcps_info_ior()
virtual DDS::Subscriber_ptr create_subscriber(const DDS::SubscriberQos &qos, DDS::SubscriberListener_ptr a_listener, DDS::StatusMask mask)
bool set_ORB(CORBA::ORB_ptr orb)
virtual void fini_bit(DCPS::DomainParticipantImpl *participant)
virtual bool update_subscription_params(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &subscriptionId, const DDS::StringSeq &params)
static ACE_Thread_Mutex mtx_orb_runner_
LivelinessQosPolicy liveliness
virtual const ACE_Configuration_Section_Key & root_section(void) const
virtual OpenDDS::DCPS::GUID_t add_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, OpenDDS::DCPS::DataReaderCallbacks_rch subscription, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams, const XTypes::TypeInformation &type_info)
RcHandle< TransportConfig > TransportConfig_rch
int discovery_config(ACE_Configuration_Heap &cf)
virtual bool update_publication_qos(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &partId, const OpenDDS::DCPS::GUID_t &dwId, const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos)
sequence< TransportLocator > TransportLocatorSeq
virtual bool update_domain_participant_qos(DDS::DomainId_t domain, const OpenDDS::DCPS::GUID_t &participantId, const DDS::DomainParticipantQos &qos)
virtual ~InfoRepoDiscovery()
std::string bit_transport_ip_
The builtin topic transport address.
ACE_STATIC_SVC_DEFINE(ACE_Logging_Strategy, ACE_TEXT("Logging_Strategy"), ACE_Service_Type::SERVICE_OBJECT, &ACE_SVC_NAME(ACE_Logging_Strategy), ACE_Service_Type::DELETE_THIS|ACE_Service_Type::DELETE_OBJ, 0) extern "C" int _get_dll_unload_policy()
RcHandle< TransportInst > TransportInst_rch
The type definition for the smart-pointer to the underlying type.
void bind_config(const OPENDDS_STRING &name, DDS::Entity_ptr entity)
const DDS::StatusMask DEFAULT_STATUS_MASK
const char *const BUILT_IN_PUBLICATION_TOPIC
const char DEFAULT_ORB_NAME[]
virtual bool remove_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &subscriptionId)
DOMAINID_TYPE_NATIVE DomainId_t
TransportInst_rch new_inst(const std::string &)
DataWriterMap dataWriterMap_
virtual bool ignore_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &myParticipantId, const OpenDDS::DCPS::GUID_t &ignoreId)
Duration_t lease_duration
DurabilityQosPolicyKind kind
DurabilityQosPolicy durability
long ParticipantCryptoHandle
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void serialize_type_info(const TypeInformation &type_info, T &seq, const DCPS::Encoding *encoding_option=0)
TransportConfig_rch bit_config_
TransportConfig_rch bit_config()
int thr_sigsetmask(int how, const sigset_t *nsm, sigset_t *osm)
bool use_local_bit_config_
sequence< Policy > PolicyList
virtual ACE_CString _info(void) const=0
virtual OpenDDS::DCPS::TopicStatus assert_topic(OpenDDS::DCPS::GUID_t_out topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos, bool hasDcpsKey, TopicCallbacks *topic_callbacks)
const char *const BUILT_IN_PARTICIPANT_TOPIC
sequence< octet > OctetSeq
TransportInst_rch create_inst(const OPENDDS_STRING &name, const OPENDDS_STRING &transport_type)
static CORBA::ORB_ptr _duplicate(CORBA::ORB_ptr orb)
TransportConfig_rch create_config(const OPENDDS_STRING &name)
virtual OpenDDS::DCPS::TopicStatus remove_topic(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId)
void removeDataWriterRemote(const GUID_t &publicationId)
virtual bool ignore_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &myParticipantId, const OpenDDS::DCPS::GUID_t &ignoreId)
const char *const BUILT_IN_SUBSCRIPTION_TOPIC
#define SUBSCRIBER_QOS_DEFAULT
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
virtual OpenDDS::DCPS::TopicStatus find_topic(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const char *topicName, CORBA::String_out dataTypeName, DDS::TopicQos_out qos, OpenDDS::DCPS::GUID_t_out topicId)
virtual bool update_topic_qos(const OpenDDS::DCPS::GUID_t &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const DDS::TopicQos &qos)
ORB_ptr ORB_init(int &argc, char *argv[], const char *orb_name=0)
Base class for concrete transports to provide new objects.
const char *const BUILT_IN_TOPIC_TOPIC
virtual int open_section(const ACE_Configuration_Section_Key &base, const ACE_TCHAR *sub_section, bool create, ACE_Configuration_Section_Key &result)
const char * retcode_to_string(DDS::ReturnCode_t value)
RcHandle< T > static_rchandle_cast(const RcHandle< U > &h)
virtual OpenDDS::DCPS::AddDomainStatus add_domain_participant(DDS::DomainId_t domain, const DDS::DomainParticipantQos &qos, XTypes::TypeLookupService_rch tls)
virtual OpenDDS::DCPS::AddDomainStatus add_domain_participant_secure(DDS::DomainId_t domain, const DDS::DomainParticipantQos &qos, XTypes::TypeLookupService_rch tls, const OpenDDS::DCPS::GUID_t &guid, DDS::Security::IdentityHandle id, DDS::Security::PermissionsHandle perm, DDS::Security::ParticipantCryptoHandle part_crypto)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
virtual RcHandle< BitSubscriber > init_bit(DomainParticipantImpl *participant)
Discovery Strategy interface class.
static const char DEFAULT_INST_PREFIX[]
static TransportRegistry * instance()
Return a singleton instance of this class.
int processSections(ACE_Configuration_Heap &cf, const ACE_Configuration_Section_Key &key, KeyList &subsections)
static OrbRunner * orb_runner_
virtual bool remove_domain_participant(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId)
#define ACE_NEW_RETURN(POINTER, CONSTRUCTOR, RET_VAL)
const ReturnCode_t RETCODE_OK
virtual OpenDDS::DCPS::GUID_t generate_participant_guid()
virtual bool ignore_domain_participant(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &myParticipantId, const OpenDDS::DCPS::GUID_t &ignoreId)
#define ACE_ERROR_RETURN(X, Y)
DataReaderMap dataReaderMap_
void removeDataReaderRemote(const GUID_t &subscriptionId)
virtual int activate(long flags=THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, int n_threads=1, int force_active=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, int grp_id=-1, ACE_Task_Base *task=0, ACE_hthread_t thread_handles[]=0, void *stack[]=0, size_t stack_size[]=0, ACE_thread_t thread_ids[]=0, const char *thr_name[]=0)
int pullValues(ACE_Configuration_Heap &cf, const ACE_Configuration_Section_Key &key, ValueMap &values)
Defines the interface for Discovery callbacks into the Topic.
#define TheServiceParticipant
virtual OpenDDS::DCPS::GUID_t add_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, OpenDDS::DCPS::DataWriterCallbacks_rch publication, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos, const XTypes::TypeInformation &type_info)
int sigfillset(sigset_t *s)
The Internal API and Implementation of OpenDDS.
Atomic< unsigned long > use_count_
void _tao_print_exception(const char *info, FILE *f=stdout) const
virtual DDS::TopicDescription_ptr lookup_topicdescription(const char *name)
virtual bool remove_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &publicationId)
bool register_type(const TransportType_rch &type)
sequence< string > StringSeq
bool convertToInteger(const String &s, T &value)
InfoRepoDiscovery(const RepoKey &key, const std::string &ior)