25 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 30 #if !defined (DDS_HAS_MINIMUM_BIT) 32 #endif // !defined (DDS_HAS_MINIMUM_BIT) 53 : data_dropped_count_(0),
54 data_delivered_count_(0),
58 participant_servant_(0),
63 publisher_servant_(0),
70 empty_condition_(
lock_),
71 pending_write_count_(0)
134 if (!disco->remove_publication(
136 this->participant_servant_->get_id(),
140 ACE_TEXT(
"PublisherImpl::delete_datawriter, ")
141 ACE_TEXT(
"publication not removed from discovery.\n")),
149 DDS::Topic_ptr topic,
164 #if !defined (DDS_HAS_MINIMUM_BIT) 166 #endif // !defined (DDS_HAS_MINIMUM_BIT) 232 ACE_TEXT(
"(%P|%t) DataWriterImpl::set_qos, ")
238 if (!(
qos_ == qos)) {
330 "(%P|%t) ReplayerImpl::enable-mb" 331 " Cached_Allocator_With_Overflow %x with %d chunks\n",
336 "(%P|%t) ReplayerImpl::enable-db" 337 " Cached_Allocator_With_Overflow %x with %d chunks\n",
342 "(%P|%t) ReplayerImpl::enable-header" 343 " Cached_Allocator_With_Overflow %x with %d chunks\n",
356 ACE_TEXT(
"(%P|%t) ERROR: ReplayerImpl::enable, ")
357 ACE_TEXT(
"Transport Exception.\n")));
390 ACE_TEXT(
"(%P|%t) ERROR: ReplayerImpl::enable, ")
391 ACE_TEXT(
"add_publication returned invalid id.\n")));
409 ACE_TEXT(
"(%P|%t) ReplayerImpl::add_association - ")
410 ACE_TEXT(
"bit %d local %C remote %C\n"),
439 ACE_TEXT(
"(%P|%t) ReplayerImpl::add_association(): ")
440 ACE_TEXT(
"adding subscription to publication %C with priority %d.\n"),
459 ACE_TEXT(
"(%P|%t) ReplayerImpl::add_association: ")
460 ACE_TEXT(
"ERROR: transport layer failed to associate.\n")));
480 ACE_UNUSED_ARG(filter);
481 ACE_UNUSED_ARG(params);
482 ACE_UNUSED_ARG(participant);
499 ACE_TEXT(
"(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
500 ACE_TEXT(
"insert %C from pending failed.\n"),
525 ACE_TEXT(
"(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
526 ACE_TEXT(
"id_to_handle_map_%C = 0x%x failed.\n"),
533 ACE_TEXT(
"(%P|%t) ReplayerImpl::association_complete_i: ")
534 ACE_TEXT(
"id_to_handle_map_%C = 0x%x.\n"),
564 ACE_TEXT(
"(%P|%t) ReplayerImpl::remove_associations: ")
565 ACE_TEXT(
"bit %d local %C remote %C num remotes %d\n"),
598 ++fully_associated_len;
599 fully_associated_readers.length(fully_associated_len);
600 fully_associated_readers [fully_associated_len - 1] = readers[i];
605 RepoIdToSequenceMap::iterator where
618 rds [rds_len - 1] = readers[i];
625 if (fully_associated_len > 0 && !
is_bit_) {
629 for (
CORBA::ULong i = 0; i < fully_associated_len; ++i) {
641 int matchedSubscriptions =
655 handles[rds_len - 1];
679 if (notify_lost && handles.length() > 0) {
683 for (
unsigned int i = 0; i < handles.length(); ++i) {
698 readers.length(size);
700 RepoIdSet::iterator itEnd =
readers_.end();
703 for (RepoIdSet::iterator it =
readers_.begin(); it != itEnd; ++it) {
758 ACE_UNUSED_ARG(readerId);
759 ACE_UNUSED_ARG(params);
787 ACE_TEXT(
"(%P|%t) ERROR: ReplayerImpl::data_delivered: ")
788 ACE_TEXT(
" The publication id %C from delivered element ")
789 ACE_TEXT(
"does not match the datawriter's id %C\n"),
810 ACE_UNUSED_ARG(sample);
815 bool dropped_by_transport)
819 ACE_UNUSED_ARG(dropped_by_transport);
835 ACE_UNUSED_ARG(sample);
841 ACE_UNUSED_ARG(subids);
847 ACE_UNUSED_ARG(subids);
853 ACE_UNUSED_ARG(subids);
859 ACE_UNUSED_ARG(handles);
883 ACE_TEXT(
"(%P|%t) ERROR: ReplayerImpl::write: ")
884 ACE_TEXT(
"Invalid reader instance handle (%d)\n"), *reader_ih_ptr),
891 for (
int i = 0; i < num_samples; ++i) {
896 static_cast<DataSampleElement*>(
922 while (list.
dequeue(element)) {
937 for (RepoIdToReaderInfoMap::iterator iter =
reader_info_.begin(),
948 return this->
write(&sample, 1, 0);
985 static_cast<ACE_Message_Block*>(
1000 *message << header_data;
1020 ACE_TEXT(
"(%P|%t) ReplayerImpl::lookup_instance_handles: ")
1021 ACE_TEXT(
"searching for handles for reader Ids: %C.\n"),
1025 hdls.length(num_rds);
1035 for (RepoIdToReaderInfoMap::const_iterator it =
reader_info_.begin(),
1054 return write(&sample, 1, &subscription);
1059 const RawDataSampleList& samples )
1061 if (!samples.empty())
1062 return write(&samples[0], static_cast<int>(samples.size()), &subscription);
void set_sample(Message_Block_Ptr sample)
void association_complete_i(const GUID_t &remote_id)
void send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
RcHandle< PublicationInstance > PublicationInstance_rch
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
RcHandle< T > rchandle_from(T *pointer)
sequence< InstanceHandle_t > InstanceHandleSeq
ACE_CDR::ULong typeobject_serialized_size
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OPENDDS_STRING topic_name
virtual void add_association(const GUID_t &yourId, const ReaderAssociation &reader, bool active)
unique_ptr< DataBlockAllocator > db_allocator_
DDS::ReturnCode_t set_enabled()
const DataSampleHeader & get_header() const
static const ACE_Time_Value max_time
const InstanceHandle_t HANDLE_NIL
char message_id_
The enum MessageId.
Base class to hold configuration settings for TransportImpls.
void enable_transport(bool reliable, bool durable)
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
virtual void init(DDS::Topic_ptr topic, TopicImpl *topic_servant, const DDS::DataWriterQos &qos, ReplayerListener_rch a_listener, const DDS::StatusMask &mask, OpenDDS::DCPS::DomainParticipantImpl *participant_servant, const DDS::PublisherQos &publisher_qos)
ReliabilityQosPolicy reliability
::DDS::DataReaderQos readerQos
void set_sub_id(CORBA::ULong index, OpenDDS::DCPS::GUID_t id)
DDS::QosPolicyId_t last_policy_id
const TransportLocatorSeq & connection_info() const
const long DURATION_INFINITE_SEC
DurabilityQosPolicy durability
#define OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, error_rtn_value)
TransportLocatorSeq remote_data_
const GUID_t GUID_UNKNOWN
Nil value for GUID.
int bind(Container &c, const FirstType &first, const SecondType &second)
virtual void notify_publication_disconnected(const ReaderIdSeq &subids)
RepoIdToHandleMap id_to_handle_map_
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
ACE_UINT32 source_timestamp_nanosec_
GUID_t get_pub_id() const
const char * c_str() const
unique_ptr< DataSampleElementAllocator > sample_list_element_allocator_
TransportLocator discovery_locator_
bool need_sequence_repair() const
ACE_CDR::ULong remote_transport_context_
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
virtual DDS::ReturnCode_t write_to_reader(DDS::InstanceHandle_t subscription, const RawDataSample &sample)
virtual void notify_publication_lost(const ReaderIdSeq &subids)
sequence< TransportLocator > TransportLocatorSeq
ACE_Recursive_Thread_Mutex lock_
The sample data container.
virtual bool check_transport_qos(const TransportInst &inst)
bool cdr_encapsulation() const
unique_ptr< DataSampleHeaderAllocator > header_allocator_
bool topicIsBIT(const char *name, const char *type)
void return_handle(DDS::InstanceHandle_t handle)
T::rv_reference move(T &p)
DataRepresentationQosPolicy representation
Cached_Allocator_With_Overflow< DataSampleHeader, ACE_Null_Mutex > DataSampleHeaderAllocator
RepoIdToSequenceMap idToSequence_
DDS::PublisherQos pub_qos
void disassociate(const GUID_t &peerId)
ACE_Guard< ACE_Thread_Mutex > lock_
ReaderInfo(const char *filter, const DDS::StringSeq ¶ms, DomainParticipantImpl *participant, bool durable)
virtual void unregister_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
GUID_t get_repoid(DDS::InstanceHandle_t id) const
size_t association_chunk_multiplier_
The multiplier for allocators affected by associations.
TopicDescriptionPtr< TopicImpl > topic_servant_
The topic servant.
const DDS::StatusMask DEFAULT_STATUS_MASK
void set_writer_effective_data_rep_qos(DDS::DataRepresentationIdSeq &qos, bool cdr_encapsulated)
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
DomainParticipantImpl * participant()
void lookup_instance_handles(const ReaderIdSeq &ids, DDS::InstanceHandleSeq &hdls)
Lookup the instance handles by the subscription repo ids.
Implements the DDS::Topic interface.
DDS::QosPolicyCountSeq policies
sequence< GUID_t > ReaderIdSeq
RepoIdToReaderInfoMap reader_info_
virtual void retrieve_inline_qos_data(InlineQosData &qos_data) const
::DDS::StringSeq exprParams
unique_ptr< MessageBlockAllocator > mb_allocator_
GUID_t publication_id_
The repository id of this datawriter/publication.
DDS::PublicationMatchedStatus publication_match_status_
void enqueue_tail(const DataSampleElement *element)
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
virtual void control_delivered(const Message_Block_Ptr &sample)
ReliabilityQosPolicyKind kind
DurabilityQosPolicyKind kind
TypeIdentifierWithSize typeid_with_size
DurabilityQosPolicy durability
virtual void data_delivered(const DataSampleElement *sample)
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
virtual DDS::InstanceHandle_t get_instance_handle()
void unregister_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
TypeIdentifierWithDependencies complete
#define OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, error_rtn_value)
virtual DDS::ReturnCode_t set_qos(const DDS::PublisherQos &publisher_qos, const DDS::DataWriterQos &datawriter_qos)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
QosPolicyCountSeq policies
DomainParticipantImpl * participant_servant_
DDS::ReturnCode_t create_sample_data_message(Message_Block_Ptr data, DataSampleHeader &header_data, Message_Block_Ptr &message, const DDS::Time_t &source_timestamp, bool content_filter)
virtual DDS::ReturnCode_t set_listener(const ReplayerListener_rch &a_listener, DDS::StatusMask mask)
CORBA::String_var type_name_
The type name of associated topic.
ResourceLimitsQosPolicy resource_limits
DDS::DataWriterQos passed_qos_
virtual void data_dropped(const DataSampleElement *sample, bool dropped_by_transport)
DDS::ReturnCode_t enable()
int data_dropped_count_
Statistics counter.
size_t total_length(void) const
virtual void remove_associations(const ReaderIdSeq &readers, CORBA::Boolean callback)
ACE_CDR::Long dependent_typeid_count
ACE_UINT32 message_length_
virtual DDS::DomainId_t get_domain_id()
void set_num_subs(CORBA::ULong num_subs)
virtual DDS::ReturnCode_t get_qos(DDS::PublisherQos &publisher_qos, DDS::DataWriterQos &datawriter_qos)
size_t n_chunks_
The number of chunks for the cached allocator.
bool dequeue(const DataSampleElement *stale)
bool notify_all()
Unblock all of the threads waiting on this condition.
GUID_t topic_id_
The associated topic repository id.
virtual DDS::ReturnCode_t write(const RawDataSample &sample)
DDS::DataWriterQos dw_qos
DDS::StatusMask listener_mask_
DDS::InstanceHandle_t get_entity_instance_handle(const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
HANDLE_TYPE_NATIVE InstanceHandle_t
ACE_INT32 source_timestamp_sec_
long count_since_last_send
void register_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener)
virtual void on_replayer_matched(Replayer *replayer, const DDS::PublicationMatchedStatus &status)
const unsigned long DURATION_INFINITE_NSEC
void remove_all_associations()
bool associate(const AssociationData &peer, bool active)
TransportPriorityQosPolicy transport_priority
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
Cached_Allocator_With_Overflow< ACE_Message_Block, ACE_Thread_Mutex > MessageBlockAllocator
AtomicBool enabled_
The flag indicates the entity is enabled.
long current_count_change
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
ReliabilityQosPolicy reliability
DDS::Time_t source_timestamp_
The timestamp the sender put on the sample.
DDS::ReturnCode_t cleanup()
virtual void control_dropped(const Message_Block_Ptr &sample, bool dropped_by_transport)
#define OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
SequenceNumber sequence_number_
The sequence number unique in DataWriter scope.
DDS::PublisherQos publisher_qos_
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
virtual void update_subscription_params(const GUID_t &readerId, const DDS::StringSeq &exprParams)
virtual void register_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
const SequenceNumber_t SEQUENCENUMBER_UNKNOWN
virtual void notify_publication_reconnected(const ReaderIdSeq &subids)
Sequence number abstraction. Only allows positive 64 bit values.
TransportLocatorSeq readerTransInfo
static const ACE_Time_Value zero
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
const ReturnCode_t RETCODE_ERROR
DDS::Topic_var topic_objref_
The object reference of the associated topic.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
LifespanQosPolicy lifespan
ACE_INT32 lifespan_duration_sec_
virtual GUID_t get_guid() const
int remove(Container &c, const ValueType &v)
Cached_Allocator_With_Overflow< DataSampleElement, ACE_Null_Mutex > DataSampleElementAllocator
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_UNSUPPORTED
ReplayerListener_rch listener_
Used to notify the entity for relevant events.
virtual CORBA::Long get_priority_value(const AssociationData &data) const
TransportLocator readerDiscInfo
#define ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)
virtual ReplayerListener_rch get_listener()
DDS::DomainId_t domain_id_
The domain id.
ACE_UINT32 lifespan_duration_nanosec_
virtual void update_incompatible_qos(const IncompatibleQosStatus &status)
DDS::InstanceHandle_t assign_handle(const GUID_t &id=GUID_UNKNOWN)
QosPolicyId_t last_policy_id
const long LENGTH_UNLIMITED
#define ACE_ERROR_RETURN(X, Y)
int insert(Container &c, const ValueType &v)
const character_type * in(void) const
DataRepresentationIdSeq value
int data_delivered_count_
ConditionVariable< ACE_Recursive_Thread_Mutex > empty_condition_
static bool valid(const DDS::UserDataQosPolicy &qos)
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
DDS::InstanceHandle_t lookup_handle(const GUID_t &id) const
#define TheServiceParticipant
bool is_bit_
The time interval for sending liveliness message.
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
InstanceHandle_t last_subscription_handle
The Internal API and Implementation of OpenDDS.
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
CORBA::String_var topic_name_
The name of associated topic.
DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_
Status conditions.
TypeIdentifierWithDependencies minimal
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)
Cached_Allocator_With_Overflow< ACE_Data_Block, ACE_Thread_Mutex > DataBlockAllocator
unsigned long transportContext
sequence< string > StringSeq