28 #ifndef DDS_HAS_MINIMUM_BIT 35 #include <dds/DdsDcpsCoreC.h> 36 #include <dds/DdsDcpsGuidTypeSupportImpl.h> 37 #ifndef DDS_HAS_MINIMUM_BIT 38 # include <dds/DdsDcpsCoreTypeSupportC.h> 54 , participant_servant_(0)
56 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
57 , is_exclusive_ownership_(false)
59 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
95 if (!disco || !disco->remove_subscription(
domain_id_,
100 "could not remove subscription from discovery\n"));
126 topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc);
127 if (
TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) {
135 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 146 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 185 "(%P|%t) RecorderImpl::data_received: " 186 "%C received sample: %C\n",
199 if (ser >> encap && encap.to_any_encoding(enc)) {
232 #ifndef OPENDDS_SAFETY_PROFILE 237 DDS::DynamicType_var dt = tls->type_identifier_to_dynamic(ti, pub_id);
240 "(%P|%t) RecorderImpl::add_association: " 241 "DynamicType added to map with guid: %C\n",
LogGuid(pub_id).c_str()));
243 dt_map_.insert(std::make_pair(pub_id, dt));
254 "bit %d local %C remote %C\n",
302 WriterMapType::value_type(
356 "transport layer failed to associate\n"));
423 RepoIdToHandleMap::value_type(writer.
writerId, handle));
427 ACE_TEXT(
"(%P|%t) RecorderImpl::add_association: ")
428 ACE_TEXT(
"id_to_handle_map_[ %C] = 0x%x.\n"),
487 if (writers.length() == 0) {
493 ACE_TEXT(
"(%P|%t) RecorderImpl::remove_associations: ")
494 ACE_TEXT(
"bit %d local %C remote %C num remotes %d\n"),
514 if (writers.length() == 0) {
520 ACE_TEXT(
"(%P|%t) RecorderImpl::remove_associations_i: ")
521 ACE_TEXT(
"bit %d local %C remote %C num remotes %d\n"),
545 wr_len = writers.length();
548 GUID_t writer_id = writers[i];
550 #ifndef OPENDDS_SAFETY_PROFILE 551 if (
dt_map_.erase(writer_id) == 0) {
554 "failed to find writer_id in the DynamicTypeByPubId map.\n"));
559 WriterMapType::iterator it =
writers_.find(writer_id);
561 it->second->removed();
564 if (
writers_.erase(writer_id) == 0) {
567 ACE_TEXT(
"(%P|%t) RecorderImpl::remove_associations_i: ")
568 ACE_TEXT(
"the writer local %C was already removed.\n"),
578 wr_len = updated_writers.length();
594 for (
CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
612 = handles[ wr_len - 1];
644 for (
unsigned int i = 0; i < handles.length(); ++i) {
662 size =
static_cast<int>(
writers_.size());
663 writers.length(size);
665 WriterMapType::iterator curr_writer =
writers_.begin();
666 WriterMapType::iterator end_writer =
writers_.end();
670 while (curr_writer != end_writer) {
671 writers[i++] = curr_writer->first;
730 GUID_t prefix = remote_participant;
735 typedef std::pair<GUID_t, RcHandle<WriterInfo> > WriterSetElement;
741 for (WriterMapType::iterator pos =
writers_.lower_bound(prefix),
745 writers.push_back(std::make_pair(pos->first, pos->second));
750 for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
753 pos->second->received_activity(now);
764 if (
subqos_ != subscriber_qos) {
791 disco->update_subscription_qos(
857 ACE_TEXT(
"(%P|%t) RecorderImpl::lookup_instance_handles: ")
858 ACE_TEXT(
"searching for handles for writer Ids: %C.\n"),
862 hdls.length(num_wrts);
874 ACE_TEXT(
"(%P|%t) RecorderImpl::enable\n")));
940 "add_subscription returned invalid id\n"));
973 #if !defined (DDS_HAS_MINIMUM_BIT) 985 DDS::PublicationBuiltinTopicDataSeq data;
987 DDS::ReturnCode_t const ret = instance_handle_to_bit_data<DDS::PublicationBuiltinTopicDataDataReader_var>(
999 #endif // !defined (DDS_HAS_MINIMUM_BIT) 1001 #ifndef OPENDDS_SAFETY_PROFILE 1006 if (dt_found ==
dt_map_.end()) {
1014 DDS::DynamicType_var dt = dt_found->second;
1016 DDS::DynamicData_var dd_var = dd;
1020 "Encountered unsupported combination of XCDR1 encoding and mutable extensibility.\n"));
1024 return dd_var._retn();
DataSampleHeader header_
The demarshalled sample header.
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
bool check_xcdr1_mutable(DDS::DynamicType_ptr dt)
RcHandle< T > rchandle_from(T *pointer)
sequence< InstanceHandle_t > InstanceHandleSeq
#define ACE_GUARD(MUTEX, OBJ, LOCK)
typedef OPENDDS_VECTOR(WeakRcHandle< TransportImpl >) ImplsType
DDS::ReturnCode_t set_enabled()
DDS::StatusMask listener_mask_
const InstanceHandle_t HANDLE_NIL
RecorderListener_rch listener_
char message_id_
The enum MessageId.
Base class to hold configuration settings for TransportImpls.
void enable_transport(bool reliable, bool durable)
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
ReliabilityQosPolicy reliability
DDS::QosPolicyId_t last_policy_id
RecorderListener_rch get_listener()
const TransportLocatorSeq & connection_info() const
DurabilityQosPolicy durability
TransportLocator writerDiscInfo
TransportLocatorSeq remote_data_
const GUID_t GUID_UNKNOWN
Nil value for GUID.
virtual bool check_transport_qos(const TransportInst &inst)
Message_Block_Ptr sample_
The data in unspecified format.
GUID_t publication_id_
Id of the datawriter that sent the sample.
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
ACE_UINT32 source_timestamp_nanosec_
OwnershipQosPolicy ownership
const char * c_str() const
OwnershipManager * ownership_manager()
DDS::TopicDescription_var topic_desc_
#define ACE_READ_GUARD(MUTEX, OBJ, LOCK)
TransportLocator discovery_locator_
ACE_CDR::ULong remote_transport_context_
DDS::SubscriberQos subqos_
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
OwnershipQosPolicyKind kind
sequence< TransportLocator > TransportLocatorSeq
DomainParticipantImpl * participant()
void return_handle(DDS::InstanceHandle_t handle)
void disassociate(const GUID_t &peerId)
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
void set_reader_effective_data_rep_qos(DDS::DataRepresentationIdSeq &qos)
virtual void data_received(const ReceivedDataSample &sample)
DDS::DynamicData_ptr get_dynamic_data(const RawDataSample &sample)
const DDS::StatusMask DEFAULT_STATUS_MASK
unsigned long transportContext
void unregister_for_writer(const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid)
DDS::ReturnCode_t set_qos(const DDS::SubscriberQos &subscriber_qos, const DDS::DataReaderQos &datareader_qos)
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
static TimePoint_T< MonotonicClock > now()
const char *const BUILT_IN_PUBLICATION_TOPIC
Implements the DDS::Topic interface.
DDS::QosPolicyCountSeq policies
DDS::ReturnCode_t get_qos(DDS::SubscriberQos &subscriber_qos, DDS::DataReaderQos &datareader_qos)
InstanceHandle_t last_publication_handle
DomainParticipantImpl * participant_servant_
ReliabilityQosPolicyKind kind
TransportMessageBlockAllocator mb_alloc_
DDS::DomainId_t domain_id_
DurabilityQosPolicyKind kind
Class to serialize and deserialize data for DDS.
DurabilityQosPolicy durability
DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_
virtual void remove_associations(const WriterIdSeq &writers, CORBA::Boolean callback)
Holds a data sample received by the transport.
DDS::ReturnCode_t cleanup()
DataRepresentationQosPolicy representation
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
QosPolicyId_t last_policy_id
long current_count_change
void remove_all_associations()
DDS::ReturnCode_t enable()
void lookup_instance_handles(const WriterIdSeq &ids, DDS::InstanceHandleSeq &hdls)
Lookup the instance handles by the publication repo ids.
virtual void remove_associations_i(const WriterIdSeq &writers, bool callback)
bool is_exclusive_ownership_
RepoIdToHandleMap id_to_handle_map_
DDS::DataReaderQos passed_qos_
sequence< GUID_t > WriterIdSeq
OwnershipManager * owner_manager_
DataSampleHeader header_
The sample data header.
virtual void register_for_writer(const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, DiscoveryListener *)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
virtual void unregister_for_writer(const GUID_t &, const GUID_t &, const GUID_t &)
virtual DDS::DomainId_t get_domain_id()
virtual void on_recorder_matched(Recorder *recorder, const DDS::SubscriptionMatchedStatus &status)=0
ACE_Recursive_Thread_Mutex publication_handle_lock_
DDS::InstanceHandle_t get_entity_instance_handle(const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
HANDLE_TYPE_NATIVE InstanceHandle_t
void init(TopicDescriptionImpl *a_topic_desc, const DDS::DataReaderQos &qos, RecorderListener_rch a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant, DDS::SubscriberQos subqos)
ACE_INT32 source_timestamp_sec_
long count_since_last_send
TransportLocatorSeq writerTransInfo
bool associate(const AssociationData &peer, bool active)
TransportPriorityQosPolicy transport_priority
virtual GUID_t get_guid() const
virtual void update_incompatible_qos(const IncompatibleQosStatus &status)
OpenDDS_Dcps_Export bool equal_guid_prefixes(const GuidPrefix_t &lhs, const GuidPrefix_t &rhs)
AtomicBool enabled_
The flag indicates the entity is enabled.
Priority publication_transport_priority_
virtual void add_association(const GUID_t &yourId, const WriterAssociation &writer, bool active)
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
XTypes::TypeLookupService_rch get_type_lookup_service()
ReliabilityQosPolicy reliability
OpenDDS_Dcps_Export LogLevel log_level
DDS::SubscriptionMatchedStatus subscription_match_status_
#define OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Implements the DDS::TopicDescription interface.
virtual void signal_liveliness(const GUID_t &remote_participant)
const ReturnCode_t RETCODE_ERROR
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const
DynamicTypeByPubId dt_map_
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_UNSUPPORTED
virtual void on_sample_data_received(Recorder *recorder, const RawDataSample &sample)=0
void add_to_dynamic_type_map(const GUID_t &pub_id, const XTypes::TypeIdentifier &ti)
void register_for_writer(const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
const char * to_string(MessageId value)
QosPolicyCountSeq policies
DDS::InstanceHandle_t assign_handle(const GUID_t &id=GUID_UNKNOWN)
DataRepresentationIdSeq value
virtual void notify_subscription_disconnected(const WriterIdSeq &pubids)
virtual void notify_subscription_lost(const WriterIdSeq &pubids)
#define ACE_WRITE_GUARD(MUTEX, OBJ, LOCK)
static bool valid(const DDS::UserDataQosPolicy &qos)
DDS::InstanceHandle_t lookup_handle(const GUID_t &id) const
#define TheServiceParticipant
DDS::ReturnCode_t set_listener(const RecorderListener_rch &a_listener, DDS::StatusMask mask)
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
::DDS::DataWriterQos writerQos
The Internal API and Implementation of OpenDDS.
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
virtual char * get_name()
TopicDescriptionPtr< TopicImpl > topic_servant_
virtual bool is_reliable() const =0
Does the transport as configured support RELIABLE_RELIABILITY_QOS?
virtual CORBA::Long get_priority_value(const AssociationData &data) const
virtual DDS::ReturnCode_t repoid_to_bit_key(const DCPS::GUID_t &id, DDS::BuiltinTopicKey_t &key)
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)
Encoding::Kind encoding_kind_
Holds information on which type of encoding was read from the encapsulation header.
sequence< string > StringSeq
virtual void notify_subscription_reconnected(const WriterIdSeq &pubids)
virtual DDS::InstanceHandle_t get_instance_handle()