6 #ifndef OPENDDS_DCPS_DATAWRITERIMPL_H 7 #define OPENDDS_DCPS_DATAWRITERIMPL_H 28 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 32 #include <dds/DdsDcpsDomainC.h> 33 #include <dds/DdsDcpsTopicC.h> 40 #ifndef ACE_LACKS_PRAGMA_ONCE 52 class DomainParticipantImpl;
54 class DataSampleElement;
55 class SendStateDataSampleList;
56 struct AssociationData;
102 , max_wait_(max_wait)
103 , sequence_(sequence)
126 skip_serialize_ =
value;
131 return skip_serialize_;
136 return data_allocator_.get();
146 DDS::DataWriterListener_ptr a_listener,
149 virtual DDS::DataWriterListener_ptr get_listener();
151 virtual DDS::Topic_ptr get_topic();
156 virtual DDS::Publisher_ptr get_publisher();
179 void get_instance_handles(InstanceHandleVec& instance_handles);
186 #if !defined (DDS_HAS_MINIMUM_BIT) 190 #endif // !defined (DDS_HAS_MINIMUM_BIT) 194 virtual void add_association(
const GUID_t& yourId,
198 virtual void transport_assoc_done(
int flags,
const GUID_t& remote_id);
200 virtual void remove_associations(
const ReaderIdSeq & readers,
203 virtual void replay_durable_data_for(
const GUID_t& remote_sub_id);
207 virtual void update_subscription_params(
const GUID_t& readerId,
221 DDS::DataWriterListener_ptr a_listener,
244 register_instance_from_durable_data(
254 unregister_instance_i(
262 void unregister_instances(
const DDS::Time_t& source_timestamp);
276 const void* real_data);
303 return data_container_->get_unsent_data(list);
308 return data_container_->get_resend_data();
319 void unregister_all();
328 void transport_discovery_change();
337 bool should_ack()
const;
346 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 349 bool coherent_changes_pending();
352 void begin_coherent_changes();
355 void end_coherent_changes(
const GroupCoherentSamples& group_samples);
362 char const* get_type_name()
const;
370 bool dropped_by_transport);
377 bool dropped_by_transport);
384 return data_container_->
lock_;
404 void send_suspended_data();
406 void remove_all_associations();
408 virtual void register_for_reader(
const GUID_t& participant,
414 virtual void unregister_for_reader(
const GUID_t& participant,
418 virtual void update_locators(
const GUID_t& remote,
421 void notify_publication_disconnected(
const ReaderIdSeq& subids);
422 void notify_publication_reconnected(
const ReaderIdSeq& subids);
423 void notify_publication_lost(
const ReaderIdSeq& subids);
444 bool content_filter);
446 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 468 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 477 return db_lock_pool_->get_lock();
491 return publication_id_;
497 return sequence_number_;
524 publication_id_ = id;
531 return get_next_sn_i();
541 return sequence_number_;
545 DataWriterListener_ptr get_ext_listener();
549 void prepare_to_delete();
582 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 626 , bound_(ts->serialized_size_bound(
encoding_))
627 , key_only_bound_(ts->key_only_serialized_size_bound(
encoding_))
667 return type_support_;
671 const char* method_name,
674 bool remove =
false);
688 void track_sequence_number(
GUIDSeq* filter_out);
702 create_control_message(
MessageId message_id,
711 void lookup_instance_handles(
const ReaderIdSeq& ids,
718 return this->domain_id_;
726 #ifdef OPENDDS_SECURITY 730 void association_complete_i(
const GUID_t& remote_id);
734 friend class ::DDS_TEST;
842 bool need_sequence_repair();
843 bool need_sequence_repair_i()
const;
863 InstanceValuesToHandles::iterator find_instance(
const Sample& sample);
865 #ifdef OPENDDS_SECURITY 885 virtual int handle_timeout(
const ACE_Time_Value& tv,
const void* arg);
ACE_Thread_Mutex sync_unreg_rem_assocs_lock_
RcHandle< FilterEvaluator > eval_
Defines the interface for Discovery callbacks into the DataWriter.
MonotonicTimePoint last_liveliness_activity_time_
Timestamp of last write/dispose/assert_liveliness.
Implements the OpenDDS::DCPS::Entity interfaces.
WeakRcHandle< PublisherImpl > publisher_servant_
The publisher servant which creates this datawriter.
sequence< InstanceHandle_t > InstanceHandleSeq
SerializedSizeBound buffer_size_bound() const
size_t n_chunks_
The number of chunks for the cached allocator.
ACE_Thread_Mutex sn_lock_
Mutex for sequence_number_.
const LogLevel::Value value
TopicDescriptionPtr< TopicImpl > topic_servant_
The topic servant.
CORBA::String_var topic_name_
The name of associated topic.
void set_marshal_skip_serialize(bool value)
::DDS::InstanceHandle_t register_instance_w_timestamp(in<%SCOPED%> instance, in ::DDS::Time_t timestamp)
Base class to hold configuration settings for TransportImpls.
SendStateDataSampleList get_resend_data()
const long DURATION_INFINITE_SEC
const GUID_t GUID_UNKNOWN
Nil value for GUID.
SendStateDataSampleList available_data_list_
DDS::Security::PermissionsHandle participant_permissions_handle_
#define OpenDDS_Dcps_Export
DDS::Duration_t max_wait_
Atomic< int > data_delivered_count_
unique_ptr< DataSampleHeaderAllocator > header_allocator_
The header data allocator.
CORBA::Long get_priority_value(const AssociationData &) const
::DDS::InstanceHandle_t lookup_instance(in<%SCOPED%> instance_data)
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > DataAllocator
OPENDDS_STRING filter_class_name_
DataBlockLockPool::DataBlockLock * get_db_lock()
SequenceNumber get_max_sn() const
DDS::LivelinessLostStatus liveliness_lost_status_
Status conditions.
DDS::DataWriterListener_var listener_
Used to notify the entity for relevant events.
sequence< TransportLocator > TransportLocatorSeq
MonotonicTimePoint deadline() const
MonotonicTimePoint wait_pending_deadline_
ACE_Reactor_Timer_Interface * reactor_
ACE_Thread_Mutex listener_mutex_
Mutex to protect listener info.
SequenceNumber get_next_sn()
unique_ptr< Monitor > monitor_
Monitor object for this entity.
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
RcHandle< DataWriterImpl > DataWriterImpl_rch
ACE_Guard< ACE_Thread_Mutex > lock_
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
bool liveliness_asserted_
const ValueDispatcher * get_value_dispatcher() const
MessageTracker controlTracker
DDS::OfferedDeadlineMissedStatus offered_deadline_missed_status_
RepoIdToReaderInfoMap reader_info_
SequenceNumber get_next_sn_i()
Implements the OpenDDS::DCPS::Publisher interfaces.
::DDS::ReturnCode_t dispose(in<%SCOPED%> instance_data, in ::DDS::InstanceHandle_t instance_handle)
MessageId
One-byte message ID (< 256).
Security::SecurityConfig_rch security_config_
bool get_marshal_skip_serialize() const
Implements the DDS::Topic interface.
RcHandle< LivenessTimer > liveness_timer_
const Encoding & encoding() const
sequence< GUID_t > ReaderIdSeq
DOMAINID_TYPE_NATIVE DomainId_t
DDS::StringSeq expression_params_
Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.
DDS::DataWriterQos passed_qos_
WeakRcHandle< DomainParticipantImpl > participant_servant_
Christopher Diggins: renamed files; fixing compilation errors; adding Visual C project file; removed make. Max Lybbert: removed references to missing and unused header.
long ParticipantCryptoHandle
::DDS::ReturnCode_t write_w_timestamp(in<%SCOPED%> instance_data, in ::DDS::InstanceHandle_t handle, in ::DDS::Time_t source_timestamp)
GUID_t topic_id_
The associated topic repository id.
typedef OPENDDS_MAP_CMP(GUID_t, WriterCoherentSample, GUID_tKeyLessThan) GroupCoherentSamples
InstanceHandlesToValues instance_handles_to_values_
Atomic< int > data_dropped_count_
Statistics counter.
TypeSupportImpl * type_support_
DDS::DomainId_t domain_id_
The domain id.
ACE_UINT64 min_suspended_transaction_id_
The cached available data while suspending and associated transaction ids.
AckToken(const DDS::Duration_t &max_wait, const SequenceNumber &sequence)
size_t association_chunk_multiplier_
The multiplier for allocators affected by associations.
unique_ptr< DataAllocator > data_allocator_
AckCustomization(AckToken &at)
DDS::DomainId_t domain_id() const
unique_ptr< Monitor > periodic_monitor_
Periodic Monitor object for this entity.
MonotonicTimePoint tstamp_
unique_ptr< MessageBlockAllocator > mb_allocator_
The message block allocator.
SerializedSizeBound key_only_bound_
ACE_UINT32 coherent_samples_
HANDLE_TYPE_NATIVE InstanceHandle_t
Mix-in class for DDS entities which directly use the transport layer.
const unsigned long DURATION_INFINITE_NSEC
ACE_recursive_thread_mutex_t lock_
TransportPriorityQosPolicy transport_priority
EncodingMode(const TypeSupportImpl *ts, Encoding::Kind kind, bool swap_the_bytes)
unsigned long long ACE_UINT64
sequence< GUID_t > GUIDSeq
DDS::PublicationMatchedStatus publication_match_status_
virtual size_t serialized_size(const Encoding &enc) const =0
WeakRcHandle< DomainParticipantImpl > participant_
DataAllocator * data_allocator() const
Sequence number abstraction. Only allows positive 64-bit values.
bool deadline_is_infinite() const
TimeDuration liveliness_check_interval_
The time interval for sending liveliness message.
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
ACE_UINT64 get_unsent_data(SendStateDataSampleList &list)
DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_
DDS::StatusMask listener_mask_
DDS::DynamicType_var dynamic_type_
TypeSupportImpl * get_type_support() const
::DDS::ReturnCode_t unregister_instance_w_timestamp(in<%SCOPED%> instance, in ::DDS::InstanceHandle_t handle, in ::DDS::Time_t timestamp)
::DDS::ReturnCode_t write(in<%SCOPED%> instance_data, in ::DDS::InstanceHandle_t handle)
WeakRcHandle< DataWriterImpl > writer_
::DDS::ReturnCode_t get_key_value(inout<%SCOPED%> key_holder, in ::DDS::InstanceHandle_t handle)
size_t buffer_size(const Sample &sample) const
SequenceNumber expected_sequence_
ACE_Thread_Mutex reader_info_lock_
ACE_Recursive_Thread_Mutex & get_lock() const
ACE_UINT64 max_suspended_transaction_id_
ACE_Recursive_Thread_Mutex lock_
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
void check_and_set_repo_id(const GUID_t &id)
unique_ptr< DataBlockLockPool > db_lock_pool_
RepoIdToHandleMap id_to_handle_map_
InstanceValuesToHandles instance_values_to_handles_
CORBA::Long last_deadline_missed_total_count_
SequenceNumber sequence_number_
The sequence number unique in DataWriter scope.
LivenessTimer(DataWriterImpl &writer)
GUID_t publication_id_
The repository id of this datawriter/publication.
The Internal API and Implementation of OpenDDS.
A container for instances sample data.
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
SerializedSizeBound bound_
RcHandle< WriteDataContainer > data_container_
The sample data container.
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.
CORBA::String_var type_name_
The type name of associated topic.
unique_ptr< DataBlockAllocator > db_allocator_
The data block allocator.
sequence< string > StringSeq
SendControlStatus
Return code type for send_control() operations.
ACE_TCHAR * timestamp(const ACE_Time_Value &time_value, ACE_TCHAR date_and_time[], size_t time_len, bool return_pointer_to_first_digit=false)
const Encoding & encoding_
::DDS::ReturnCode_t dispose_w_timestamp(in<%SCOPED%> instance_data, in ::DDS::InstanceHandle_t instance_handle, in ::DDS::Time_t source_timestamp)