27 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 34 #ifndef DDS_HAS_MINIMUM_BIT 38 #ifndef DDS_HAS_MINIMUM_BIT 39 # include <dds/DdsDcpsCoreTypeSupportC.h> 40 #endif // !defined (DDS_HAS_MINIMUM_BIT) 41 #include <dds/DdsDcpsCoreC.h> 42 #include <dds/DdsDcpsGuidTypeSupportImpl.h> 59 : data_dropped_count_(0)
60 , data_delivered_count_(0)
61 , controlTracker(
"DataWriterImpl")
65 , skip_serialize_(false)
75 , coherent_samples_(0)
76 , liveliness_lost_(false)
79 , last_deadline_missed_total_count_(0)
81 , min_suspended_transaction_id_(0)
82 , max_suspended_transaction_id_(0)
83 , liveliness_asserted_(false)
113 #ifndef OPENDDS_SAFETY_PROFILE 117 if (type_lookup_service) {
140 DDS::DataWriterListener_ptr a_listener,
152 #if !defined (DDS_HAS_MINIMUM_BIT) 154 #endif // !defined (DDS_HAS_MINIMUM_BIT) 187 return participant->assign_handle();
196 participant->return_handle(handle);
204 if (participant_servant) {
205 return participant_servant->get_builtin_subscriber_proxy();
228 ACE_TEXT(
" This is a deleted datawriter, ignoring add.\n")));
246 ACE_TEXT(
"(%P|%t) DataWriterImpl::add_association(): ")
247 ACE_TEXT(
"adding subscription to publication %C with priority %d.\n"),
272 ACE_TEXT(
"(%P|%t) DataWriterImpl::add_association: ")
273 ACE_TEXT(
"ERROR: transport layer failed to associate.\n")));
286 ACE_TEXT(
"(%P|%t) DataWriterImpl::transport_assoc_done: ")
287 ACE_TEXT(
"ERROR: transport layer failed to associate %C\n"),
298 ACE_TEXT(
"(%P|%t) DataWriterImpl::transport_assoc_done: ")
299 ACE_TEXT(
"writer %C succeeded in associating with reader %C\n"),
309 ACE_TEXT(
"(%P|%t) DataWriterImpl::transport_assoc_done: ")
310 ACE_TEXT(
"writer %C reader %C calling association_complete_i\n"),
321 ACE_TEXT(
"(%P|%t) DataWriterImpl::transport_assoc_done: ")
322 ACE_TEXT(
"ERROR: DataWriter (%C) should always be active in current implementation\n"),
333 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 334 : participant_(participant)
335 , filter_class_name_(filterClassName)
337 , expression_params_(params)
342 if (part && *filter) {
343 eval_ = part->get_filter_eval(filter);
350 ACE_UNUSED_ARG(filterClassName);
351 ACE_UNUSED_ARG(filter);
352 ACE_UNUSED_ARG(params);
353 ACE_UNUSED_ARG(participant);
355 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC 359 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 362 if (participant && !filter_.empty()) {
363 participant->deref_filter_eval(filter_.c_str());
366 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC 372 DBG_ENTRY_LVL(
"DataWriterImpl",
"association_complete_i", 6);
374 bool reader_durable =
false;
375 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 385 ACE_TEXT(
"(%P|%t) DataWriterImpl::association_complete_i - ")
386 ACE_TEXT(
"bit %d local %C remote %C\n"),
394 ACE_TEXT(
"(%P|%t) ERROR: DataWriterImpl::association_complete_i: ")
395 ACE_TEXT(
"insert %C from pending failed.\n"),
401 RepoIdToReaderInfoMap::const_iterator it =
reader_info_.find(remote_id);
404 reader_durable = it->second.durable_;
405 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 406 filterClassName = it->second.filter_class_name_;
407 eval = it->second.eval_;
408 expression_params = it->second.expression_params_;
434 ACE_TEXT(
"(%P|%t) WARNING: DataWriterImpl::association_complete_i: ")
435 ACE_TEXT(
"id_to_handle_map_%C = 0x%x failed.\n"),
442 ACE_TEXT(
"(%P|%t) DataWriterImpl::association_complete_i: ")
443 ACE_TEXT(
"id_to_handle_map_%C = 0x%x.\n"),
456 DDS::DataWriterListener_var listener =
475 if (reader_durable) {
479 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
480 , filterClassName, eval.
in(), expression_params
501 reader_info_.find(remote_id)->second.expected_sequence_;
504 list_el != list.
end(); ++list_el) {
505 list_el->get_header().historic_sample_ =
true;
507 if (list_el->get_header().sequence_ > seq) {
508 seq = list_el->get_header().sequence_;
514 if (!publisher || publisher->is_suspended()) {
544 ACE_TEXT(
"DataWriterImpl::association_complete_i: ")
545 ACE_TEXT(
"send_w_control failed.\n")));
556 if (readers.length() == 0) {
569 ACE_TEXT(
"(%P|%t) DataWriterImpl::remove_associations: ")
570 ACE_TEXT(
"bit %d local %C remote %C num remotes %d\n"),
602 if (
remove(
readers_, readers[i]) == 0) {
603 ++ fully_associated_len;
604 fully_associated_readers.length(fully_associated_len);
605 fully_associated_readers [fully_associated_len - 1] = readers[i];
609 rds [rds_len - 1] = readers[i];
620 if (fully_associated_len > 0 && !
is_bit_) {
624 for (
CORBA::ULong i = 0; i < fully_associated_len; ++i) {
634 int matchedSubscriptions =
647 handles[fully_associated_len - 1];
651 DDS::DataWriterListener_var listener =
674 if (notify_lost && handles.length() > 0) {
679 for (
unsigned int i = 0; i < handles.length(); ++i) {
680 participant->return_handle(handles[i]);
686 DBG_ENTRY_LVL(
"DataWriterImpl",
"replay_durable_data_for", 6);
688 bool reader_durable =
false;
689 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 697 RepoIdToReaderInfoMap::const_iterator it =
reader_info_.find(remote_id);
700 reader_durable = it->second.durable_;
701 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 702 filterClassName = it->second.filter_class_name_;
703 eval = it->second.eval_;
704 expression_params = it->second.expression_params_;
710 if (reader_durable) {
714 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
715 , filterClassName, eval.
in(), expression_params
736 reader_info_.find(remote_id)->second.expected_sequence_;
739 list_el != list.
end(); ++list_el) {
740 list_el->get_header().historic_sample_ =
true;
742 if (list_el->get_header().sequence_ > seq) {
743 seq = list_el->get_header().sequence_;
749 if (!publisher || publisher->is_suspended()) {
775 ACE_TEXT(
"DataWriterImpl::replay_durable_data_for: ")
776 ACE_TEXT(
"send_w_control failed.\n")));
785 DBG_ENTRY_LVL(
"DataWriterImpl",
"remove_all_associations", 6);
795 readers.length(size);
797 RepoIdSet::iterator itEnd =
readers_.end();
800 for (RepoIdSet::iterator it =
readers_.begin(); it != itEnd; ++it) {
814 ACE_TEXT(
"(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
815 ACE_TEXT(
"caught exception from remove_associations.\n")));
845 RepoIdToReaderInfoMap::const_iterator iter =
reader_info_.find(readerId);
856 DDS::DataWriterListener_var listener =
894 #ifdef OPENDDS_NO_CONTENT_FILTERED_TOPIC 895 ACE_UNUSED_ARG(readerId);
896 ACE_UNUSED_ARG(params);
900 RepoIdToReaderInfoMap::iterator iter =
reader_info_.find(readerId);
903 iter->second.expression_params_ = params;
908 ACE_TEXT(
"(%P|%t) WARNING: DataWriterImpl::update_subscription_params()")
909 ACE_TEXT(
" - writer: %C has no info about reader: %C\n"),
941 publisher->get_qos(publisherQos);
951 ACE_TEXT(
"(%P|%t) DataWriterImpl::set_qos, ")
956 if (!(
qos_ == new_qos)) {
991 listener_ = DDS::DataWriterListener::_duplicate(a_listener);
995 DDS::DataWriterListener_ptr
999 return DDS::DataWriterListener::_duplicate(
listener_.in());
1002 DataWriterListener_ptr
1006 return DataWriterListener::_narrow(
listener_.in());
1030 ACE_TEXT(
"(%P|%t) DataWriterImpl::create_ack_token() - ")
1054 ACE_TEXT(
"DataWriterImpl::send_request_ack: ")
1055 ACE_TEXT(
"obtain_buffer_for_control returned %d.\n"),
1077 ACE_TEXT(
"DataWriterImpl::send_request_ack: ")
1078 ACE_TEXT(
"enqueue_control failed.\n")),
1102 ACE_TEXT(
" waiting for acknowledgment of sequence %q at %T\n"),
1200 return participant->assert_liveliness();
1254 ACE_TEXT(
"DataWriterImpl::get_matched_subscriptions: ")
1255 ACE_TEXT(
" Entity is not enabled.\n")),
1266 subscription_handles.length(
1269 for (RepoIdToHandleMap::iterator
1272 ++current, ++index) {
1273 subscription_handles[index] = current->second;
1279 #if !defined (DDS_HAS_MINIMUM_BIT) 1287 ACE_TEXT(
"(%P|%t) ERROR: DataWriterImpl::")
1288 ACE_TEXT(
"get_matched_subscription_data: ")
1289 ACE_TEXT(
"Entity is not enabled.\n")),
1295 DDS::SubscriptionBuiltinTopicDataSeq data;
1298 ret = instance_handle_to_bit_data<DDS::SubscriptionBuiltinTopicDataDataReader_var>(
1301 subscription_handle,
1306 subscription_data = data[0];
1311 #endif // !defined (DDS_HAS_MINIMUM_BIT) 1327 if (!publisher || !publisher->is_enabled()) {
1337 dp_id_ = participant->get_id();
1352 CORBA::Long max_instances = 0, max_total_samples = 0;
1375 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
1387 max_samples_per_instance,
1389 max_durable_per_instance,
1395 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
1414 "(%P|%t) DataWriterImpl::enable-mb" 1415 " Cached_Allocator_With_Overflow %x with %B chunks\n",
1420 "(%P|%t) DataWriterImpl::enable-db" 1421 " Cached_Allocator_With_Overflow %x with %B chunks\n",
1426 "(%P|%t) DataWriterImpl::enable-header" 1427 " Cached_Allocator_With_Overflow %x with %B chunks\n",
1444 ACE_TEXT(
"(%P|%t) ERROR: DataWriterImpl::enable: %p.\n"),
1454 participant->add_adjust_liveliness_timers(
this);
1459 disco->pre_writer(
this);
1469 ACE_TEXT(
"(%P|%t) ERROR: DataWriterImpl::enable, ")
1470 ACE_TEXT(
"Transport Exception.\n")));
1486 return setup_serialization_result;
1491 publisher->get_qos(pub_qos);
1499 const GUID_t publication_id =
1515 "add_publication failed\n"));
1521 #if defined(OPENDDS_SECURITY) 1529 "got GUID %C, publishing to topic name \"%C\" type \"%C\"\n",
1545 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 1549 if (durability_cache != 0) {
1551 if (!durability_cache->get_data(this->domain_id_,
1559 ACE_TEXT(
"(%P|%t) ERROR: DataWriterImpl::enable: ")
1560 ACE_TEXT(
"unable to retrieve durable data\n")));
1573 return writer_enabled_result;
1579 DBG_ENTRY_LVL(
"DataWriterImpl",
"send_all_to_flush_control",6);
1590 this->
send(list, transaction_id);
1603 ACE_TEXT(
"DataWriterImpl::register_instance_i: ")
1604 ACE_TEXT(
"Entity is not enabled.\n")),
1611 ACE_TEXT(
"(%P|%t) ERROR: DataWriterImpl::register_instance_i: ")
1612 ACE_TEXT(
"register instance with container failed, returned <%C>.\n"),
1626 ACE_TEXT(
"DataWriterImpl::register_instance_i: ")
1627 ACE_TEXT(
"obtain_buffer_for_control failed, returned <%C>.\n"),
1648 ACE_TEXT(
"DataWriterImpl::register_instance_i: ")
1649 ACE_TEXT(
"enqueue_control failed, returned <%C>\n"),
1663 DBG_ENTRY_LVL(
"DataWriterImpl",
"register_instance_from_durable_data",6);
1673 ACE_TEXT(
"(%P|%t) ERROR: DataWriterImpl::register_instance_from_durable_data: ")
1674 ACE_TEXT(
"register instance with container failed, returned <%C>.\n"),
1692 ACE_TEXT(
"(%P|%t) ERROR: DataWriterImpl::unregister_instance_i: ")
1693 ACE_TEXT(
"Entity is not enabled.\n")),
1706 ret = this->
data_container_->unregister(handle, unregistered_sample_data);
1711 ACE_TEXT(
"DataWriterImpl::unregister_instance_i: ")
1712 ACE_TEXT(
"unregister with container failed.\n")),
1722 ACE_TEXT(
"DataWriterImpl::unregister_instance_i: ")
1723 ACE_TEXT(
"obtain_buffer_for_control returned %d.\n"),
1730 move(unregistered_sample_data),
1740 ACE_TEXT(
"DataWriterImpl::unregister_instance_i: ")
1741 ACE_TEXT(
"enqueue_control failed.\n")),
1753 DBG_ENTRY_LVL(
"DataWriterImpl",
"dispose_and_unregister", 6);
1764 ACE_TEXT(
"DataWriterImpl::dispose_and_unregister: ")
1765 ACE_TEXT(
"dispose on container failed.\n")),
1774 ACE_TEXT(
"DataWriterImpl::dispose_and_unregister: ")
1775 ACE_TEXT(
"unregister with container failed.\n")),
1785 ACE_TEXT(
"DataWriterImpl::dispose_and_unregister: ")
1786 ACE_TEXT(
"obtain_buffer_for_control returned %d.\n"),
1803 ACE_TEXT(
"DataWriterImpl::dispose_and_unregister: ")
1804 ACE_TEXT(
"enqueue_control failed.\n")),
1827 const void* real_data)
1834 GUIDSeq_var filter_out_var(filter_out);
1838 ACE_TEXT(
"(%P|%t) ERROR: DataWriterImpl::write: ")
1839 ACE_TEXT(
"Entity is not enabled.\n")),
1857 ACE_TEXT(
"DataWriterImpl::write: ")
1858 ACE_TEXT(
"obtain_buffer returned %d.\n"),
1885 ACE_TEXT(
"DataWriterImpl::write: ")
1901 if (!publisher || publisher->is_suspended()) {
1916 this->
send(list, transaction_id);
1921 if (observer && real_data && vd) {
1935 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 1940 const GUID_t* buf = filter_out->get_buffer();
1941 excluded.insert(buf, buf + filter_out->length());
1944 for (RepoIdToReaderInfoMap::iterator iter =
reader_info_.begin(),
1947 if (excluded.count(iter->first) == 0) {
1948 iter->second.expected_sequence_ = sn;
1953 ACE_UNUSED_ARG(filter_out);
1954 for (RepoIdToReaderInfoMap::iterator iter =
reader_info_.begin(),
1956 iter->second.expected_sequence_ = sn;
1959 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC 1990 ACE_TEXT(
"(%P|%t) ERROR: DataWriterImpl::dispose: ")
1991 ACE_TEXT(
"Entity is not enabled.\n")),
2005 ACE_TEXT(
"DataWriterImpl::dispose: ")
2016 ACE_TEXT(
"DataWriterImpl::dispose: ")
2017 ACE_TEXT(
"obtain_buffer_for_control returned %d.\n"),
2024 move(registered_sample_data),
2034 ACE_TEXT(
"DataWriterImpl::dispose: ")
2035 ACE_TEXT(
"enqueue_control failed.\n")),
2114 static_cast<ACE_Message_Block*>(
2130 *message << header_data;
2136 RepoIdToReaderInfoMap::iterator reader;
2139 reader->second.expected_sequence_ = sequence;
2144 ACE_TEXT(
"(%P|%t) DataWriterImpl::create_control_message: ")
2145 ACE_TEXT(
"from publication %C sending control sample: %C .\n"),
2158 bool content_filter)
2163 if (0 == instance) {
2165 ACE_TEXT(
"(%P|%t) DataWriterImpl::create_sample_data_message ")
2166 ACE_TEXT(
"failed to find instance for handle %d\n"),
2182 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 2184 publisher->qos_.presentation.access_scope
2210 static_cast<ACE_Message_Block*>(
2224 message.
reset(tmp_message);
2225 *message << header_data;
2228 ACE_TEXT(
"(%P|%t) DataWriterImpl::create_sample_data_message: ")
2229 ACE_TEXT(
"from publication %C sending data sample: %C .\n"),
2243 ACE_TEXT(
"(%P|%t) ERROR: DataWriterImpl::data_delivered: ")
2244 ACE_TEXT(
"The publication id %C from delivered element ")
2245 ACE_TEXT(
"does not match the datawriter's id %C\n"),
2269 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 2278 ACE_ERROR((
LM_ERROR,
"(%P|%t) ERROR: DataWriterImpl::filter_out: Could not cast type support, not filtering\n"));
2283 if (filterClassName ==
"DDSSQL" ||
2284 filterClassName ==
"OPENDDSSQL") {
2291 }
catch (
const std::runtime_error&) {
2310 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 2369 serializer << end_msg;
2383 ACE_TEXT(
"(%P|%t) ERROR: DataWriterImpl::end_coherent_changes:")
2384 ACE_TEXT(
" unable to send END_COHERENT_CHANGES control message!\n")));
2388 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE 2392 bool dropped_by_transport)
2410 DDS::DataWriterListener_ptr
2423 return publisher->listener_for(kind);
2426 return DDS::DataWriterListener::_duplicate(
listener_.in());
2437 bool liveliness_lost =
false;
2448 liveliness_lost =
true;
2455 liveliness_lost =
true;
2469 ACE_TEXT(
"(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
2477 ACE_TEXT(
"(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
2488 liveliness_lost =
true;
2495 DDS::DataWriterListener_var listener =
2526 ACE_TEXT(
"(%P|%t) ERROR: DataWriterImpl::send_liveliness: ")
2527 ACE_TEXT(
"send_control failed.\n")),
2547 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 2553 ACE_TEXT(
"failed to make data durable.\n")));
2575 DBG_ENTRY_LVL(
"DataWriterImpl",
"notify_publication_disconnected",6);
2589 the_listener->on_publication_disconnected(
this, status);
2597 DBG_ENTRY_LVL(
"DataWriterImpl",
"notify_publication_reconnected",6);
2611 the_listener->on_publication_reconnected(
this, status);
2619 DBG_ENTRY_LVL(
"DataWriterImpl",
"notify_publication_lost",6);
2634 the_listener->on_publication_lost(
this, status);
2642 DBG_ENTRY_LVL(
"DataWriterImpl",
"notify_publication_lost",6);
2660 the_listener->on_publication_lost(
this, status);
2686 ACE_TEXT(
"(%P|%t) DataWriterImpl::lookup_instance_handles: ")
2687 ACE_TEXT(
"searching for handles for reader Ids: %C.\n"),
2691 hdls.length(num_rds);
2694 hdls[i] = participant->lookup_handle(ids[i]);
2698 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 2732 publisher->get_qos(qos_data.
pub_qos);
2738 #if defined(OPENDDS_SECURITY) 2756 for (RepoIdToReaderInfoMap::const_iterator it =
reader_info_.begin(),
2798 writer->handle_timeout(tv, arg);
2800 this->reactor()->cancel_timer(
this);
2817 disco->update_publication_locators(domain_id,
2819 publication_id_copy,
2841 "Encountered unsupported combination of XCDR1 encoding and mutable extensibility " 2842 "for writer of type %C\n",
2849 "Unaligned CDR is not supported by transport types that require encapsulation\n"));
2855 "Encountered unsupported or unknown data representation: %C ",
2856 "for writer of type %C\n",
2867 "Could not find a valid data representation\n"));
2874 "Setup successfully with %C data representation.\n",
2880 if (buffer_size_bound) {
2881 const size_t chunk_size = buffer_size_bound.
get();
2885 "using data allocator at %x with %B %B byte chunks\n",
2892 "sample size is unbounded, not using data allocator, " 2893 "always allocating from heap\n"));
2912 const InstanceValuesToHandles::iterator it =
find_instance(sample);
2926 return registered_handle;
2935 "unregister_instance_w_timestamp", sample, instance_handle,
true);
2947 #if defined(OPENDDS_SECURITY) && OPENDDS_HAS_DYNAMIC_DATA_ADAPTER 2955 "(%P|%t) NOTICE: DataWriterImpl::dispose_w_timestamp: unable to dispose instance SecurityException[%d.%d]: %C\n",
2963 "dispose_w_timestamp", sample, instance_handle);
2967 return dispose(instance_handle, source_timestamp);
2991 static_cast<ACE_Message_Block*>(
3013 "to_message_block failed\n"));
3025 if (!(serializer << encap)) {
3028 "failed to serialize data encapsulation header\n"));
3036 "failed to serialize sample data\n"));
3043 "set_encapsulation_options failed\n"));
3056 InstanceHandlesToValues::value_type(handle, sample)).second) {
3060 InstanceValuesToHandles::value_type(sample, handle)).second) {
3067 DataWriterImpl::InstanceValuesToHandles::iterator
3086 #if defined(OPENDDS_SECURITY) && OPENDDS_HAS_DYNAMIC_DATA_ADAPTER 3087 DDS::DynamicData_var dynamic_data = copy->get_dynamic_data(
dynamic_type_);
3094 "(%P|%t) NOTICE: DataWriterImpl::get_or_create_instance_handle: unable to register instance SecurityException[%d.%d]: %C\n",
3106 ACE_ERROR((
LM_NOTICE,
"(%P|%t) NOTICE: %CDataWriterImpl::get_or_create_instance_handle: " 3107 "failed to serialize sample\n", ts->
name()));
3124 ACE_ERROR((
LM_NOTICE,
"(%P|%t) NOTICE: %CDataWriterImpl::get_or_create_instance_handle: " 3125 "insert instance failed\n", ts->
name()));
3137 const char*
const method_name,
3146 const InstanceValuesToHandles::iterator pos =
find_instance(sample);
3150 "The instance sample is not registered\n",
3156 if (instance_handle !=
DDS::HANDLE_NIL && instance_handle != pos->second) {
3160 instance_handle = pos->second;
3185 "register failed: %C\n",
3192 handle = registered_handle;
3197 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 3200 for (RepoIdToReaderInfoMap::iterator iter =
reader_info_.begin(),
3203 if (!ri.
eval_.is_nil()) {
3204 if (!filter_out.ptr()) {
3208 push_back(filter_out.inout(), iter->first);
3215 return write_sample(sample, handle, source_timestamp, filter_out._retn());
3228 "failed to serialize sample\n"));
void set_sample(Message_Block_Ptr sample)
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
void control_dropped(const Message_Block_Ptr &sample, bool dropped_by_transport)
ACE_Thread_Mutex sync_unreg_rem_assocs_lock_
void send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
RcHandle< PublicationInstance > PublicationInstance_rch
DDS::ReturnCode_t get_key_value(Sample_rch &sample, DDS::InstanceHandle_t handle)
RcHandle< FilterEvaluator > eval_
MonotonicTimePoint last_liveliness_activity_time_
Timestamp of last write/dispose/assert_liveliness.
WeakRcHandle< PublisherImpl > publisher_servant_
The publisher servant which creates this datawriter.
DDS::ReturnCode_t write_sample(const Sample &sample, DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp, GUIDSeq *filter_out)
RcHandle< T > rchandle_from(T *pointer)
sequence< InstanceHandle_t > InstanceHandleSeq
ReaderInfo(const char *filter_class_name, const char *filter, const DDS::StringSeq &params, WeakRcHandle< DomainParticipantImpl > participant, bool durable)
SerializedSizeBound buffer_size_bound() const
size_t n_chunks_
The number of chunks for the cached allocator.
const DDS::StatusMask NO_STATUS_MASK
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual Extensibility max_extensibility() const =0
GroupCoherentSamples group_coherent_samples_
OPENDDS_STRING topic_name
const StatusKind LIVELINESS_LOST_STATUS
static String kind_to_string(Kind value)
ACE_Thread_Mutex sn_lock_
Mutex for sequence_number_.
DDS::Time_t to_dds_time() const
virtual DDS::ReturnCode_t set_listener(DDS::DataWriterListener_ptr a_listener, DDS::StatusMask mask)
virtual void remove_associations(const ReaderIdSeq &readers, bool callback)
const LogLevel::Value value
DataWriterListener_ptr get_ext_listener()
TopicDescriptionPtr< TopicImpl > topic_servant_
The topic servant.
DDS::ReturnCode_t set_enabled()
const DataSampleHeader & get_header() const
CORBA::String_var topic_name_
The name of associated topic.
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
static const ACE_Time_Value max_time
const InstanceHandle_t HANDLE_NIL
char message_id_
The enum MessageId.
boolean autodispose_unregistered_instances
Observer_rch get_observer(Observer::Event e)
Base class to hold configuration settings for TransportImpls.
void enable_transport(bool reliable, bool durable)
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
virtual void unregister_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
virtual void retrieve_inline_qos_data(TransportSendListener::InlineQosData &qos_data) const
ReliabilityQosPolicy reliability
::DDS::DataReaderQos readerQos
long max_samples_per_instance
virtual DDS::ReturnCode_t wait_for_acknowledgments(const DDS::Duration_t &max_wait)
iterator end()
Return iterator to end of list.
DDS::DataWriterListener_ptr listener_for(DDS::StatusKind kind)
SendStateDataSampleList get_resend_data()
virtual void on_deleted(DDS::DataWriter_ptr)
SendControlStatus send_w_control(SendStateDataSampleList send_list, const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
Duration_t max_blocking_time
DDS::QosPolicyId_t last_policy_id
void set_filter_out(GUIDSeq *filter_out)
const TransportLocatorSeq & connection_info() const
const long DURATION_INFINITE_SEC
DurabilityQosPolicy durability
#define OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, error_rtn_value)
TransportLocatorSeq remote_data_
bool key_fields_only_
Only the key fields of the data sample are present in the payload.
void reset()
Reset to initial state.
const GUID_t GUID_UNKNOWN
Nil value for GUID.
HistoryQosPolicyKind kind
int bind(Container &c, const FirstType &first, const SecondType &second)
bool should_ack() const
Does this writer have samples to be acknowledged?
SendStateDataSampleList available_data_list_
DDS::Security::PermissionsHandle participant_permissions_handle_
DDS::ReturnCode_t unregister_instance_i(DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp)
bool send_liveliness(const MonotonicTimePoint &now)
Send the liveliness message.
void notify_publication_disconnected(const ReaderIdSeq &subids)
virtual void on_disassociated(DDS::DataWriter_ptr, const GUID_t &)
Atomic< int > data_delivered_count_
DCPS::String repr_to_string(const DDS::DataRepresentationId_t &repr)
unique_ptr< DataSampleHeaderAllocator > header_allocator_
The header data allocator.
virtual void update_subscription_params(const GUID_t &readerId, const DDS::StringSeq &params)
DDS::InstanceHandle_t lookup_instance(const Sample &sample)
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
ACE_UINT32 source_timestamp_nanosec_
GUID_t get_pub_id() const
void send_all_to_flush_control(ACE_Guard< ACE_Recursive_Thread_Mutex > &guard)
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > DataAllocator
MonotonicTime_t participant_discovered_at_
void set_deleted(bool state)
DDS::ReturnCode_t dispose(DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp)
DDS::ReturnCode_t create_sample_data_message(Message_Block_Ptr data, DDS::InstanceHandle_t instance_handle, DataSampleHeader &header_data, Message_Block_Ptr &message, const DDS::Time_t &source_timestamp, bool content_filter)
const char * c_str() const
void notify_publication_reconnected(const ReaderIdSeq &subids)
DataBlockLockPool::DataBlockLock * get_db_lock()
Underlying data cache for both OpenDDS TRANSIENT and PERSISTENT DURABILITY implementations.
virtual DDS::ReturnCode_t get_offered_incompatible_qos_status(DDS::OfferedIncompatibleQosStatus &status)
WriterCoherentSample coherent_samples_
void data_delivered(const DataSampleElement *sample)
#define OPENDDS_ASSERT(C)
const StatusKind OFFERED_INCOMPATIBLE_QOS_STATUS
void wait_pending()
Wait for pending data and control messages to drain.
SequenceNumber get_max_sn() const
TransportLocator discovery_locator_
DDS::LivelinessLostStatus liveliness_lost_status_
Status conditions.
ACE_CDR::ULong remote_transport_context_
Holds and distributes locks to be used for locking ACE_Data_Blocks. Currently it does not require ret...
DeadlineQosPolicy deadline
bool participant_liveliness_activity_after(const MonotonicTimePoint &tv)
bool filter_out(const DataSampleElement &elt, const OPENDDS_STRING &filterClassName, const FilterEvaluator &evaluator, const DDS::StringSeq &expression_params) const
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
void populate_connection_info()
const ACE_Time_Value & value() const
virtual DDS::ReturnCode_t get_offered_deadline_missed_status(DDS::OfferedDeadlineMissedStatus &status)
DDS::DataWriterListener_var listener_
Used to notify the entity for relevant events.
void association_complete_i(const GUID_t &remote_id)
reference_wrapper< T > ref(T &r)
sequence< TransportLocator > TransportLocatorSeq
MonotonicTimePoint deadline() const
const StatusKind PUBLICATION_MATCHED_STATUS
bool cdr_encapsulation() const
bool topicIsBIT(const char *name, const char *type)
T::rv_reference move(T &p)
MonotonicTimePoint wait_pending_deadline_
DataRepresentationQosPolicy representation
ACE_Reactor_Timer_Interface * reactor_
Cached_Allocator_With_Overflow< DataSampleHeader, ACE_Null_Mutex > DataSampleHeaderAllocator
void unregister_instances(const DDS::Time_t &source_timestamp)
#define ACE_CDR_BYTE_ORDER
ACE_Thread_Mutex listener_mutex_
Mutex to protect listener info.
DDS::PublisherQos pub_qos
ACE_Message_Block * serialize_sample(const Sample &sample)
void disassociate(const GUID_t &peerId)
unique_ptr< Monitor > monitor_
Monitor object for this entity.
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
OpenDDS::DCPS::TypeSupport_ptr get_type_support()
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
bool liveliness_asserted_
const ValueDispatcher * get_value_dispatcher() const
MessageTracker controlTracker
DDS::OfferedDeadlineMissedStatus offered_deadline_missed_status_
DDS::InstanceHandle_t get_next_handle()
DDS::InstanceStateKind instance_state() const
void data_dropped(const DataSampleElement *element, bool dropped_by_transport)
const DDS::StatusMask DEFAULT_STATUS_MASK
bool valid_data() const
Returns true if the sample has a complete serialized payload.
AckToken create_ack_token(DDS::Duration_t max_wait) const
Create an AckToken for ack operations.
void terminate_send_if_suspended()
RepoIdToReaderInfoMap reader_info_
SequenceNumber get_next_sn_i()
bool need_sequence_repair()
Implements the OpenDDS::DCPS::Publisher interfaces.
void set_writer_effective_data_rep_qos(DDS::DataRepresentationIdSeq &qos, bool cdr_encapsulated)
class OpenDDS::DCPS::DataWriterImpl::EncodingMode encoding_mode_
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
MessageId
One byte message id (<256)
friend class WriteDataContainer
bool insert_instance(DDS::InstanceHandle_t handle, Sample_rch &sample)
static TimePoint_T< SystemClock > now()
Security::SecurityConfig_rch security_config_
Implements the DDS::Topic interface.
virtual void register_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
virtual DDS::InstanceHandle_t get_instance_handle()
DDS::QosPolicyCountSeq policies
RcHandle< LivenessTimer > liveness_timer_
const Encoding & encoding() const
sequence< GUID_t > ReaderIdSeq
InstanceHandle_t last_instance_handle
static bool set_encapsulation_options(Message_Block_Ptr &mb)
DurabilityServiceQosPolicy durability_service
DDS::ReturnCode_t instance_must_exist(const char *method_name, const Sample &sample, DDS::InstanceHandle_t &instance_handle, bool remove=false)
::DDS::StringSeq exprParams
virtual DDS::ReturnCode_t get_qos(DDS::DataWriterQos &qos)
virtual int handle_timeout(const ACE_Time_Value &tv, const void *arg)
Handle the assert liveliness timeout.
void enqueue_tail(const DataSampleElement *element)
DDS::StringSeq expression_params_
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
virtual const char * name() const =0
bool coherent_changes_pending()
Are coherent changes pending?
void begin_coherent_changes()
Starts a coherent change set; should only be called once.
DDS::ReturnCode_t wait_for_specific_ack(const AckToken &token)
Duration_t lease_duration
DataSample * get_sample() const
virtual DDS::DynamicData_var get_dynamic_data(DDS::DynamicType_ptr type) const =0
virtual DDS::Topic_ptr get_topic()
InstanceValuesToHandles::iterator find_instance(const Sample &sample)
void get_readers(RepoIdSet &readers)
void end_coherent_changes(const GroupCoherentSamples &group_samples)
Ends a coherent change set; should only be called once.
ReliabilityQosPolicyKind kind
const StatusKind OFFERED_DEADLINE_MISSED_STATUS
DurabilityQosPolicyKind kind
DDS::DataWriterQos passed_qos_
Class to serialize and deserialize data for DDS.
DurabilityQosPolicy durability
PublicationInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
virtual bool to_message_block(ACE_Message_Block &mb) const =0
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
WeakRcHandle< DomainParticipantImpl > participant_servant_
virtual DDS::ReturnCode_t assert_liveliness()
void unregister_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
void add_types(const XTypes::TypeLookupService_rch &tls) const
bool from_encoding(const Encoding &encoding, Extensibility extensibility)
virtual DDS::ReturnCode_t set_qos(const DDS::DataWriterQos &qos)
#define OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, error_rtn_value)
virtual ~DataWriterImpl()
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
const ReturnCode_t RETCODE_TIMEOUT
virtual DDS::DynamicType_ptr get_type() const
long ParticipantCryptoHandle
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
QosPolicyCountSeq policies
virtual void update_locators(const GUID_t &remote, const TransportLocatorSeq &locators)
void update_locators(const GUID_t &remote, const TransportLocatorSeq &locators)
SequenceNumber last_sample_
void return_handle(DDS::InstanceHandle_t handle)
DDS::ReturnCode_t dispose_w_timestamp(const Sample &sample, DDS::InstanceHandle_t instance_handle, const DDS::Time_t &source_timestamp)
virtual WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
GUID_t topic_id_
The associated topic repository id.
virtual void replay_durable_data_for(const GUID_t &remote_sub_id)
InstanceHandlesToValues instance_handles_to_values_
MonotonicTime_t participantDiscoveredAt
void lookup_instance_handles(const ReaderIdSeq &ids, DDS::InstanceHandleSeq &hdls)
Lookup the instance handles by the subscription repo ids.
Atomic< int > data_dropped_count_
Statistics counter.
ACE_Message_Block * cont(void) const
TypeSupportImpl * type_support_
DDS::DomainId_t domain_id_
The domain id.
ResourceLimitsQosPolicy resource_limits
iterator begin()
Return iterator to beginning of list.
ACE_UINT64 min_suspended_transaction_id_
The cached available data while suspending and associated transaction ids.
DDS::InstanceHandle_t register_instance_w_timestamp(const Sample &sample, const DDS::Time_t &timestamp)
size_t association_chunk_multiplier_
The multiplier for allocators affected by associations.
bool need_sequence_repair_i() const
virtual Sample_rch copy(Mutability mutability, Extent extent) const =0
unique_ptr< DataAllocator > data_allocator_
virtual int cancel_timer(long timer_id, const void **arg=0, int dont_call_handle_close=1)=0
size_t total_length(void) const
DDS::ReturnCode_t setup_serialization()
virtual RcHandle< EntityImpl > parent() const
ACE_UINT32 message_length_
End Coherent Change message.
void get_instance_handles(InstanceHandleVec &instance_handles)
void send_suspended_data()
DDS::DomainId_t domain_id() const
unique_ptr< Monitor > periodic_monitor_
Periodic Monitor object for this entity.
const DDS::DataRepresentationId_t UNALIGNED_CDR_DATA_REPRESENTATION
unique_ptr< MessageBlockAllocator > mb_allocator_
The message block allocator.
DDS::ReturnCode_t num_samples(DDS::InstanceHandle_t handle, size_t &size)
DDS::DataWriterQos dw_qos
void remove_all_associations()
DDS::ReturnCode_t write(Message_Block_Ptr sample, DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp, GUIDSeq *filter_out, const void *real_data)
ACE_UINT32 coherent_samples_
DDS::InstanceHandle_t get_entity_instance_handle(const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
HANDLE_TYPE_NATIVE InstanceHandle_t
char const * get_type_name() const
ACE_INT32 source_timestamp_sec_
long count_since_last_send
void register_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener)
virtual bool eval(FilterEvaluator &evaluator, const DDS::StringSeq &params) const =0
WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
const unsigned long DURATION_INFINITE_NSEC
virtual void on_enabled(DDS::DataWriter_ptr)
const char *const BUILT_IN_SUBSCRIPTION_TOPIC
SendControlStatus send_control(const DataSampleHeader &header, Message_Block_Ptr msg)
bool associate(const AssociationData &peer, bool active)
TransportPriorityQosPolicy transport_priority
const ReturnCode_t RETCODE_NOT_ENABLED
size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
unsigned long long ACE_UINT64
sequence< GUID_t > GUIDSeq
Cached_Allocator_With_Overflow< ACE_Message_Block, ACE_Thread_Mutex > MessageBlockAllocator
DDS::PublicationMatchedStatus publication_match_status_
AtomicBool enabled_
The flag indicates the entity is enabled.
long current_count_change
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
void notify_publication_lost(const ReaderIdSeq &subids)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
virtual DDS::DataWriterListener_ptr get_listener()
ReliabilityQosPolicy reliability
virtual bool check_transport_qos(const TransportInst &inst)
OpenDDS_Dcps_Export LogLevel log_level
DDS::ReturnCode_t assert_liveliness_by_participant()
virtual void add_association(const GUID_t &yourId, const ReaderAssociation &reader, bool active)
bool eval(const T &sample, const DDS::StringSeq &params) const
bool repr_to_encoding_kind(DDS::DataRepresentationId_t repr, Encoding::Kind &kind)
virtual DDS::Publisher_ptr get_publisher()
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
DDS::ReturnCode_t dispose_and_unregister(DDS::InstanceHandle_t handle, const DDS::Time_t &timestamp)
const SequenceNumber_t SEQUENCENUMBER_UNKNOWN
virtual DDS::ReturnCode_t enable()
SendStateDataSampleList STL-style iterator implementation.
DDS::ReturnCode_t write_w_timestamp(const Sample &sample, DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp)
Sequence number abstraction. Only allows positive 64 bit values.
void to_type_info(XTypes::TypeInformation &type_info) const
bool deadline_is_infinite() const
TransportLocatorSeq readerTransInfo
void init(TopicImpl *topic_servant, const DDS::DataWriterQos &qos, DDS::DataWriterListener_ptr a_listener, const DDS::StatusMask &mask, WeakRcHandle< DomainParticipantImpl > participant_servant, PublisherImpl *publisher_servant)
const char * retcode_to_string(DDS::ReturnCode_t value)
static const ACE_Time_Value zero
void transport_discovery_change()
void control_delivered(const Message_Block_Ptr &sample)
TimeDuration liveliness_check_interval_
The time interval for sending liveliness message.
virtual DDS::ReturnCode_t get_liveliness_lost_status(DDS::LivelinessLostStatus &status)
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
DDS::ReturnCode_t copy(DDS::DynamicData_ptr dest, DDS::DynamicData_ptr src)
const ReturnCode_t RETCODE_ERROR
DDS::ReturnCode_t register_instance_from_durable_data(DDS::InstanceHandle_t &handle, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)
virtual SendControlStatus send_control(const DataSampleHeader &header, Message_Block_Ptr msg)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
DDS::ReturnCode_t get_or_create_instance_handle(DDS::InstanceHandle_t &handle, const Sample &sample, const DDS::Time_t &source_timestamp)
ACE_UINT64 get_unsent_data(SendStateDataSampleList &list)
DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_
LifespanQosPolicy lifespan
ACE_INT32 lifespan_duration_sec_
DDS::StatusMask listener_mask_
static TransportRegistry * instance()
Return a singleton instance of this class.
DDS::DynamicType_var dynamic_type_
TypeSupportImpl * get_type_support() const
WriterDataLifecycleQosPolicy writer_data_lifecycle
virtual void on_sample_sent(DDS::DataWriter_ptr, const Sample &)
#define ACE_NEW_RETURN(POINTER, CONSTRUCTOR, RET_VAL)
size_t buffer_size(const Sample &sample) const
const ReturnCode_t RETCODE_OK
void notify_status_condition()
const ReturnCode_t RETCODE_UNSUPPORTED
virtual void on_associated(DDS::DataWriter_ptr, const GUID_t &)
virtual void on_qos_changed(DDS::DataWriter_ptr)
ACE_Thread_Mutex reader_info_lock_
::DDS::InstanceHandleSeq subscription_handles
TransportLocator readerDiscInfo
const char * to_string(MessageId value)
const ReturnCode_t RETCODE_NOT_ALLOWED_BY_SECURITY
LivelinessQosPolicyKind kind
ACE_Recursive_Thread_Mutex & get_lock() const
ACE_UINT32 lifespan_duration_nanosec_
virtual DDS::ReturnCode_t get_matched_subscriptions(DDS::InstanceHandleSeq &subscription_handles)
ACE_UINT64 max_suspended_transaction_id_
void wait_messages_pending(const char *caller)
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)=0
QosPolicyId_t last_policy_id
DDS::ReturnCode_t send_request_ack()
const long LENGTH_UNLIMITED
ACE_Recursive_Thread_Mutex lock_
#define ACE_ERROR_RETURN(X, Y)
LivelinessQosPolicy liveliness
RcHandle< T > lock() const
virtual DDS::ReturnCode_t get_matched_subscription_data(DDS::SubscriptionBuiltinTopicData &subscription_data, DDS::InstanceHandle_t subscription_handle)
int insert(Container &c, const ValueType &v)
const character_type * in(void) const
DataRepresentationIdSeq value
virtual DDS::ReturnCode_t get_publication_matched_status(DDS::PublicationMatchedStatus &status)
void check_and_set_repo_id(const GUID_t &id)
static bool valid(const DDS::UserDataQosPolicy &qos)
virtual Extensibility base_extensibility() const =0
Returns the extensibility of just the topic type.
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
#define TheServiceParticipant
RepoIdToHandleMap id_to_handle_map_
void track_sequence_number(GUIDSeq *filter_out)
InstanceValuesToHandles instance_values_to_handles_
virtual int handle_timeout(const ACE_Time_Value &tv, const void *arg)
Handle the assert liveliness timeout.
CORBA::Long last_deadline_missed_total_count_
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
SequenceNumber sequence_number_
The sequence number unique in DataWriter scope.
GUID_t publication_id_
The repository id of this datawriter/publication.
virtual bool serialize(Serializer &ser) const =0
InstanceHandle_t last_subscription_handle
The Internal API and Implementation of OpenDDS.
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
virtual void update_incompatible_qos(const IncompatibleQosStatus &status)
DDS::ReturnCode_t register_instance_i(DDS::InstanceHandle_t &handle, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)
RcHandle< WriteDataContainer > data_container_
The sample data container.
void set_wait_pending_deadline(const MonotonicTimePoint &deadline)
ACE_Message_Block * create_control_message(MessageId message_id, DataSampleHeader &header, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)
virtual void transport_assoc_done(int flags, const GUID_t &remote_id)
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)
Cached_Allocator_With_Overflow< ACE_Data_Block, ACE_Thread_Mutex > DataBlockAllocator
CORBA::String_var type_name_
The type name of associated topic.
TimeDuration liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
DDS::Security::ParticipantCryptoHandle get_crypto_handle() const
virtual const void * native_data() const =0
unsigned long transportContext
unique_ptr< DataBlockAllocator > db_allocator_
The data block allocator.
DDS::ReturnCode_t unregister_instance_w_timestamp(const Sample &sample, DDS::InstanceHandle_t instance_handle, const DDS::Time_t &timestamp)
RcHandle< BitSubscriber > get_builtin_subscriber_proxy() const
sequence< string > StringSeq
bool has_non_key_fields(const TypeSupportImpl &ts) const
SendControlStatus
Return code type for send_control() operations.
static const TimeDuration max_value
const ReturnCode_t RETCODE_BAD_PARAMETER