#include <DataWriterImpl.h>
Inheritance diagram for OpenDDS::DCPS::DataWriterImpl:
Public Member Functions | |
typedef | OPENDDS_MAP_CMP (RepoId, SequenceNumber, GUID_tKeyLessThan) RepoIdToSequenceMap |
DataWriterImpl () | |
Constructor. | |
virtual | ~DataWriterImpl () |
Destructor. | |
virtual DDS::InstanceHandle_t | get_instance_handle () |
virtual DDS::ReturnCode_t | set_qos (const DDS::DataWriterQos &qos) |
virtual DDS::ReturnCode_t | get_qos (DDS::DataWriterQos &qos) |
virtual DDS::ReturnCode_t | set_listener (DDS::DataWriterListener_ptr a_listener, DDS::StatusMask mask) |
virtual DDS::DataWriterListener_ptr | get_listener () |
virtual DDS::Topic_ptr | get_topic () |
virtual DDS::ReturnCode_t | wait_for_acknowledgments (const DDS::Duration_t &max_wait) |
virtual DDS::Publisher_ptr | get_publisher () |
virtual DDS::ReturnCode_t | get_liveliness_lost_status (DDS::LivelinessLostStatus &status) |
virtual DDS::ReturnCode_t | get_offered_deadline_missed_status (DDS::OfferedDeadlineMissedStatus &status) |
virtual DDS::ReturnCode_t | get_offered_incompatible_qos_status (DDS::OfferedIncompatibleQosStatus &status) |
virtual DDS::ReturnCode_t | get_publication_matched_status (DDS::PublicationMatchedStatus &status) |
ACE_Time_Value | liveliness_check_interval (DDS::LivelinessQosPolicyKind kind) |
bool | participant_liveliness_activity_after (const ACE_Time_Value &tv) |
virtual DDS::ReturnCode_t | assert_liveliness () |
virtual DDS::ReturnCode_t | assert_liveliness_by_participant () |
typedef | OPENDDS_VECTOR (DDS::InstanceHandle_t) InstanceHandleVec |
void | get_instance_handles (InstanceHandleVec &instance_handles) |
void | get_readers (RepoIdSet &readers) |
virtual DDS::ReturnCode_t | get_matched_subscriptions (DDS::InstanceHandleSeq &subscription_handles) |
virtual DDS::ReturnCode_t | get_matched_subscription_data (DDS::SubscriptionBuiltinTopicData &subscription_data, DDS::InstanceHandle_t subscription_handle) |
virtual DDS::ReturnCode_t | enable () |
virtual void | add_association (const RepoId &yourId, const ReaderAssociation &reader, bool active) |
virtual void | transport_assoc_done (int flags, const RepoId &remote_id) |
virtual void | association_complete (const RepoId &remote_id) |
virtual void | remove_associations (const ReaderIdSeq &readers, bool callback) |
virtual void | update_incompatible_qos (const IncompatibleQosStatus &status) |
virtual void | update_subscription_params (const RepoId &readerId, const DDS::StringSeq &params) |
virtual void | inconsistent_topic () |
void | cleanup () |
virtual void | init (DDS::Topic_ptr topic, TopicImpl *topic_servant, const DDS::DataWriterQos &qos, DDS::DataWriterListener_ptr a_listener, const DDS::StatusMask &mask, OpenDDS::DCPS::DomainParticipantImpl *participant_servant, OpenDDS::DCPS::PublisherImpl *publisher_servant, DDS::DataWriter_ptr dw_local) |
void | send_all_to_flush_control (ACE_Guard< ACE_Recursive_Thread_Mutex > &guard) |
DDS::ReturnCode_t | register_instance_i (DDS::InstanceHandle_t &handle, DataSample *data, const DDS::Time_t &source_timestamp) |
DDS::ReturnCode_t | register_instance_from_durable_data (DDS::InstanceHandle_t &handle, DataSample *data, const DDS::Time_t &source_timestamp) |
DDS::ReturnCode_t | unregister_instance_i (DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp) |
void | unregister_instances (const DDS::Time_t &source_timestamp) |
DDS::ReturnCode_t | write (DataSample *sample, DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp, GUIDSeq *filter_out) |
DDS::ReturnCode_t | dispose (DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp) |
DDS::ReturnCode_t | num_samples (DDS::InstanceHandle_t handle, size_t &size) |
ACE_UINT64 | get_unsent_data (SendStateDataSampleList &list) |
SendStateDataSampleList | get_resend_data () |
RepoId | get_publication_id () |
RepoId | get_dp_id () |
void | unregister_all () |
void | data_delivered (const DataSampleElement *sample) |
void | control_delivered (ACE_Message_Block *sample) |
bool | should_ack () const |
Does this writer have samples to be acknowledged? | |
AckToken | create_ack_token (DDS::Duration_t max_wait) const |
Create an AckToken for ack operations. | |
virtual void | retrieve_inline_qos_data (TransportSendListener::InlineQosData &qos_data) const |
virtual bool | check_transport_qos (const TransportInst &inst) |
bool | coherent_changes_pending () |
Are coherent changes pending? | |
void | begin_coherent_changes () |
Starts a coherent change set; should only be called once. | |
void | end_coherent_changes (const GroupCoherentSamples &group_samples) |
Ends a coherent change set; should only be called once. | |
const char * | get_topic_name () |
char const * | get_type_name () const |
void | data_dropped (const DataSampleElement *element, bool dropped_by_transport) |
void | control_dropped (ACE_Message_Block *sample, bool dropped_by_transport) |
ACE_INLINE ACE_Recursive_Thread_Mutex & | get_lock () |
virtual void | unregistered (DDS::InstanceHandle_t instance_handle)=0 |
DDS::DataWriterListener_ptr | listener_for (DDS::StatusKind kind) |
virtual int | handle_timeout (const ACE_Time_Value &tv, const void *arg) |
Handle the assert liveliness timeout. | |
virtual int | handle_close (ACE_HANDLE, ACE_Reactor_Mask) |
void | send_suspended_data () |
void | remove_all_associations () |
virtual void | register_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid, const TransportLocatorSeq &locators, DiscoveryListener *listener) |
virtual void | unregister_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid) |
void | notify_publication_disconnected (const ReaderIdSeq &subids) |
void | notify_publication_reconnected (const ReaderIdSeq &subids) |
void | notify_publication_lost (const ReaderIdSeq &subids) |
virtual void | notify_connection_deleted (const RepoId &peerId) |
DDS::ReturnCode_t | create_sample_data_message (DataSample *data, DDS::InstanceHandle_t instance_handle, DataSampleHeader &header_data, ACE_Message_Block *&message, const DDS::Time_t &source_timestamp, bool content_filter) |
bool | persist_data () |
void | reschedule_deadline () |
void | wait_pending () |
Wait for pending samples to drain. | |
DDS::InstanceHandle_t | get_next_handle () |
virtual EntityImpl * | parent () const |
bool | filter_out (const DataSampleElement &elt, const OPENDDS_STRING &filterClassName, const FilterEvaluator &evaluator, const DDS::StringSeq &expression_params) const |
void | wait_control_pending () |
DataBlockLockPool::DataBlockLock * | get_db_lock () |
Public Attributes | |
int | data_dropped_count_ |
Statistics counter. | |
int | data_delivered_count_ |
MessageTracker | controlTracker |
Protected Member Functions | |
DDS::ReturnCode_t | wait_for_specific_ack (const AckToken &token) |
void | prepare_to_delete () |
virtual DDS::ReturnCode_t | enable_specific ()=0 |
PublicationInstance * | get_handle_instance (DDS::InstanceHandle_t handle) |
typedef | OPENDDS_MAP_CMP (RepoId, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap |
virtual SendControlStatus | send_control (const DataSampleHeader &header, ACE_Message_Block *msg) |
bool | pending_control () |
Protected Attributes | |
size_t | n_chunks_ |
The number of chunks for the cached allocator. | |
size_t | association_chunk_multiplier_ |
The multiplier for allocators affected by associations. | |
CORBA::String_var | type_name_ |
The type name of associated topic. | |
DDS::DataWriterQos | qos_ |
The qos policy list of this datawriter. | |
DomainParticipantImpl * | participant_servant_ |
ACE_Thread_Mutex | reader_info_lock_ |
RepoIdToReaderInfoMap | reader_info_ |
Private Member Functions | |
void | track_sequence_number (GUIDSeq *filter_out) |
void | notify_publication_lost (const DDS::InstanceHandleSeq &handles) |
DDS::ReturnCode_t | dispose_and_unregister (DDS::InstanceHandle_t handle, const DDS::Time_t &timestamp) |
ACE_Message_Block * | create_control_message (MessageId message_id, DataSampleHeader &header, ACE_Message_Block *data, const DDS::Time_t &source_timestamp) |
bool | send_liveliness (const ACE_Time_Value &now) |
Send the liveliness message. | |
bool | lookup_instance_handles (const ReaderIdSeq &ids, DDS::InstanceHandleSeq &hdls) |
Lookup the instance handles by the subscription repo ids. | |
const RepoId & | get_repo_id () const |
DDS::DomainId_t | domain_id () const |
CORBA::Long | get_priority_value (const AssociationData &) const |
void | association_complete_i (const RepoId &remote_id) |
typedef | OPENDDS_MAP_CMP (RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap |
bool | need_sequence_repair () |
bool | need_sequence_repair_i () const |
DDS::ReturnCode_t | send_end_historic_samples (const RepoId &readerId) |
Private Attributes | |
CORBA::String_var | topic_name_ |
The name of associated topic. | |
RepoId | topic_id_ |
The associated topic repository id. | |
DDS::Topic_var | topic_objref_ |
The object reference of the associated topic. | |
TopicImpl * | topic_servant_ |
The topic servant. | |
DDS::StatusMask | listener_mask_ |
DDS::DataWriterListener_var | listener_ |
Used to notify the entity for relevant events. | |
DDS::DomainId_t | domain_id_ |
The domain id. | |
PublisherImpl * | publisher_servant_ |
The publisher servant which creates this datawriter. | |
DDS::DataWriter_var | dw_local_objref_ |
The object reference of the local datawriter. | |
PublicationId | publication_id_ |
The repository id of this datawriter/publication. | |
SequenceNumber | sequence_number_ |
The sequence number unique in DataWriter scope. | |
bool | coherent_ |
ACE_UINT32 | coherent_samples_ |
WriteDataContainer * | data_container_ |
The sample data container. | |
ACE_Recursive_Thread_Mutex | lock_ |
RepoIdToHandleMap | id_to_handle_map_ |
RepoIdSet | readers_ |
DDS::LivelinessLostStatus | liveliness_lost_status_ |
Status conditions. | |
DDS::OfferedDeadlineMissedStatus | offered_deadline_missed_status_ |
DDS::OfferedIncompatibleQosStatus | offered_incompatible_qos_status_ |
DDS::PublicationMatchedStatus | publication_match_status_ |
bool | liveliness_lost_ |
MessageBlockAllocator * | mb_allocator_ |
The message block allocator. | |
DataBlockAllocator * | db_allocator_ |
The data block allocator. | |
DataSampleHeaderAllocator * | header_allocator_ |
The header data allocator. | |
ACE_Reactor_Timer_Interface * | reactor_ |
ACE_Time_Value | liveliness_check_interval_ |
The time interval for sending liveliness messages. | |
ACE_Time_Value | last_liveliness_activity_time_ |
Timestamp of last write/dispose/assert_liveliness. | |
ACE_Time_Value | last_liveliness_check_time_ |
Timestamp of the last time liveliness was checked. | |
CORBA::Long | last_deadline_missed_total_count_ |
OfferedDeadlineWatchdog * | watchdog_ |
bool | cancel_timer_ |
bool | is_bit_ |
bool | initialized_ |
Flag indicating that init() has been called. | |
RepoIdSet | pending_readers_ |
RepoIdSet | assoc_complete_readers_ |
ACE_UINT64 | min_suspended_transaction_id_ |
The cached available data while suspending and associated transaction ids. | |
ACE_UINT64 | max_suspended_transaction_id_ |
SendStateDataSampleList | available_data_list_ |
Monitor * | monitor_ |
Monitor object for this entity. | |
Monitor * | periodic_monitor_ |
Periodic Monitor object for this entity. | |
DataBlockLockPool * | db_lock_pool_ |
bool | liveliness_asserted_ |
ACE_Thread_Mutex | sync_unreg_rem_assocs_lock_ |
Friends | |
class | WriteDataContainer |
class | PublisherImpl |
class | ::DDS_TEST |
Classes | |
struct | AckCustomization |
struct | AckToken |
struct | ReaderInfo |
See the DDS specification, OMG formal/04-12-02, for a description of the interface this class is implementing.
This class must be inherited by the type-specific datawriter which is specific to the data-type associated with the topic.
Definition at line 76 of file DataWriterImpl.h.
OpenDDS::DCPS::DataWriterImpl::DataWriterImpl | ( | ) |
Constructor.
Definition at line 56 of file DataWriterImpl.cpp.
References DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, db_lock_pool_, DDS::HANDLE_NIL, DDS::OfferedDeadlineMissedStatus::last_instance_handle, DDS::OfferedIncompatibleQosStatus::last_policy_id, DDS::PublicationMatchedStatus::last_subscription_handle, liveliness_lost_status_, monitor_, n_chunks_, offered_deadline_missed_status_, offered_incompatible_qos_status_, periodic_monitor_, DDS::OfferedIncompatibleQosStatus::policies, publication_match_status_, TheServiceParticipant, DDS::PublicationMatchedStatus::total_count, DDS::OfferedIncompatibleQosStatus::total_count, DDS::OfferedDeadlineMissedStatus::total_count, DDS::LivelinessLostStatus::total_count, DDS::PublicationMatchedStatus::total_count_change, DDS::OfferedIncompatibleQosStatus::total_count_change, DDS::OfferedDeadlineMissedStatus::total_count_change, and DDS::LivelinessLostStatus::total_count_change.
00057 : data_dropped_count_(0), 00058 data_delivered_count_(0), 00059 controlTracker("DataWriterImpl"), 00060 n_chunks_(TheServiceParticipant->n_chunks()), 00061 association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()), 00062 qos_(TheServiceParticipant->initial_DataWriterQos()), 00063 participant_servant_(0), 00064 topic_id_(GUID_UNKNOWN), 00065 topic_servant_(0), 00066 listener_mask_(DEFAULT_STATUS_MASK), 00067 domain_id_(0), 00068 publisher_servant_(0), 00069 publication_id_(GUID_UNKNOWN), 00070 sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()), 00071 coherent_(false), 00072 coherent_samples_(0), 00073 data_container_(0), 00074 liveliness_lost_(false), 00075 mb_allocator_(0), 00076 db_allocator_(0), 00077 header_allocator_(0), 00078 reactor_(0), 00079 liveliness_check_interval_(ACE_Time_Value::max_time), 00080 last_liveliness_activity_time_(ACE_Time_Value::zero), 00081 last_deadline_missed_total_count_(0), 00082 watchdog_(), 00083 cancel_timer_(false), 00084 is_bit_(false), 00085 initialized_(false), 00086 min_suspended_transaction_id_(0), 00087 max_suspended_transaction_id_(0), 00088 monitor_(0), 00089 periodic_monitor_(0), 00090 db_lock_pool_(0), 00091 liveliness_asserted_(false) 00092 { 00093 liveliness_lost_status_.total_count = 0; 00094 liveliness_lost_status_.total_count_change = 0; 00095 00096 offered_deadline_missed_status_.total_count = 0; 00097 offered_deadline_missed_status_.total_count_change = 0; 00098 offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL; 00099 00100 offered_incompatible_qos_status_.total_count = 0; 00101 offered_incompatible_qos_status_.total_count_change = 0; 00102 offered_incompatible_qos_status_.last_policy_id = 0; 00103 offered_incompatible_qos_status_.policies.length(0); 00104 00105 publication_match_status_.total_count = 0; 00106 publication_match_status_.total_count_change = 0; 00107 publication_match_status_.current_count = 0; 00108 
publication_match_status_.current_count_change = 0; 00109 publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL; 00110 00111 monitor_ = 00112 TheServiceParticipant->monitor_factory_->create_data_writer_monitor(this); 00113 periodic_monitor_ = 00114 TheServiceParticipant->monitor_factory_->create_data_writer_periodic_monitor(this); 00115 00116 db_lock_pool_ = new DataBlockLockPool((unsigned long)n_chunks_); 00117 }
OpenDDS::DCPS::DataWriterImpl::~DataWriterImpl | ( | ) | [virtual] |
Destructor.
Definition at line 121 of file DataWriterImpl.cpp.
References data_container_, db_allocator_, db_lock_pool_, DBG_ENTRY_LVL, header_allocator_, initialized_, and mb_allocator_.
00122 { 00123 DBG_ENTRY_LVL("DataWriterImpl","~DataWriterImpl",6); 00124 00125 if (initialized_) { 00126 delete data_container_; 00127 delete mb_allocator_; 00128 delete db_allocator_; 00129 delete header_allocator_; 00130 } 00131 delete db_lock_pool_; 00132 }
void OpenDDS::DCPS::DataWriterImpl::add_association | ( | const RepoId & | yourId, | |
const ReaderAssociation & | reader, | |||
bool | active | |||
) | [virtual] |
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 215 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::TransportClient::associate(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, DDS::DataReaderQos::durability, OpenDDS::DCPS::EntityImpl::entity_deleted_, OpenDDS::DCPS::ReaderAssociation::exprParams, OpenDDS::DCPS::ReaderAssociation::filterClassName, OpenDDS::DCPS::ReaderAssociation::filterExpression, get_publication_id(), OpenDDS::DCPS::GUID_UNKNOWN, is_bit_, OPENDDS_STRING, participant_servant_, publication_id_, qos_, reader_info_, OpenDDS::DCPS::ReaderAssociation::readerId, OpenDDS::DCPS::ReaderAssociation::readerQos, OpenDDS::DCPS::ReaderAssociation::readerTransInfo, DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, TheServiceParticipant, DDS::DataWriterQos::transport_priority, and DDS::VOLATILE_DURABILITY_QOS.
00218 { 00219 DBG_ENTRY_LVL("DataWriterImpl", "add_association", 6); 00220 00221 if (DCPS_debug_level) { 00222 GuidConverter writer_converter(yourId); 00223 GuidConverter reader_converter(reader.readerId); 00224 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association - ") 00225 ACE_TEXT("bit %d local %C remote %C\n"), is_bit_, 00226 OPENDDS_STRING(writer_converter).c_str(), 00227 OPENDDS_STRING(reader_converter).c_str())); 00228 } 00229 00230 if (entity_deleted_.value()) { 00231 if (DCPS_debug_level) 00232 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association") 00233 ACE_TEXT(" This is a deleted datawriter, ignoring add.\n"))); 00234 00235 return; 00236 } 00237 00238 if (GUID_UNKNOWN == publication_id_) { 00239 publication_id_ = yourId; 00240 } 00241 00242 { 00243 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_); 00244 reader_info_.insert(std::make_pair(reader.readerId, 00245 ReaderInfo(reader.filterClassName, 00246 TheServiceParticipant->publisher_content_filter() ? 
reader.filterExpression : "", 00247 reader.exprParams, participant_servant_, 00248 reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS))); 00249 } 00250 00251 if (DCPS_debug_level > 4) { 00252 GuidConverter converter(get_publication_id()); 00253 ACE_DEBUG((LM_DEBUG, 00254 ACE_TEXT("(%P|%t) DataWriterImpl::add_association(): ") 00255 ACE_TEXT("adding subscription to publication %C with priority %d.\n"), 00256 OPENDDS_STRING(converter).c_str(), 00257 qos_.transport_priority.value)); 00258 } 00259 00260 AssociationData data; 00261 data.remote_id_ = reader.readerId; 00262 data.remote_data_ = reader.readerTransInfo; 00263 data.remote_reliable_ = 00264 (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS); 00265 data.remote_durable_ = 00266 (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS); 00267 00268 if (!associate(data, active)) { 00269 //FUTURE: inform inforepo and try again as passive peer 00270 if (DCPS_debug_level) { 00271 ACE_DEBUG((LM_ERROR, 00272 ACE_TEXT("(%P|%t) DataWriterImpl::add_association: ") 00273 ACE_TEXT("ERROR: transport layer failed to associate.\n"))); 00274 } 00275 } 00276 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::assert_liveliness | ( | ) | [virtual] |
Implements DDS::DataWriter.
Definition at line 1110 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::DomainParticipantImpl::assert_liveliness(), DDS::AUTOMATIC_LIVELINESS_QOS, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS, participant_servant_, DDS::RETCODE_ERROR, and DDS::RETCODE_OK.
01111 { 01112 switch (this->qos_.liveliness.kind) { 01113 case DDS::AUTOMATIC_LIVELINESS_QOS: 01114 // Do nothing. 01115 break; 01116 case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS: 01117 return participant_servant_->assert_liveliness(); 01118 case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS: 01119 if (this->send_liveliness(ACE_OS::gettimeofday()) == false) { 01120 return DDS::RETCODE_ERROR; 01121 } 01122 break; 01123 } 01124 01125 return DDS::RETCODE_OK; 01126 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::assert_liveliness_by_participant | ( | ) | [virtual] |
Definition at line 1129 of file DataWriterImpl.cpp.
References liveliness_asserted_, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, and DDS::RETCODE_OK.
01130 { 01131 // This operation is called by participant. 01132 01133 if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) { 01134 // Set a flag indicating that we should send a liveliness message on the timer if necessary. 01135 liveliness_asserted_ = true; 01136 } 01137 01138 return DDS::RETCODE_OK; 01139 }
void OpenDDS::DCPS::DataWriterImpl::association_complete | ( | const RepoId & | remote_id | ) | [virtual] |
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 395 of file DataWriterImpl.cpp.
References assoc_complete_readers_, association_complete_i(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, is_bit_, OPENDDS_STRING, pending_readers_, and OpenDDS::DCPS::remove().
00396 { 00397 DBG_ENTRY_LVL("DataWriterImpl", "association_complete", 6); 00398 00399 if (DCPS_debug_level >= 1) { 00400 GuidConverter writer_converter(this->publication_id_); 00401 GuidConverter reader_converter(remote_id); 00402 ACE_DEBUG((LM_DEBUG, 00403 ACE_TEXT("(%P|%t) DataWriterImpl::association_complete - ") 00404 ACE_TEXT("bit %d local %C remote %C\n"), 00405 is_bit_, 00406 OPENDDS_STRING(writer_converter).c_str(), 00407 OPENDDS_STRING(reader_converter).c_str())); 00408 } 00409 00410 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00411 00412 if (OpenDDS::DCPS::remove(pending_readers_, remote_id) == -1) { 00413 if (DCPS_debug_level) { 00414 GuidConverter writer_converter(this->publication_id_); 00415 GuidConverter reader_converter(remote_id); 00416 ACE_DEBUG((LM_DEBUG, 00417 ACE_TEXT("(%P|%t) DataWriterImpl::association_complete - ") 00418 ACE_TEXT("bit %d local %C did not find pending reader: %C") 00419 ACE_TEXT("defer association_complete_i until add_association resumes\n"), 00420 is_bit_, 00421 OPENDDS_STRING(writer_converter).c_str(), 00422 OPENDDS_STRING(reader_converter).c_str())); 00423 } 00424 // Not found in pending_readers_, defer calling association_complete_i() 00425 // until add_association() resumes and sees this ID in assoc_complete_readers_. 00426 assoc_complete_readers_.insert(remote_id); 00427 00428 } else { 00429 association_complete_i(remote_id); 00430 } 00431 }
void OpenDDS::DCPS::DataWriterImpl::association_complete_i | ( | const RepoId & | remote_id | ) | [private] |
Definition at line 434 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::bind(), controlTracker, create_control_message(), DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, dw_local_objref_, OpenDDS::DCPS::END_HISTORIC_SAMPLES, OpenDDS::DCPS::gen_find_size(), get_db_lock(), get_resend_data(), header, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), id_to_handle_map_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::insert(), is_bit_, DDS::PublicationMatchedStatus::last_subscription_handle, listener_for(), OpenDDS::DCPS::MessageTracker::message_dropped(), OpenDDS::DCPS::MessageTracker::message_sent(), monitor_, OpenDDS::DCPS::EntityImpl::notify_status_condition(), OPENDDS_STRING, participant_servant_, publication_match_status_, DDS::PUBLICATION_MATCHED_STATUS, reader_info_, readers_, OpenDDS::DCPS::WriteDataContainer::reenqueue_all(), OpenDDS::DCPS::Monitor::report(), OpenDDS::DCPS::SEND_CONTROL_ERROR, OpenDDS::DCPS::TransportClient::send_w_control(), OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::time_value_to_time(), DDS::PublicationMatchedStatus::total_count, and DDS::PublicationMatchedStatus::total_count_change.
Referenced by association_complete(), and transport_assoc_done().
00435 { 00436 DBG_ENTRY_LVL("DataWriterImpl", "association_complete_i", 6); 00437 00438 if (DCPS_debug_level >= 1) { 00439 GuidConverter writer_converter(this->publication_id_); 00440 GuidConverter reader_converter(remote_id); 00441 ACE_DEBUG((LM_DEBUG, 00442 ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i - ") 00443 ACE_TEXT("bit %d local %C remote %C\n"), 00444 is_bit_, 00445 OPENDDS_STRING(writer_converter).c_str(), 00446 OPENDDS_STRING(reader_converter).c_str())); 00447 } 00448 00449 bool reader_durable = false; 00450 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00451 OPENDDS_STRING filterClassName; 00452 RcHandle<FilterEvaluator> eval; 00453 DDS::StringSeq expression_params; 00454 #endif 00455 { 00456 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00457 00458 if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) { 00459 GuidConverter converter(remote_id); 00460 ACE_ERROR((LM_ERROR, 00461 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::association_complete_i: ") 00462 ACE_TEXT("insert %C from pending failed.\n"), 00463 OPENDDS_STRING(converter).c_str())); 00464 } 00465 } 00466 { 00467 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_); 00468 RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id); 00469 00470 if (it != reader_info_.end()) { 00471 reader_durable = it->second.durable_; 00472 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00473 filterClassName = it->second.filter_class_name_; 00474 eval = it->second.eval_; 00475 expression_params = it->second.expression_params_; 00476 #endif 00477 } 00478 } 00479 00480 if (this->monitor_) { 00481 this->monitor_->report(); 00482 } 00483 00484 if (!is_bit_) { 00485 00486 DDS::InstanceHandle_t handle = 00487 this->participant_servant_->id_to_handle(remote_id); 00488 00489 { 00490 // protect publication_match_status_ and status changed flags. 
00491 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00492 00493 // update the publication_match_status_ 00494 ++publication_match_status_.total_count; 00495 ++publication_match_status_.total_count_change; 00496 ++publication_match_status_.current_count; 00497 ++publication_match_status_.current_count_change; 00498 00499 if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) { 00500 GuidConverter converter(remote_id); 00501 ACE_DEBUG((LM_WARNING, 00502 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::association_complete_i: ") 00503 ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"), 00504 OPENDDS_STRING(converter).c_str(), 00505 handle)); 00506 return; 00507 00508 } else if (DCPS_debug_level > 4) { 00509 GuidConverter converter(remote_id); 00510 ACE_DEBUG((LM_DEBUG, 00511 ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i: ") 00512 ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"), 00513 OPENDDS_STRING(converter).c_str(), 00514 handle)); 00515 } 00516 00517 publication_match_status_.last_subscription_handle = handle; 00518 00519 set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, true); 00520 } 00521 00522 DDS::DataWriterListener_var listener = 00523 listener_for(DDS::PUBLICATION_MATCHED_STATUS); 00524 00525 if (!CORBA::is_nil(listener.in())) { 00526 00527 listener->on_publication_matched(dw_local_objref_.in(), 00528 publication_match_status_); 00529 00530 // TBD - why does the spec say to change this but not 00531 // change the ChangeFlagStatus after a listener call? 00532 publication_match_status_.total_count_change = 0; 00533 publication_match_status_.current_count_change = 0; 00534 } 00535 00536 notify_status_condition(); 00537 } 00538 00539 // Support DURABILITY QoS 00540 if (reader_durable) { 00541 // Tell the WriteDataContainer to resend all sending/sent 00542 // samples. 
00543 this->data_container_->reenqueue_all(remote_id, this->qos_.lifespan 00544 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00545 , filterClassName, eval.in(), expression_params 00546 #endif 00547 ); 00548 00549 // Acquire the data writer container lock to avoid deadlock. The 00550 // thread calling association_complete() has to acquire lock in the 00551 // same order as the write()/register() operation. 00552 00553 // Since the thread calling association_complete() is the ORB 00554 // thread, it may have some performance penalty. If the 00555 // performance is an issue, we may need a new thread to handle the 00556 // data_available() calls. 00557 ACE_GUARD(ACE_Recursive_Thread_Mutex, 00558 guard, 00559 this->get_lock()); 00560 00561 SendStateDataSampleList list = this->get_resend_data(); 00562 { 00563 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_); 00564 // Update the reader's expected sequence 00565 SequenceNumber& seq = 00566 reader_info_.find(remote_id)->second.expected_sequence_; 00567 00568 for (SendStateDataSampleList::iterator list_el = list.begin(); 00569 list_el != list.end(); ++list_el) { 00570 list_el->get_header().historic_sample_ = true; 00571 00572 if (list_el->get_header().sequence_ > seq) { 00573 seq = list_el->get_header().sequence_; 00574 } 00575 } 00576 } 00577 00578 if (this->publisher_servant_->is_suspended()) { 00579 this->available_data_list_.enqueue_tail(list); 00580 00581 } else { 00582 if (DCPS_debug_level >= 4) { 00583 ACE_DEBUG((LM_INFO, "(%P|%t) Sending historic samples\n")); 00584 } 00585 00586 size_t size = 0, padding = 0; 00587 gen_find_size(remote_id, size, padding); 00588 ACE_Message_Block* const data = 00589 new ACE_Message_Block(size, ACE_Message_Block::MB_DATA, 0, 0, 0, 00590 get_db_lock()); 00591 Serializer ser(data); 00592 ser << remote_id; 00593 00594 const DDS::Time_t timestamp = time_value_to_time(ACE_OS::gettimeofday()); 00595 DataSampleHeader header; 00596 ACE_Message_Block* const 
end_historic_samples = 00597 create_control_message(END_HISTORIC_SAMPLES, header, data, timestamp); 00598 00599 this->controlTracker.message_sent(); 00600 guard.release(); 00601 SendControlStatus ret = send_w_control(list, header, end_historic_samples, remote_id); 00602 if (ret == SEND_CONTROL_ERROR) { 00603 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ") 00604 ACE_TEXT("DataWriterImpl::association_complete_i: ") 00605 ACE_TEXT("send_w_control failed.\n"))); 00606 this->controlTracker.message_dropped(); 00607 } 00608 } 00609 } 00610 }
void OpenDDS::DCPS::DataWriterImpl::begin_coherent_changes | ( | ) |
Starts a coherent change set; should only be called once.
Definition at line 2198 of file DataWriterImpl.cpp.
References coherent_, and get_lock().
02199 { 02200 ACE_GUARD(ACE_Recursive_Thread_Mutex, 02201 guard, 02202 get_lock()); 02203 02204 this->coherent_ = true; 02205 }
bool OpenDDS::DCPS::DataWriterImpl::check_transport_qos | ( | const TransportInst & | inst | ) | [virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 2177 of file DataWriterImpl.cpp.
02178 { 02179 // DataWriter does not impose any constraints on which transports 02180 // may be used based on QoS. 02181 return true; 02182 }
void OpenDDS::DCPS::DataWriterImpl::cleanup | ( | ) |
Clean up the DataWriter.
Definition at line 136 of file DataWriterImpl.cpp.
References cancel_timer_, dw_local_objref_, reactor_, OpenDDS::DCPS::TopicImpl::remove_entity_ref(), topic_objref_, and topic_servant_.
Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().
00137 { 00138 if (cancel_timer_) { 00139 // The cancel_timer will call handle_close to 00140 // remove_ref. 00141 (void) reactor_->cancel_timer(this, 0); 00142 cancel_timer_ = false; 00143 } 00144 00145 // release our Topic_var 00146 topic_objref_ = DDS::Topic::_nil(); 00147 topic_servant_->remove_entity_ref(); 00148 topic_servant_->_remove_ref(); 00149 topic_servant_ = 0; 00150 00151 dw_local_objref_ = DDS::DataWriter::_nil(); 00152 }
bool OpenDDS::DCPS::DataWriterImpl::coherent_changes_pending | ( | ) |
Are coherent changes pending?
Definition at line 2187 of file DataWriterImpl.cpp.
References coherent_, and get_lock().
02188 { 02189 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 02190 guard, 02191 get_lock(), 02192 false); 02193 02194 return this->coherent_; 02195 }
void OpenDDS::DCPS::DataWriterImpl::control_delivered | ( | ACE_Message_Block * | sample | ) | [virtual] |
This is called by transport to notify that the control message is delivered.
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 2135 of file DataWriterImpl.cpp.
References controlTracker, DBG_ENTRY_LVL, and OpenDDS::DCPS::MessageTracker::message_delivered().
02136 { 02137 DBG_ENTRY_LVL("DataWriterImpl","control_delivered",6); 02138 sample->release(); 02139 controlTracker.message_delivered(); 02140 }
void OpenDDS::DCPS::DataWriterImpl::control_dropped | ( | ACE_Message_Block * | sample, | |
bool | dropped_by_transport | |||
) | [virtual] |
This is called by transport to notify that the control message is dropped.
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 2276 of file DataWriterImpl.cpp.
References controlTracker, DBG_ENTRY_LVL, and OpenDDS::DCPS::MessageTracker::message_dropped().
02278 { 02279 DBG_ENTRY_LVL("DataWriterImpl","control_dropped",6); 02280 sample->release(); 02281 controlTracker.message_dropped(); 02282 }
DataWriterImpl::AckToken OpenDDS::DCPS::DataWriterImpl::create_ack_token | ( | DDS::Duration_t | max_wait | ) | const |
Create an AckToken for ack operations.
Definition at line 1003 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level.
Referenced by wait_for_acknowledgments().
01004 { 01005 if (DCPS_debug_level > 0) { 01006 ACE_DEBUG((LM_DEBUG, 01007 ACE_TEXT("(%P|%t) DataWriterImpl::create_ack_token() - ") 01008 ACE_TEXT("for sequence %q \n"), 01009 this->sequence_number_.getValue())); 01010 } 01011 return AckToken(max_wait, this->sequence_number_); 01012 }
ACE_Message_Block * OpenDDS::DCPS::DataWriterImpl::create_control_message | ( | MessageId | message_id, | |
DataSampleHeader & | header, | |||
ACE_Message_Block * | data, | |||
const DDS::Time_t & | source_timestamp | |||
) | [private] |
This method creates a header message block and chains it with the registered sample. The header contains the information needed, e.g. the message id and the length of the whole message. The fast allocator is not used for the header.
Definition at line 1943 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::coherent_change_, db_allocator_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, get_db_lock(), OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), mb_allocator_, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, DDS::Time_t::nanosec, need_sequence_repair(), OPENDDS_STRING, publication_id_, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::PublisherImpl::publisher_id_, OpenDDS::DCPS::DataSampleHeader::publisher_id_, publisher_servant_, reader_info_, DDS::Time_t::sec, OpenDDS::DCPS::DataSampleHeader::sequence_, sequence_number_, OpenDDS::DCPS::DataSampleHeader::sequence_repair_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::RTPS::SEQUENCENUMBER_UNKNOWN, OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, OpenDDS::DCPS::TransportClient::swap_bytes(), OpenDDS::DCPS::to_string(), and OpenDDS::DCPS::UNREGISTER_INSTANCE.
Referenced by association_complete_i(), dispose(), dispose_and_unregister(), end_coherent_changes(), register_instance_i(), send_liveliness(), and unregister_instance_i().
01947 { 01948 header_data.message_id_ = message_id; 01949 header_data.byte_order_ = 01950 this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER; 01951 header_data.coherent_change_ = 0; 01952 01953 if (data) { 01954 header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length()); 01955 01956 if (header_data.message_length_ == 0) { 01957 data->release(); 01958 } 01959 } 01960 01961 header_data.sequence_ = SequenceNumber::SEQUENCENUMBER_UNKNOWN(); 01962 header_data.sequence_repair_ = false; // set below 01963 header_data.source_timestamp_sec_ = source_timestamp.sec; 01964 header_data.source_timestamp_nanosec_ = source_timestamp.nanosec; 01965 header_data.publication_id_ = publication_id_; 01966 header_data.publisher_id_ = this->publisher_servant_->publisher_id_; 01967 01968 if (message_id == INSTANCE_REGISTRATION 01969 || message_id == DISPOSE_INSTANCE 01970 || message_id == UNREGISTER_INSTANCE 01971 || message_id == DISPOSE_UNREGISTER_INSTANCE) { 01972 01973 header_data.sequence_repair_ = need_sequence_repair(); 01974 01975 // Use the sequence number here for the sake of RTPS (where these 01976 // control messages map onto the Data Submessage). 01977 if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) { 01978 this->sequence_number_ = SequenceNumber(); 01979 01980 } else { 01981 ++this->sequence_number_; 01982 } 01983 01984 header_data.sequence_ = this->sequence_number_; 01985 header_data.key_fields_only_ = true; 01986 } 01987 01988 ACE_Message_Block* message = 0; 01989 ACE_NEW_MALLOC_RETURN(message, 01990 static_cast<ACE_Message_Block*>( 01991 mb_allocator_->malloc(sizeof(ACE_Message_Block))), 01992 ACE_Message_Block( 01993 DataSampleHeader::max_marshaled_size(), 01994 ACE_Message_Block::MB_DATA, 01995 header_data.message_length_ ? 
data : 0, //cont 01996 0, //data 01997 0, //allocator_strategy 01998 get_db_lock(), //locking_strategy 01999 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 02000 ACE_Time_Value::zero, 02001 ACE_Time_Value::max_time, 02002 db_allocator_, 02003 mb_allocator_), 02004 0); 02005 02006 *message << header_data; 02007 02008 // If we incremented sequence number for this control message 02009 if (header_data.sequence_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) { 02010 ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, 0); 02011 // Update the expected sequence number for all readers 02012 RepoIdToReaderInfoMap::iterator reader; 02013 02014 for (reader = reader_info_.begin(); reader != reader_info_.end(); ++reader) { 02015 reader->second.expected_sequence_ = sequence_number_; 02016 } 02017 } 02018 if (DCPS_debug_level >= 4) { 02019 const GuidConverter converter(publication_id_); 02020 ACE_DEBUG((LM_DEBUG, 02021 ACE_TEXT("(%P|%t) DataWriterImpl::create_control_message: ") 02022 ACE_TEXT("from publication %C sending control sample: %C .\n"), 02023 OPENDDS_STRING(converter).c_str(), 02024 to_string(header_data).c_str())); 02025 } 02026 return message; 02027 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::create_sample_data_message | ( | DataSample * | data, | |
DDS::InstanceHandle_t | instance_handle, | |||
DataSampleHeader & | header_data, | |||
ACE_Message_Block *& | message, | |||
const DDS::Time_t & | source_timestamp, | |||
bool | content_filter | |||
) |
This method creates a header message block and chains it with the sample data. The header contains the information needed, e.g. the message id and the length of the whole message. The fast allocator is used to allocate the message block, data block and header.
Definition at line 2030 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::TransportClient::cdr_encapsulation(), OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, coherent_, OpenDDS::DCPS::DataSampleHeader::coherent_change_, OpenDDS::DCPS::DataSampleHeader::content_filter_, data_container_, db_allocator_, OpenDDS::DCPS::DCPS_debug_level, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, get_db_lock(), OpenDDS::DCPS::WriteDataContainer::get_handle_instance(), OpenDDS::DCPS::DataSampleHeader::group_coherent_, DDS::GROUP_PRESENTATION_QOS, header_allocator_, DDS::DataWriterQos::lifespan, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_nanosec_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_sec_, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), max_marshaled_size(), mb_allocator_, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, DDS::Time_t::nanosec, need_sequence_repair(), OPENDDS_STRING, DDS::PublisherQos::presentation, publication_id_, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::PublisherImpl::publisher_id_, OpenDDS::DCPS::DataSampleHeader::publisher_id_, publisher_servant_, qos_, OpenDDS::DCPS::PublisherImpl::qos_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::SAMPLE_DATA, DDS::Time_t::sec, OpenDDS::DCPS::DataSampleHeader::sequence_, sequence_number_, OpenDDS::DCPS::DataSampleHeader::sequence_repair_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, OpenDDS::DCPS::TransportClient::swap_bytes(), and OpenDDS::DCPS::to_string().
Referenced by write().
02036 { 02037 PublicationInstance* const instance = 02038 data_container_->get_handle_instance(instance_handle); 02039 02040 if (0 == instance) { 02041 ACE_ERROR_RETURN((LM_ERROR, 02042 ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message ") 02043 ACE_TEXT("failed to find instance for handle %d\n"), 02044 instance_handle), 02045 DDS::RETCODE_ERROR); 02046 } 02047 02048 header_data.message_id_ = SAMPLE_DATA; 02049 header_data.byte_order_ = 02050 this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER; 02051 header_data.coherent_change_ = this->coherent_; 02052 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 02053 header_data.group_coherent_ = 02054 this->publisher_servant_->qos_.presentation.access_scope 02055 == DDS::GROUP_PRESENTATION_QOS; 02056 #endif 02057 header_data.content_filter_ = content_filter; 02058 header_data.cdr_encapsulation_ = this->cdr_encapsulation(); 02059 header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length()); 02060 header_data.sequence_repair_ = need_sequence_repair(); 02061 02062 if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) { 02063 this->sequence_number_ = SequenceNumber(); 02064 02065 } else { 02066 ++this->sequence_number_; 02067 } 02068 02069 header_data.sequence_ = this->sequence_number_; 02070 header_data.source_timestamp_sec_ = source_timestamp.sec; 02071 header_data.source_timestamp_nanosec_ = source_timestamp.nanosec; 02072 02073 if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC 02074 || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) { 02075 header_data.lifespan_duration_ = true; 02076 header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec; 02077 header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec; 02078 } 02079 02080 header_data.publication_id_ = publication_id_; 02081 header_data.publisher_id_ = this->publisher_servant_->publisher_id_; 02082 size_t max_marshaled_size = header_data.max_marshaled_size(); 02083 02084 
ACE_NEW_MALLOC_RETURN(message, 02085 static_cast<ACE_Message_Block*>( 02086 mb_allocator_->malloc(sizeof(ACE_Message_Block))), 02087 ACE_Message_Block(max_marshaled_size, 02088 ACE_Message_Block::MB_DATA, 02089 data, //cont 02090 0, //data 02091 header_allocator_, //alloc_strategy 02092 get_db_lock(), //locking_strategy 02093 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 02094 ACE_Time_Value::zero, 02095 ACE_Time_Value::max_time, 02096 db_allocator_, 02097 mb_allocator_), 02098 DDS::RETCODE_ERROR); 02099 02100 *message << header_data; 02101 if (DCPS_debug_level >= 4) { 02102 const GuidConverter converter(publication_id_); 02103 ACE_DEBUG((LM_DEBUG, 02104 ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message: ") 02105 ACE_TEXT("from publication %C sending data sample: %C .\n"), 02106 OPENDDS_STRING(converter).c_str(), 02107 to_string(header_data).c_str())); 02108 } 02109 return DDS::RETCODE_OK; 02110 }
void OpenDDS::DCPS::DataWriterImpl::data_delivered | ( | const DataSampleElement * | sample | ) | [virtual] |
This is called by transport to notify that the sample is delivered and it is delegated to WriteDataContainer to adjust the internal data sample threads.
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 2113 of file DataWriterImpl.cpp.
References data_container_, OpenDDS::DCPS::WriteDataContainer::data_delivered(), data_delivered_count_, DBG_ENTRY_LVL, OpenDDS::DCPS::DataSampleElement::get_pub_id(), OPENDDS_STRING, and publication_id_.
02114 { 02115 DBG_ENTRY_LVL("DataWriterImpl","data_delivered",6); 02116 02117 if (!(sample->get_pub_id() == this->publication_id_)) { 02118 GuidConverter sample_converter(sample->get_pub_id()); 02119 GuidConverter writer_converter(publication_id_); 02120 ACE_ERROR((LM_ERROR, 02121 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::data_delivered: ") 02122 ACE_TEXT(" The publication id %C from delivered element ") 02123 ACE_TEXT("does not match the datawriter's id %C\n"), 02124 OPENDDS_STRING(sample_converter).c_str(), 02125 OPENDDS_STRING(writer_converter).c_str())); 02126 return; 02127 } 02128 //provided for statistics tracking in tests 02129 ++data_delivered_count_; 02130 02131 this->data_container_->data_delivered(sample); 02132 }
void OpenDDS::DCPS::DataWriterImpl::data_dropped | ( | const DataSampleElement * | element, | |
bool | dropped_by_transport | |||
) | [virtual] |
This method is called by the transport to notify that the instance sample was dropped; it delegates to WriteDataContainer to update the internal list.
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 2264 of file DataWriterImpl.cpp.
References data_container_, OpenDDS::DCPS::WriteDataContainer::data_dropped(), data_dropped_count_, and DBG_ENTRY_LVL.
02266 { 02267 DBG_ENTRY_LVL("DataWriterImpl","data_dropped",6); 02268 02269 //provided for statistics tracking in tests 02270 ++data_dropped_count_; 02271 02272 this->data_container_->data_dropped(element, dropped_by_transport); 02273 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::dispose | ( | DDS::InstanceHandle_t | handle, | |
const DDS::Time_t & | source_timestamp | |||
) |
Delegate to the WriteDataContainer to dispose all data samples for a given instance and tell the transport to broadcast the disposed instance.
Definition at line 1841 of file DataWriterImpl.cpp.
References create_control_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::WriteDataContainer::dispose(), OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::WriteDataContainer::enqueue_control(), OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), OpenDDS::DCPS::WriteDataContainer::obtain_buffer_for_control(), DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, send_all_to_flush_control(), and OpenDDS::DCPS::DataSampleElement::set_sample().
Referenced by OpenDDS::DCPS::DataWriterImpl_T< MessageType >::dispose_w_timestamp().
01843 { 01844 DBG_ENTRY_LVL("DataWriterImpl","dispose",6); 01845 01846 if (enabled_ == false) { 01847 ACE_ERROR_RETURN((LM_ERROR, 01848 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::dispose: ") 01849 ACE_TEXT(" Entity is not enabled. \n")), 01850 DDS::RETCODE_NOT_ENABLED); 01851 } 01852 01853 DDS::ReturnCode_t ret = ::DDS::RETCODE_ERROR; 01854 01855 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, get_lock(), ret); 01856 01857 DataSample* registered_sample_data = 0; 01858 ret = this->data_container_->dispose(handle, registered_sample_data); 01859 01860 if (ret != DDS::RETCODE_OK) { 01861 ACE_ERROR_RETURN((LM_ERROR, 01862 ACE_TEXT("(%P|%t) ERROR: ") 01863 ACE_TEXT("DataWriterImpl::dispose: ") 01864 ACE_TEXT("dispose failed.\n")), 01865 ret); 01866 } 01867 01868 DataSampleElement* element = 0; 01869 ret = this->data_container_->obtain_buffer_for_control(element); 01870 01871 if (ret != DDS::RETCODE_OK) { 01872 ACE_ERROR_RETURN((LM_ERROR, 01873 ACE_TEXT("(%P|%t) ERROR: ") 01874 ACE_TEXT("DataWriterImpl::dispose: ") 01875 ACE_TEXT("obtain_buffer_for_control returned %d.\n"), 01876 ret), 01877 ret); 01878 } 01879 01880 element->set_sample(create_control_message(DISPOSE_INSTANCE, 01881 element->get_header(), 01882 registered_sample_data, 01883 source_timestamp)); 01884 ret = this->data_container_->enqueue_control(element); 01885 01886 if (ret != DDS::RETCODE_OK) { 01887 ACE_ERROR_RETURN((LM_ERROR, 01888 ACE_TEXT("(%P|%t) ERROR: ") 01889 ACE_TEXT("DataWriterImpl::dispose: ") 01890 ACE_TEXT("enqueue_control failed.\n")), 01891 ret); 01892 } 01893 01894 send_all_to_flush_control(guard); 01895 01896 return DDS::RETCODE_OK; 01897 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::dispose_and_unregister | ( | DDS::InstanceHandle_t | handle, | |
const DDS::Time_t & | timestamp | |||
) | [private] |
Definition at line 1616 of file DataWriterImpl.cpp.
References create_control_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::WriteDataContainer::dispose(), OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::WriteDataContainer::enqueue_control(), OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), OpenDDS::DCPS::WriteDataContainer::obtain_buffer_for_control(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, send_all_to_flush_control(), OpenDDS::DCPS::DataSampleElement::set_sample(), and OpenDDS::DCPS::WriteDataContainer::unregister().
Referenced by unregister_instance_i().
01618 { 01619 DBG_ENTRY_LVL("DataWriterImpl", "dispose_and_unregister", 6); 01620 01621 DDS::ReturnCode_t ret = DDS::RETCODE_ERROR; 01622 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret); 01623 01624 DataSample* data_sample = 0; 01625 ret = this->data_container_->dispose(handle, data_sample); 01626 01627 if (ret != DDS::RETCODE_OK) { 01628 ACE_ERROR_RETURN((LM_ERROR, 01629 ACE_TEXT("(%P|%t) ERROR: ") 01630 ACE_TEXT("DataWriterImpl::dispose_and_unregister: ") 01631 ACE_TEXT("dispose on container failed. \n")), 01632 ret); 01633 } 01634 01635 ret = this->data_container_->unregister(handle, data_sample, false); 01636 01637 if (ret != DDS::RETCODE_OK) { 01638 ACE_ERROR_RETURN((LM_ERROR, 01639 ACE_TEXT("(%P|%t) ERROR: ") 01640 ACE_TEXT("DataWriterImpl::dispose_and_unregister: ") 01641 ACE_TEXT("unregister with container failed. \n")), 01642 ret); 01643 } 01644 01645 DataSampleElement* element = 0; 01646 ret = this->data_container_->obtain_buffer_for_control(element); 01647 01648 if (ret != DDS::RETCODE_OK) { 01649 ACE_ERROR_RETURN((LM_ERROR, 01650 ACE_TEXT("(%P|%t) ERROR: ") 01651 ACE_TEXT("DataWriterImpl::dispose_and_unregister: ") 01652 ACE_TEXT("obtain_buffer_for_control returned %d.\n"), 01653 ret), 01654 ret); 01655 } 01656 01657 element->set_sample(create_control_message(DISPOSE_UNREGISTER_INSTANCE, 01658 element->get_header(), 01659 data_sample, 01660 source_timestamp)); 01661 01662 ret = this->data_container_->enqueue_control(element); 01663 01664 if (ret != DDS::RETCODE_OK) { 01665 ACE_ERROR_RETURN((LM_ERROR, 01666 ACE_TEXT("(%P|%t) ERROR: ") 01667 ACE_TEXT("DataWriterImpl::dispose_and_unregister: ") 01668 ACE_TEXT("enqueue_control failed.\n")), 01669 ret); 01670 } 01671 01672 send_all_to_flush_control(guard); 01673 return DDS::RETCODE_OK; 01674 }
DDS::DomainId_t OpenDDS::DCPS::DataWriterImpl::domain_id | ( | ) | const [inline, private, virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 574 of file DataWriterImpl.h.
00574 { 00575 return this->domain_id_; 00576 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::enable | ( | ) | [virtual] |
Implements DDS::Entity.
Definition at line 1228 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::DomainParticipantImpl::add_adjust_liveliness_timers(), association_chunk_multiplier_, cancel_timer_, OpenDDS::DCPS::TransportClient::connection_info(), data_container_, db_allocator_, OpenDDS::DCPS::DCPS_debug_level, DDS::DataWriterQos::deadline, domain_id_, DDS::DataWriterQos::durability, DDS::DataWriterQos::durability_service, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::duration_to_time_value(), enable_specific(), OpenDDS::DCPS::TransportClient::enable_transport(), OpenDDS::DCPS::TopicImpl::get_id(), OpenDDS::DCPS::get_instance_sample_list_depth(), OpenDDS::DCPS::PublisherImpl::get_qos(), get_topic_name(), get_type_name(), OpenDDS::DCPS::GUID_UNKNOWN, header_allocator_, DDS::DataWriterQos::history, last_deadline_missed_total_count_, last_liveliness_check_time_, DDS::LENGTH_UNLIMITED, DDS::DataWriterQos::liveliness, liveliness_check_interval_, mb_allocator_, monitor_, n_chunks_, DDS::Duration_t::nanosec, offered_deadline_missed_status_, participant_servant_, OpenDDS::DCPS::WriteDataContainer::publication_id_, publication_id_, publisher_servant_, qos_, reactor_, DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::Monitor::report(), DDS::DataWriterQos::resource_limits, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, DDS::Duration_t::sec, OpenDDS::DCPS::EntityImpl::set_enabled(), TheServiceParticipant, topic_name_, topic_servant_, DDS::VOLATILE_DURABILITY_QOS, watchdog_, WriteDataContainer, and OpenDDS::DCPS::PublisherImpl::writer_enabled().
Referenced by OpenDDS::DCPS::PublisherImpl::create_datawriter().
01229 { 01230 //According spec: 01231 // - Calling enable on an already enabled Entity returns OK and has no 01232 // effect. 01233 // - Calling enable on an Entity whose factory is not enabled will fail 01234 // and return PRECONDITION_NOT_MET. 01235 01236 if (this->is_enabled()) { 01237 return DDS::RETCODE_OK; 01238 } 01239 01240 if (this->publisher_servant_->is_enabled() == false) { 01241 return DDS::RETCODE_PRECONDITION_NOT_MET; 01242 } 01243 01244 // Note: do configuration based on QoS in enable() because 01245 // before enable is called the QoS can be changed -- even 01246 // for Changeable=NO 01247 01248 // Configure WriteDataContainer constructor parameters from qos. 01249 01250 const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS; 01251 01252 CORBA::Long const depth = 01253 get_instance_sample_list_depth( 01254 qos_.history.kind, 01255 qos_.history.depth, 01256 qos_.resource_limits.max_samples_per_instance); 01257 01258 bool resource_blocking = false; 01259 01260 if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) { 01261 n_chunks_ = qos_.resource_limits.max_samples; 01262 01263 if (qos_.resource_limits.max_instances == DDS::LENGTH_UNLIMITED) { 01264 resource_blocking = true; 01265 01266 } else { 01267 resource_blocking = 01268 (qos_.resource_limits.max_samples < qos_.resource_limits.max_instances) 01269 || (qos_.resource_limits.max_samples < 01270 (qos_.resource_limits.max_instances * depth)); 01271 } 01272 } 01273 01274 //else using value from Service_Participant 01275 01276 // enable the type specific part of this DataWriter 01277 this->enable_specific(); 01278 01279 CORBA::Long max_instances = 0; 01280 01281 if (reliable && qos_.resource_limits.max_instances != DDS::LENGTH_UNLIMITED) 01282 max_instances = qos_.resource_limits.max_instances; 01283 01284 CORBA::Long max_total_samples = 0; 01285 01286 if (reliable && resource_blocking) 01287 max_total_samples = qos_.resource_limits.max_samples; 01288 01289 #ifndef 
OPENDDS_NO_PERSISTENCE_PROFILE 01290 // Get data durability cache if DataWriter QoS requires durable 01291 // samples. Publisher servant retains ownership of the cache. 01292 DataDurabilityCache* const durability_cache = 01293 TheServiceParticipant->get_data_durability_cache(qos_.durability); 01294 #endif 01295 01296 //Note: the QoS used to set n_chunks_ is Changable=No so 01297 // it is OK that we cannot change the size of our allocators. 01298 data_container_ = new WriteDataContainer(this, 01299 depth, 01300 qos_.reliability.max_blocking_time, 01301 n_chunks_, 01302 domain_id_, 01303 get_topic_name(), 01304 get_type_name(), 01305 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 01306 durability_cache, 01307 qos_.durability_service, 01308 #endif 01309 max_instances, 01310 max_total_samples); 01311 01312 // +1 because we might allocate one before releasing another 01313 // TBD - see if this +1 can be removed. 01314 mb_allocator_ = new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_); 01315 db_allocator_ = new DataBlockAllocator(n_chunks_+1); 01316 header_allocator_ = new DataSampleHeaderAllocator(n_chunks_+1); 01317 01318 if (DCPS_debug_level >= 2) { 01319 ACE_DEBUG((LM_DEBUG, 01320 "(%P|%t) DataWriterImpl::enable-mb" 01321 " Cached_Allocator_With_Overflow %x with %d chunks\n", 01322 mb_allocator_, 01323 n_chunks_)); 01324 01325 ACE_DEBUG((LM_DEBUG, 01326 "(%P|%t) DataWriterImpl::enable-db" 01327 " Cached_Allocator_With_Overflow %x with %d chunks\n", 01328 db_allocator_, 01329 n_chunks_)); 01330 01331 ACE_DEBUG((LM_DEBUG, 01332 "(%P|%t) DataWriterImpl::enable-header" 01333 " Cached_Allocator_With_Overflow %x with %d chunks\n", 01334 header_allocator_, 01335 n_chunks_)); 01336 } 01337 01338 if (qos_.liveliness.lease_duration.sec != DDS::DURATION_INFINITE_SEC && 01339 qos_.liveliness.lease_duration.nanosec != DDS::DURATION_INFINITE_NSEC) { 01340 liveliness_check_interval_ = duration_to_time_value(qos_.liveliness.lease_duration); 01341 liveliness_check_interval_ 
*= TheServiceParticipant->liveliness_factor()/100.0; 01342 // Must be at least 1 micro second. 01343 if (liveliness_check_interval_ == ACE_Time_Value::zero) { 01344 liveliness_check_interval_ = ACE_Time_Value (0, 1); 01345 } 01346 01347 last_liveliness_check_time_ = ACE_OS::gettimeofday(); 01348 01349 if (reactor_->schedule_timer(this, 01350 0, 01351 liveliness_check_interval_, 01352 liveliness_check_interval_) == -1) { 01353 ACE_ERROR((LM_ERROR, 01354 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: %p.\n"), 01355 ACE_TEXT("schedule_timer"))); 01356 01357 } else { 01358 cancel_timer_ = true; 01359 this->_add_ref(); 01360 } 01361 } 01362 01363 participant_servant_->add_adjust_liveliness_timers(this); 01364 01365 // Setup the offered deadline watchdog if the configured deadline 01366 // period is not the default (infinite). 01367 DDS::Duration_t const deadline_period = this->qos_.deadline.period; 01368 01369 if (deadline_period.sec != DDS::DURATION_INFINITE_SEC 01370 || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC) { 01371 this->watchdog_ = new OfferedDeadlineWatchdog( 01372 this->lock_, 01373 this->qos_.deadline, 01374 this, 01375 this->dw_local_objref_.in(), 01376 this->offered_deadline_missed_status_, 01377 this->last_deadline_missed_total_count_); 01378 } 01379 01380 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_); 01381 disco->pre_writer(this); 01382 01383 this->set_enabled(); 01384 01385 try { 01386 this->enable_transport(reliable, 01387 this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS); 01388 01389 } catch (const Transport::Exception&) { 01390 ACE_ERROR((LM_ERROR, 01391 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable, ") 01392 ACE_TEXT("Transport Exception.\n"))); 01393 return DDS::RETCODE_ERROR; 01394 01395 } 01396 01397 const TransportLocatorSeq& trans_conf_info = connection_info(); 01398 01399 DDS::PublisherQos pub_qos; 01400 this->publisher_servant_->get_qos(pub_qos); 01401 01402 this->publication_id_ = 
01403 disco->add_publication(this->domain_id_, 01404 this->participant_servant_->get_id(), 01405 this->topic_servant_->get_id(), 01406 this, 01407 this->qos_, 01408 trans_conf_info, 01409 pub_qos); 01410 01411 if (this->publication_id_ == GUID_UNKNOWN) { 01412 ACE_ERROR((LM_ERROR, 01413 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable, ") 01414 ACE_TEXT("add_publication returned invalid id. \n"))); 01415 return DDS::RETCODE_ERROR; 01416 } 01417 01418 this->data_container_->publication_id_ = this->publication_id_; 01419 01420 const DDS::ReturnCode_t writer_enabled_result = 01421 publisher_servant_->writer_enabled(topic_name_.in(), this); 01422 01423 if (this->monitor_) { 01424 this->monitor_->report(); 01425 } 01426 01427 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 01428 01429 // Move cached data from the durability cache to the unsent data 01430 // queue. 01431 if (durability_cache != 0) { 01432 01433 if (!durability_cache->get_data(this->domain_id_, 01434 get_topic_name(), 01435 get_type_name(), 01436 this, 01437 this->mb_allocator_, 01438 this->db_allocator_, 01439 this->qos_.lifespan)) { 01440 ACE_ERROR((LM_ERROR, 01441 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: ") 01442 ACE_TEXT("unable to retrieve durable data\n"))); 01443 } 01444 } 01445 01446 #endif 01447 01448 return writer_enabled_result; 01449 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::enable_specific | ( | ) | [protected, pure virtual] |
void OpenDDS::DCPS::DataWriterImpl::end_coherent_changes | ( | const GroupCoherentSamples & | group_samples | ) |
Ends a coherent change set; should only be called once.
Definition at line 2208 of file DataWriterImpl.cpp.
References coherent_, coherent_samples_, OpenDDS::DCPS::CoherentChangeControl::coherent_samples_, create_control_message(), OpenDDS::DCPS::END_COHERENT_CHANGES, get_db_lock(), get_lock(), OpenDDS::DCPS::CoherentChangeControl::group_coherent_, OpenDDS::DCPS::CoherentChangeControl::group_coherent_samples_, DDS::GROUP_PRESENTATION_QOS, header, OpenDDS::DCPS::WriterCoherentSample::last_sample_, OpenDDS::DCPS::CoherentChangeControl::max_marshaled_size(), max_marshaled_size(), OpenDDS::DCPS::WriterCoherentSample::num_samples_, DDS::PublisherQos::presentation, OpenDDS::DCPS::PublisherImpl::publisher_id_, OpenDDS::DCPS::CoherentChangeControl::publisher_id_, publisher_servant_, OpenDDS::DCPS::PublisherImpl::qos_, OpenDDS::DCPS::SEND_CONTROL_ERROR, sequence_number_, and OpenDDS::DCPS::time_value_to_time().
02209 { 02210 // PublisherImpl::pi_lock_ should be held. 02211 ACE_GUARD(ACE_Recursive_Thread_Mutex, 02212 guard, 02213 get_lock()); 02214 02215 CoherentChangeControl end_msg; 02216 end_msg.coherent_samples_.num_samples_ = this->coherent_samples_; 02217 end_msg.coherent_samples_.last_sample_ = this->sequence_number_; 02218 end_msg.group_coherent_ 02219 = this->publisher_servant_->qos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS; 02220 02221 if (end_msg.group_coherent_) { 02222 end_msg.publisher_id_ = this->publisher_servant_->publisher_id_; 02223 end_msg.group_coherent_samples_ = group_samples; 02224 } 02225 02226 ACE_Message_Block* data = 0; 02227 size_t max_marshaled_size = end_msg.max_marshaled_size(); 02228 02229 ACE_NEW(data, ACE_Message_Block(max_marshaled_size, 02230 ACE_Message_Block::MB_DATA, 02231 0, //cont 02232 0, //data 02233 0, //alloc_strategy 02234 get_db_lock())); 02235 02236 Serializer serializer( 02237 data, 02238 this->swap_bytes()); 02239 02240 serializer << end_msg; 02241 02242 DDS::Time_t source_timestamp = 02243 time_value_to_time(ACE_OS::gettimeofday()); 02244 02245 DataSampleHeader header; 02246 ACE_Message_Block* control = 02247 create_control_message(END_COHERENT_CHANGES, header, data, source_timestamp); 02248 02249 02250 this->coherent_ = false; 02251 this->coherent_samples_ = 0; 02252 02253 guard.release(); 02254 if (this->send_control(header, control) == SEND_CONTROL_ERROR) { 02255 ACE_ERROR((LM_ERROR, 02256 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::end_coherent_changes:") 02257 ACE_TEXT(" unable to send END_COHERENT_CHANGES control message!\n"))); 02258 } 02259 }
bool OpenDDS::DCPS::DataWriterImpl::filter_out | ( | const DataSampleElement & | elt, | |
const OPENDDS_STRING & | filterClassName, | |||
const FilterEvaluator & | evaluator, | |||
const DDS::StringSeq & | expression_params | |||
) | const |
Definition at line 2150 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, OpenDDS::DCPS::FilterEvaluator::eval(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::TopicDescriptionImpl::get_type_support(), OpenDDS::DCPS::TypeSupportImpl::getMetaStructForType(), and topic_servant_.
Referenced by OpenDDS::DCPS::WriteDataContainer::copy_and_append().
02154 { 02155 TypeSupportImpl* const typesupport = 02156 dynamic_cast<TypeSupportImpl*>(topic_servant_->get_type_support()); 02157 02158 if (!typesupport) { 02159 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR DataWriterImpl::filter_out - Could not cast type support, not filtering\n")); 02160 return false; 02161 } 02162 02163 if (filterClassName == "DDSSQL" || 02164 filterClassName == "OPENDDSSQL") { 02165 return !evaluator.eval(elt.get_sample()->cont(), 02166 elt.get_header().byte_order_ != ACE_CDR_BYTE_ORDER, 02167 elt.get_header().cdr_encapsulation_, typesupport->getMetaStructForType(), 02168 expression_params); 02169 } 02170 else { 02171 return false; 02172 } 02173 }
DataBlockLockPool::DataBlockLock* OpenDDS::DCPS::DataWriterImpl::get_db_lock | ( | ) | [inline] |
Definition at line 472 of file DataWriterImpl.h.
Referenced by association_complete_i(), create_control_message(), create_sample_data_message(), end_coherent_changes(), and OpenDDS::DCPS::DataDurabilityCache::get_data().
00472 { 00473 return db_lock_pool_->get_lock(); 00474 }
RepoId OpenDDS::DCPS::DataWriterImpl::get_dp_id | ( | ) |
Accessor of the repository id of the domain participant.
Definition at line 1925 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::DomainParticipantImpl::get_id(), and participant_servant_.
Referenced by OpenDDS::DCPS::DWMonitorImpl::report().
01926 { 01927 return participant_servant_->get_id(); 01928 }
PublicationInstance * OpenDDS::DCPS::DataWriterImpl::get_handle_instance | ( | DDS::InstanceHandle_t | handle | ) | [protected] |
Attempt to locate an existing instance for the given handle.
Definition at line 2413 of file DataWriterImpl.cpp.
References data_container_, and OpenDDS::DCPS::WriteDataContainer::get_handle_instance().
02414 { 02415 PublicationInstance* instance = 0; 02416 02417 if (0 != data_container_) { 02418 instance = data_container_->get_handle_instance(handle); 02419 } 02420 02421 return instance; 02422 }
DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl::get_instance_handle | ( | ) | [virtual] |
Implements OpenDDS::DCPS::EntityImpl.
Definition at line 203 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), participant_servant_, and publication_id_.
00204 { 00205 return this->participant_servant_->id_to_handle(publication_id_); 00206 }
void OpenDDS::DCPS::DataWriterImpl::get_instance_handles | ( | InstanceHandleVec & | instance_handles | ) |
Definition at line 2612 of file DataWriterImpl.cpp.
References data_container_, and OpenDDS::DCPS::WriteDataContainer::get_instance_handles().
Referenced by OpenDDS::DCPS::DWMonitorImpl::report().
02613 { 02614 this->data_container_->get_instance_handles(instance_handles); 02615 }
DDS::DataWriterListener_ptr OpenDDS::DCPS::DataWriterImpl::get_listener | ( | ) | [virtual] |
Implements DDS::DataWriter.
Definition at line 982 of file DataWriterImpl.cpp.
References listener_.
00983 { 00984 return DDS::DataWriterListener::_duplicate(listener_.in()); 00985 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_liveliness_lost_status | ( | DDS::LivelinessLostStatus & | status | ) | [virtual] |
Definition at line 1041 of file DataWriterImpl.cpp.
References DDS::LIVELINESS_LOST_STATUS, liveliness_lost_status_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::LivelinessLostStatus::total_count_change.
01043 { 01044 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 01045 guard, 01046 this->lock_, 01047 DDS::RETCODE_ERROR); 01048 set_status_changed_flag(DDS::LIVELINESS_LOST_STATUS, false); 01049 status = liveliness_lost_status_; 01050 liveliness_lost_status_.total_count_change = 0; 01051 return DDS::RETCODE_OK; 01052 }
ACE_INLINE ACE_Recursive_Thread_Mutex& OpenDDS::DCPS::DataWriterImpl::get_lock | ( | ) | [inline] |
Accessor of the WriteDataContainer's lock.
Definition at line 366 of file DataWriterImpl.h.
Referenced by begin_coherent_changes(), coherent_changes_pending(), dispose(), dispose_and_unregister(), end_coherent_changes(), register_instance_from_durable_data(), unregister_instance_i(), and write().
00366 { 00367 return data_container_->lock_; 00368 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_matched_subscription_data | ( | DDS::SubscriptionBuiltinTopicData & | subscription_data, | |
DDS::InstanceHandle_t | subscription_handle | |||
) | [virtual] |
Definition at line 1195 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, OpenDDS::DCPS::EntityImpl::enabled_, participant_servant_, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.
01198 { 01199 if (enabled_ == false) { 01200 ACE_ERROR_RETURN((LM_ERROR, 01201 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::") 01202 ACE_TEXT("get_matched_subscription_data: ") 01203 ACE_TEXT("Entity is not enabled. \n")), 01204 DDS::RETCODE_NOT_ENABLED); 01205 } 01206 01207 BIT_Helper_1 < DDS::SubscriptionBuiltinTopicDataDataReader, 01208 DDS::SubscriptionBuiltinTopicDataDataReader_var, 01209 DDS::SubscriptionBuiltinTopicDataSeq > hh; 01210 01211 DDS::SubscriptionBuiltinTopicDataSeq data; 01212 01213 DDS::ReturnCode_t ret = 01214 hh.instance_handle_to_bit_data(participant_servant_, 01215 BUILT_IN_SUBSCRIPTION_TOPIC, 01216 subscription_handle, 01217 data); 01218 01219 if (ret == DDS::RETCODE_OK) { 01220 subscription_data = data[0]; 01221 } 01222 01223 return ret; 01224 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_matched_subscriptions | ( | DDS::InstanceHandleSeq & | subscription_handles | ) | [virtual] |
Definition at line 1162 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::EntityImpl::enabled_, id_to_handle_map_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.
01164 { 01165 if (enabled_ == false) { 01166 ACE_ERROR_RETURN((LM_ERROR, 01167 ACE_TEXT("(%P|%t) ERROR: ") 01168 ACE_TEXT("DataWriterImpl::get_matched_subscriptions: ") 01169 ACE_TEXT(" Entity is not enabled. \n")), 01170 DDS::RETCODE_NOT_ENABLED); 01171 } 01172 01173 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 01174 guard, 01175 this->lock_, 01176 DDS::RETCODE_ERROR); 01177 01178 // Copy out the handles for the current set of subscriptions. 01179 int index = 0; 01180 subscription_handles.length( 01181 static_cast<CORBA::ULong>(this->id_to_handle_map_.size())); 01182 01183 for (RepoIdToHandleMap::iterator 01184 current = this->id_to_handle_map_.begin(); 01185 current != this->id_to_handle_map_.end(); 01186 ++current, ++index) { 01187 subscription_handles[index] = current->second; 01188 } 01189 01190 return DDS::RETCODE_OK; 01191 }
DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl::get_next_handle | ( | ) |
Get an instance handle for a new instance.
Definition at line 209 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), and participant_servant_.
Referenced by OpenDDS::DCPS::WriteDataContainer::register_instance().
00210 { 00211 return this->participant_servant_->id_to_handle(GUID_UNKNOWN); 00212 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_offered_deadline_missed_status | ( | DDS::OfferedDeadlineMissedStatus & | status | ) | [virtual] |
Definition at line 1055 of file DataWriterImpl.cpp.
References last_deadline_missed_total_count_, DDS::OFFERED_DEADLINE_MISSED_STATUS, offered_deadline_missed_status_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), DDS::OfferedDeadlineMissedStatus::total_count, and DDS::OfferedDeadlineMissedStatus::total_count_change.
01057 { 01058 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 01059 guard, 01060 this->lock_, 01061 DDS::RETCODE_ERROR); 01062 01063 set_status_changed_flag(DDS::OFFERED_DEADLINE_MISSED_STATUS, false); 01064 01065 this->offered_deadline_missed_status_.total_count_change = 01066 this->offered_deadline_missed_status_.total_count 01067 - this->last_deadline_missed_total_count_; 01068 01069 // Update for next status check. 01070 this->last_deadline_missed_total_count_ = 01071 this->offered_deadline_missed_status_.total_count; 01072 01073 status = offered_deadline_missed_status_; 01074 01075 this->offered_deadline_missed_status_.total_count_change = 0; 01076 01077 return DDS::RETCODE_OK; 01078 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_offered_incompatible_qos_status | ( | DDS::OfferedIncompatibleQosStatus & | status | ) | [virtual] |
Definition at line 1081 of file DataWriterImpl.cpp.
References DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, offered_incompatible_qos_status_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::OfferedIncompatibleQosStatus::total_count_change.
01083 { 01084 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 01085 guard, 01086 this->lock_, 01087 DDS::RETCODE_ERROR); 01088 set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, false); 01089 status = offered_incompatible_qos_status_; 01090 offered_incompatible_qos_status_.total_count_change = 0; 01091 return DDS::RETCODE_OK; 01092 }
CORBA::Long OpenDDS::DCPS::DataWriterImpl::get_priority_value | ( | const AssociationData & | ) | const [inline, private, virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 578 of file DataWriterImpl.h.
00578 { 00579 return this->qos_.transport_priority.value; 00580 }
RepoId OpenDDS::DCPS::DataWriterImpl::get_publication_id | ( | ) |
Accessor of the repository id of this datawriter/publication.
Definition at line 1919 of file DataWriterImpl.cpp.
References publication_id_.
Referenced by add_association(), OpenDDS::DCPS::PublisherImpl::delete_contained_entities(), OpenDDS::DCPS::PublisherImpl::delete_datawriter(), OpenDDS::Federator::ManagerImpl::initialize(), OpenDDS::DCPS::DWPeriodicMonitorImpl::report(), OpenDDS::DCPS::DWMonitorImpl::report(), and OpenDDS::DCPS::PublisherImpl::writer_enabled().
01920 { 01921 return publication_id_; 01922 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_publication_matched_status | ( | DDS::PublicationMatchedStatus & | status | ) | [virtual] |
Definition at line 1095 of file DataWriterImpl.cpp.
References DDS::PublicationMatchedStatus::current_count_change, publication_match_status_, DDS::PUBLICATION_MATCHED_STATUS, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::PublicationMatchedStatus::total_count_change.
01097 { 01098 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 01099 guard, 01100 this->lock_, 01101 DDS::RETCODE_ERROR); 01102 set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, false); 01103 status = publication_match_status_; 01104 publication_match_status_.total_count_change = 0; 01105 publication_match_status_.current_count_change = 0; 01106 return DDS::RETCODE_OK; 01107 }
DDS::Publisher_ptr OpenDDS::DCPS::DataWriterImpl::get_publisher | ( | ) | [virtual] |
Implements DDS::DataWriter.
Definition at line 1035 of file DataWriterImpl.cpp.
References publisher_servant_.
Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter(), OpenDDS::DCPS::StaticDiscovery::pre_writer(), and OpenDDS::DCPS::DWMonitorImpl::report().
01036 { 01037 return DDS::Publisher::_duplicate(publisher_servant_); 01038 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_qos | ( | DDS::DataWriterQos & | qos | ) | [virtual] |
Definition at line 965 of file DataWriterImpl.cpp.
References qos_, and DDS::RETCODE_OK.
Referenced by OpenDDS::DCPS::StaticDiscovery::pre_writer().
00966 { 00967 qos = qos_; 00968 return DDS::RETCODE_OK; 00969 }
void OpenDDS::DCPS::DataWriterImpl::get_readers | ( | RepoIdSet & | readers | ) |
Definition at line 2618 of file DataWriterImpl.cpp.
Referenced by OpenDDS::DCPS::DWMonitorImpl::report().
02619 { 02620 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 02621 readers = this->readers_; 02622 }
const RepoId& OpenDDS::DCPS::DataWriterImpl::get_repo_id | ( | ) | const [inline, private, virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 571 of file DataWriterImpl.h.
00571 { 00572 return this->publication_id_; 00573 }
SendStateDataSampleList OpenDDS::DCPS::DataWriterImpl::get_resend_data | ( | ) | [inline] |
Definition at line 280 of file DataWriterImpl.h.
Referenced by association_complete_i().
00280 { 00281 return data_container_->get_resend_data(); 00282 }
DDS::Topic_ptr OpenDDS::DCPS::DataWriterImpl::get_topic | ( | ) | [virtual] |
Implements DDS::DataWriter.
Definition at line 988 of file DataWriterImpl.cpp.
References topic_objref_.
Referenced by OpenDDS::DCPS::DWMonitorImpl::report().
00989 { 00990 return DDS::Topic::_duplicate(topic_objref_.in()); 00991 }
const char * OpenDDS::DCPS::DataWriterImpl::get_topic_name | ( | ) |
Accessor of the associated topic name.
Definition at line 1931 of file DataWriterImpl.cpp.
References topic_name_.
Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter(), and enable().
01932 { 01933 return topic_name_.in(); 01934 }
char const * OpenDDS::DCPS::DataWriterImpl::get_type_name | ( | ) | const |
Get associated topic type name.
Definition at line 1937 of file DataWriterImpl.cpp.
References type_name_.
Referenced by enable().
01938 { 01939 return type_name_.in(); 01940 }
ACE_UINT64 OpenDDS::DCPS::DataWriterImpl::get_unsent_data | ( | SendStateDataSampleList & | list | ) | [inline] |
Retrieve the unsent data from the WriteDataContainer.
Definition at line 276 of file DataWriterImpl.h.
Referenced by send_all_to_flush_control(), and write().
00276 { 00277 return data_container_->get_unsent_data(list); 00278 }
int OpenDDS::DCPS::DataWriterImpl::handle_close | ( | ACE_HANDLE | , | |
ACE_Reactor_Mask | ||||
) | [virtual] |
int OpenDDS::DCPS::DataWriterImpl::handle_timeout | ( | const ACE_Time_Value & | tv, | |
const void * | arg | |||
) | [virtual] |
Handle the assert liveliness timeout.
Definition at line 2299 of file DataWriterImpl.cpp.
References DDS::AUTOMATIC_LIVELINESS_QOS, OpenDDS::DCPS::duration_to_time_value(), last_liveliness_activity_time_, last_liveliness_check_time_, listener_for(), DDS::DataWriterQos::liveliness, liveliness_asserted_, liveliness_check_interval_, liveliness_lost_, DDS::LIVELINESS_LOST_STATUS, liveliness_lost_status_, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS, qos_, reactor_, DDS::LivelinessLostStatus::total_count, and DDS::LivelinessLostStatus::total_count_change.
02301 { 02302 const ACE_Time_Value delta = tv - last_liveliness_check_time_; 02303 if (delta < liveliness_check_interval_) { 02304 // Too early. Reschedule. 02305 if (reactor_->cancel_timer(this) == -1) { 02306 ACE_ERROR((LM_ERROR, 02307 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"), 02308 ACE_TEXT("cancel_timer"))); 02309 } 02310 if (reactor_->schedule_timer(this, 0, liveliness_check_interval_ - delta, liveliness_check_interval_) == -1) { 02311 ACE_ERROR((LM_ERROR, 02312 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"), 02313 ACE_TEXT("schedule_timer"))); 02314 } 02315 return 0; 02316 } 02317 02318 bool liveliness_lost = false; 02319 02320 ACE_Time_Value elapsed = tv - last_liveliness_activity_time_; 02321 02322 // Do we need to send a liveliness message? 02323 if (elapsed >= liveliness_check_interval_) { 02324 switch (this->qos_.liveliness.kind) { 02325 case DDS::AUTOMATIC_LIVELINESS_QOS: 02326 if (this->send_liveliness(tv) == false) { 02327 liveliness_lost = true; 02328 } 02329 break; 02330 02331 case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS: 02332 if (liveliness_asserted_) { 02333 if (this->send_liveliness(tv) == false) { 02334 liveliness_lost = true; 02335 } 02336 } 02337 break; 02338 02339 case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS: 02340 // Do nothing. 02341 break; 02342 } 02343 } 02344 02345 liveliness_asserted_ = false; 02346 last_liveliness_check_time_ = tv; 02347 elapsed = tv - last_liveliness_activity_time_; 02348 02349 // Have we lost liveliness? 
02350 if (elapsed >= duration_to_time_value(qos_.liveliness.lease_duration)) { 02351 liveliness_lost = true; 02352 } 02353 02354 if (!this->liveliness_lost_ && liveliness_lost) { 02355 ++ this->liveliness_lost_status_.total_count; 02356 ++ this->liveliness_lost_status_.total_count_change; 02357 02358 DDS::DataWriterListener_var listener = 02359 listener_for(DDS::LIVELINESS_LOST_STATUS); 02360 02361 if (!CORBA::is_nil(listener.in())) { 02362 listener->on_liveliness_lost(this->dw_local_objref_.in(), 02363 this->liveliness_lost_status_); 02364 } 02365 } 02366 02367 this->liveliness_lost_ = liveliness_lost; 02368 return 0; 02369 }
void OpenDDS::DCPS::DataWriterImpl::inconsistent_topic | ( | ) | [virtual] |
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 886 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::TopicImpl::inconsistent_topic(), and topic_servant_.
00887 { 00888 topic_servant_->inconsistent_topic(); 00889 }
void OpenDDS::DCPS::DataWriterImpl::init | ( | DDS::Topic_ptr | topic, | |
TopicImpl * | topic_servant, | |||
const DDS::DataWriterQos & | qos, | |||
DDS::DataWriterListener_ptr | a_listener, | |||
const DDS::StatusMask & | mask, | |||
OpenDDS::DCPS::DomainParticipantImpl * | participant_servant, | |||
OpenDDS::DCPS::PublisherImpl * | publisher_servant, | |||
DDS::DataWriter_ptr | dw_local | |||
) | [virtual] |
Initialize the data members.
Definition at line 155 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::TopicImpl::add_entity_ref(), OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC, OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, OpenDDS::DCPS::BUILT_IN_TOPIC_TOPIC, DBG_ENTRY_LVL, domain_id_, dw_local_objref_, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), OpenDDS::DCPS::TopicImpl::get_id(), OpenDDS::DCPS::TopicDescriptionImpl::get_name(), OpenDDS::DCPS::TopicDescriptionImpl::get_type_name(), initialized_, is_bit_, listener_, listener_mask_, participant_servant_, publisher_servant_, qos_, reactor_, TheServiceParticipant, topic_id_, topic_name_, topic_objref_, topic_servant_, and type_name_.
Referenced by OpenDDS::DCPS::PublisherImpl::create_datawriter(), and OpenDDS::DCPS::DataWriterImpl_T< MessageType >::init().
00164 { 00165 DBG_ENTRY_LVL("DataWriterImpl","init",6); 00166 topic_objref_ = DDS::Topic::_duplicate(topic); 00167 topic_servant_ = topic_servant; 00168 topic_servant_->_add_ref(); 00169 topic_servant_->add_entity_ref(); 00170 topic_name_ = topic_servant_->get_name(); 00171 topic_id_ = topic_servant_->get_id(); 00172 type_name_ = topic_servant_->get_type_name(); 00173 00174 #if !defined (DDS_HAS_MINIMUM_BIT) 00175 is_bit_ = ACE_OS::strcmp(topic_name_.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0 00176 || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_TOPIC_TOPIC) == 0 00177 || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0 00178 || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_PUBLICATION_TOPIC) == 0; 00179 #endif // !defined (DDS_HAS_MINIMUM_BIT) 00180 00181 qos_ = qos; 00182 00183 //Note: OK to _duplicate(nil). 00184 listener_ = DDS::DataWriterListener::_duplicate(a_listener); 00185 listener_mask_ = mask; 00186 00187 // Only store the participant pointer, since it is our "grand" 00188 // parent, we will exist as long as it does. 00189 participant_servant_ = participant_servant; 00190 domain_id_ = participant_servant_->get_domain_id(); 00191 00192 // Only store the publisher pointer, since it is our parent, we will 00193 // exist as long as it does. 00194 publisher_servant_ = publisher_servant; 00195 dw_local_objref_ = DDS::DataWriter::_duplicate(dw_local); 00196 00197 this->reactor_ = TheServiceParticipant->timer(); 00198 00199 initialized_ = true; 00200 }
DDS::DataWriterListener_ptr OpenDDS::DCPS::DataWriterImpl::listener_for | ( | DDS::StatusKind | kind | ) |
This is used to retrieve the listener for a certain status change.
If this datawriter has a registered listener and the status kind is in the listener mask then the listener is returned. Otherwise, the query for the listener is propagated up to the factory/publisher.
Definition at line 2285 of file DataWriterImpl.cpp.
References listener_, OpenDDS::DCPS::PublisherImpl::listener_for(), listener_mask_, and publisher_servant_.
Referenced by association_complete_i(), OpenDDS::DCPS::OfferedDeadlineWatchdog::execute(), handle_timeout(), and update_incompatible_qos().
02286 { 02287 // per 2.1.4.3.1 Listener Access to Plain Communication Status 02288 // use this entities factory if listener is mask not enabled 02289 // for this kind. 02290 if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) { 02291 return publisher_servant_->listener_for(kind); 02292 02293 } else { 02294 return DDS::DataWriterListener::_duplicate(listener_.in()); 02295 } 02296 }
ACE_Time_Value OpenDDS::DCPS::DataWriterImpl::liveliness_check_interval | ( | DDS::LivelinessQosPolicyKind | kind | ) |
Definition at line 1142 of file DataWriterImpl.cpp.
References liveliness_check_interval_.
Referenced by OpenDDS::DCPS::DomainParticipantImpl::LivelinessTimer::add_adjust().
01143 { 01144 if (this->qos_.liveliness.kind == kind) { 01145 return liveliness_check_interval_; 01146 } else { 01147 return ACE_Time_Value::max_time; 01148 } 01149 }
bool OpenDDS::DCPS::DataWriterImpl::lookup_instance_handles | ( | const ReaderIdSeq & | ids, | |
DDS::InstanceHandleSeq & | hdls | |||
) | [private] |
Lookup the instance handles by the subscription repo ids.
Definition at line 2547 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level, and OPENDDS_STRING.
Referenced by notify_publication_disconnected(), and notify_publication_lost().
02549 { 02550 if (DCPS_debug_level > 9) { 02551 CORBA::ULong const size = ids.length(); 02552 OPENDDS_STRING separator; 02553 OPENDDS_STRING buffer; 02554 02555 for (unsigned long i = 0; i < size; ++i) { 02556 buffer += separator + OPENDDS_STRING(GuidConverter(ids[i])); 02557 separator = ", "; 02558 } 02559 02560 ACE_DEBUG((LM_DEBUG, 02561 ACE_TEXT("(%P|%t) DataWriterImpl::lookup_instance_handles: ") 02562 ACE_TEXT("searching for handles for reader Ids: %C.\n"), 02563 buffer.c_str())); 02564 } 02565 02566 CORBA::ULong const num_rds = ids.length(); 02567 hdls.length(num_rds); 02568 02569 for (CORBA::ULong i = 0; i < num_rds; ++i) { 02570 hdls[i] = this->participant_servant_->id_to_handle(ids[i]); 02571 } 02572 02573 return true; 02574 }
bool OpenDDS::DCPS::DataWriterImpl::need_sequence_repair | ( | ) | [private] |
Definition at line 2633 of file DataWriterImpl.cpp.
References need_sequence_repair_i().
Referenced by create_control_message(), and create_sample_data_message().
02634 { 02635 ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, false); 02636 return need_sequence_repair_i(); 02637 }
bool OpenDDS::DCPS::DataWriterImpl::need_sequence_repair_i | ( | ) | const [private] |
Definition at line 2640 of file DataWriterImpl.cpp.
References reader_info_, and sequence_number_.
Referenced by need_sequence_repair().
02641 { 02642 for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(), 02643 end = reader_info_.end(); it != end; ++it) { 02644 if (it->second.expected_sequence_ != sequence_number_) { 02645 return true; 02646 } 02647 } 02648 02649 return false; 02650 }
void OpenDDS::DCPS::DataWriterImpl::notify_connection_deleted | ( | const RepoId & | peerId | ) | [virtual] |
Implements OpenDDS::DCPS::TransportSendListener.
Definition at line 2533 of file DataWriterImpl.cpp.
References DBG_ENTRY_LVL, and OpenDDS::DCPS::TransportClient::on_notification_of_connection_deletion().
02534 { 02535 DBG_ENTRY_LVL("DataWriterImpl","notify_connection_deleted",6); 02536 on_notification_of_connection_deletion(peerId); 02537 // Narrow to DDS::DCPS::DataWriterListener. If a DDS::DataWriterListener 02538 // is given to this DataWriter then narrow() fails. 02539 DataWriterListener_var the_listener = 02540 DataWriterListener::_narrow(this->listener_.in()); 02541 02542 if (!CORBA::is_nil(the_listener.in())) 02543 the_listener->on_connection_deleted(this->dw_local_objref_.in()); 02544 }
void OpenDDS::DCPS::DataWriterImpl::notify_publication_disconnected | ( | const ReaderIdSeq & | subids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportSendListener.
Definition at line 2425 of file DataWriterImpl.cpp.
References DBG_ENTRY_LVL, is_bit_, lookup_instance_handles(), and OpenDDS::DCPS::PublicationLostStatus::subscription_handles.
02426 { 02427 DBG_ENTRY_LVL("DataWriterImpl","notify_publication_disconnected",6); 02428 02429 if (!is_bit_) { 02430 // Narrow to DDS::DCPS::DataWriterListener. If a DDS::DataWriterListener 02431 // is given to this DataWriter then narrow() fails. 02432 DataWriterListener_var the_listener = 02433 DataWriterListener::_narrow(this->listener_.in()); 02434 02435 if (!CORBA::is_nil(the_listener.in())) { 02436 PublicationDisconnectedStatus status; 02437 // Since this callback may come after remove_association which 02438 // removes the reader from id_to_handle map, we can ignore this 02439 // error. 02440 this->lookup_instance_handles(subids, 02441 status.subscription_handles); 02442 the_listener->on_publication_disconnected(this->dw_local_objref_.in(), 02443 status); 02444 } 02445 } 02446 }
void OpenDDS::DCPS::DataWriterImpl::notify_publication_lost | ( | const DDS::InstanceHandleSeq & | handles | ) | [private] |
Definition at line 2505 of file DataWriterImpl.cpp.
References DBG_ENTRY_LVL, is_bit_, and OpenDDS::DCPS::PublicationLostStatus::subscription_handles.
02506 { 02507 DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6); 02508 02509 if (!is_bit_) { 02510 // Narrow to DDS::DCPS::DataWriterListener. If a 02511 // DDS::DataWriterListener is given to this DataWriter then 02512 // narrow() fails. 02513 DataWriterListener_var the_listener = 02514 DataWriterListener::_narrow(this->listener_.in()); 02515 02516 if (!CORBA::is_nil(the_listener.in())) { 02517 PublicationLostStatus status; 02518 02519 CORBA::ULong len = handles.length(); 02520 status.subscription_handles.length(len); 02521 02522 for (CORBA::ULong i = 0; i < len; ++ i) { 02523 status.subscription_handles[i] = handles[i]; 02524 } 02525 02526 the_listener->on_publication_lost(this->dw_local_objref_.in(), 02527 status); 02528 } 02529 } 02530 }
void OpenDDS::DCPS::DataWriterImpl::notify_publication_lost | ( | const ReaderIdSeq & | subids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportSendListener.
Definition at line 2480 of file DataWriterImpl.cpp.
References DBG_ENTRY_LVL, is_bit_, lookup_instance_handles(), and OpenDDS::DCPS::PublicationLostStatus::subscription_handles.
02481 { 02482 DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6); 02483 02484 if (!is_bit_) { 02485 // Narrow to DDS::DCPS::DataWriterListener. If a 02486 // DDS::DataWriterListener is given to this DataWriter then 02487 // narrow() fails. 02488 DataWriterListener_var the_listener = 02489 DataWriterListener::_narrow(this->listener_.in()); 02490 02491 if (!CORBA::is_nil(the_listener.in())) { 02492 PublicationLostStatus status; 02493 02494 // Since this callback may come after remove_association which removes 02495 // the reader from id_to_handle map, we can ignore this error. 02496 this->lookup_instance_handles(subids, 02497 status.subscription_handles); 02498 the_listener->on_publication_lost(this->dw_local_objref_.in(), 02499 status); 02500 } 02501 } 02502 }
void OpenDDS::DCPS::DataWriterImpl::notify_publication_reconnected | ( | const ReaderIdSeq & | subids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportSendListener.
Definition at line 2449 of file DataWriterImpl.cpp.
References DBG_ENTRY_LVL, is_bit_, and OpenDDS::DCPS::PublicationLostStatus::subscription_handles.
02450 { 02451 DBG_ENTRY_LVL("DataWriterImpl","notify_publication_reconnected",6); 02452 02453 if (!is_bit_) { 02454 // Narrow to DDS::DCPS::DataWriterListener. If a 02455 // DDS::DataWriterListener is given to this DataWriter then 02456 // narrow() fails. 02457 DataWriterListener_var the_listener = 02458 DataWriterListener::_narrow(this->listener_.in()); 02459 02460 if (!CORBA::is_nil(the_listener.in())) { 02461 PublicationDisconnectedStatus status; 02462 02463 // If it's reconnected then the reader should be in id_to_handle 02464 // map otherwise log with an error. 02465 if (this->lookup_instance_handles(subids, 02466 status.subscription_handles) == false) { 02467 ACE_ERROR((LM_ERROR, 02468 "(%P|%t) ERROR: DataWriterImpl::" 02469 "notify_publication_reconnected: " 02470 "lookup_instance_handles failed\n")); 02471 } 02472 02473 the_listener->on_publication_reconnected(this->dw_local_objref_.in(), 02474 status); 02475 } 02476 } 02477 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::num_samples | ( | DDS::InstanceHandle_t | handle, | |
size_t & | size | |||
) |
Return the number of samples for a given instance.
Definition at line 1900 of file DataWriterImpl.cpp.
References data_container_, and OpenDDS::DCPS::WriteDataContainer::num_samples().
01902 { 01903 return data_container_->num_samples(handle, size); 01904 }
typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_MAP_CMP | ( | RepoId | , | |
DDS::InstanceHandle_t | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_MAP_CMP | ( | RepoId | , | |
ReaderInfo | , | |||
GUID_tKeyLessThan | ||||
) | [protected] |
typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_MAP_CMP | ( | RepoId | , | |
SequenceNumber | , | |||
GUID_tKeyLessThan | ||||
) |
typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_VECTOR | ( | DDS::InstanceHandle_t | ) |
EntityImpl * OpenDDS::DCPS::DataWriterImpl::parent | ( | ) | const [virtual] |
Reimplemented from OpenDDS::DCPS::EntityImpl.
Definition at line 2143 of file DataWriterImpl.cpp.
References publisher_servant_.
02144 { 02145 return this->publisher_servant_; 02146 }
bool OpenDDS::DCPS::DataWriterImpl::participant_liveliness_activity_after | ( | const ACE_Time_Value & | tv | ) |
Definition at line 1152 of file DataWriterImpl.cpp.
References last_liveliness_activity_time_, and DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS.
01153 { 01154 if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) { 01155 return last_liveliness_activity_time_ > tv; 01156 } else { 01157 return false; 01158 } 01159 }
bool OpenDDS::DCPS::DataWriterImpl::pending_control | ( | ) | [protected] |
Answer if transport of all control messages is pending.
Definition at line 2593 of file DataWriterImpl.cpp.
References controlTracker, and OpenDDS::DCPS::MessageTracker::pending_messages().
02594 { 02595 return controlTracker.pending_messages(); 02596 }
bool OpenDDS::DCPS::DataWriterImpl::persist_data | ( | ) |
Make sent data available beyond the lifetime of this DataWriter.
Definition at line 2578 of file DataWriterImpl.cpp.
References data_container_, and OpenDDS::DCPS::WriteDataContainer::persist_data().
Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().
02579 { 02580 return this->data_container_->persist_data(); 02581 }
void OpenDDS::DCPS::DataWriterImpl::prepare_to_delete | ( | ) | [protected] |
Definition at line 2406 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::EntityImpl::set_deleted(), and OpenDDS::DCPS::TransportClient::stop_associating().
Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().
02407 { 02408 this->set_deleted(true); 02409 this->stop_associating(); 02410 }
void OpenDDS::DCPS::DataWriterImpl::register_for_reader | ( | const RepoId & | participant, | |
const RepoId & | writerid, | |||
const RepoId & | readerid, | |||
const TransportLocatorSeq & | locators, | |||
DiscoveryListener * | listener | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 803 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::TransportClient::register_for_reader().
00808 { 00809 TransportClient::register_for_reader(participant, writerid, readerid, locators, listener); 00810 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::register_instance_from_durable_data | ( | DDS::InstanceHandle_t & | handle, | |
DataSample * | data, | |||
const DDS::Time_t & | source_timestamp | |||
) |
Delegate to the WriteDataContainer to register and tell the transport to broadcast the registered instance.
Definition at line 1529 of file DataWriterImpl.cpp.
References DBG_ENTRY_LVL, get_lock(), register_instance_i(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, and send_all_to_flush_control().
Referenced by OpenDDS::DCPS::DataDurabilityCache::get_data().
01532 { 01533 DBG_ENTRY_LVL("DataWriterImpl","register_instance_from_durable_data",6); 01534 01535 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 01536 guard, 01537 get_lock(), 01538 ::DDS::RETCODE_ERROR); 01539 01540 DDS::ReturnCode_t ret = register_instance_i(handle, data, source_timestamp); 01541 if (ret != DDS::RETCODE_OK) { 01542 ACE_ERROR_RETURN((LM_ERROR, 01543 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_from_durable_data: ") 01544 ACE_TEXT("register instance with container failed.\n")), 01545 ret); 01546 } 01547 01548 send_all_to_flush_control(guard); 01549 01550 return ret; 01551 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::register_instance_i | ( | DDS::InstanceHandle_t & | handle, | |
DataSample * | data, | |||
const DDS::Time_t & | source_timestamp | |||
) |
Delegate to the WriteDataContainer to register the instance. The caller must tell the transport to broadcast the registered instance upon returning.
Definition at line 1469 of file DataWriterImpl.cpp.
References create_control_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::WriteDataContainer::enqueue_control(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::INSTANCE_REGISTRATION, monitor_, OpenDDS::DCPS::WriteDataContainer::obtain_buffer_for_control(), OpenDDS::DCPS::WriteDataContainer::register_instance(), OpenDDS::DCPS::Monitor::report(), DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, and OpenDDS::DCPS::DataSampleElement::set_sample().
Referenced by register_instance_from_durable_data().
01472 { 01473 DBG_ENTRY_LVL("DataWriterImpl","register_instance_i",6); 01474 01475 if (enabled_ == false) { 01476 ACE_ERROR_RETURN((LM_ERROR, 01477 ACE_TEXT("(%P|%t) ERROR: ") 01478 ACE_TEXT("DataWriterImpl::register_instance_i: ") 01479 ACE_TEXT(" Entity is not enabled. \n")), 01480 DDS::RETCODE_NOT_ENABLED); 01481 } 01482 01483 DDS::ReturnCode_t ret = 01484 this->data_container_->register_instance(handle, data); 01485 01486 if (ret != DDS::RETCODE_OK) { 01487 ACE_ERROR_RETURN((LM_ERROR, 01488 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_i: ") 01489 ACE_TEXT("register instance with container failed.\n")), 01490 ret); 01491 } 01492 01493 if (this->monitor_) { 01494 this->monitor_->report(); 01495 } 01496 01497 DataSampleElement* element = 0; 01498 ret = this->data_container_->obtain_buffer_for_control(element); 01499 01500 if (ret != DDS::RETCODE_OK) { 01501 ACE_ERROR_RETURN((LM_ERROR, 01502 ACE_TEXT("(%P|%t) ERROR: ") 01503 ACE_TEXT("DataWriterImpl::register_instance_i: ") 01504 ACE_TEXT("obtain_buffer_for_control returned %d.\n"), 01505 ret), 01506 ret); 01507 } 01508 01509 // Add header with the registration sample data. 01510 element->set_sample(create_control_message(INSTANCE_REGISTRATION, 01511 element->get_header(), 01512 data, 01513 source_timestamp)); 01514 01515 ret = this->data_container_->enqueue_control(element); 01516 01517 if (ret != DDS::RETCODE_OK) { 01518 ACE_ERROR_RETURN((LM_ERROR, 01519 ACE_TEXT("(%P|%t) ERROR: ") 01520 ACE_TEXT("DataWriterImpl::register_instance_i: ") 01521 ACE_TEXT("enqueue_control failed.\n")), 01522 ret); 01523 } 01524 01525 return ret; 01526 }
void OpenDDS::DCPS::DataWriterImpl::remove_all_associations | ( | ) |
Definition at line 751 of file DataWriterImpl.cpp.
References DBG_ENTRY_LVL, lock_, remove_associations(), and OpenDDS::DCPS::TransportClient::stop_associating().
Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().
00752 { 00753 DBG_ENTRY_LVL("DataWriterImpl", "remove_all_associations", 6); 00754 // stop pending associations 00755 this->stop_associating(); 00756 00757 OpenDDS::DCPS::ReaderIdSeq readers; 00758 CORBA::ULong size; 00759 CORBA::ULong num_pending_readers; 00760 { 00761 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_); 00762 00763 num_pending_readers = static_cast<CORBA::ULong>(pending_readers_.size()); 00764 size = static_cast<CORBA::ULong>(readers_.size()) + num_pending_readers; 00765 readers.length(size); 00766 00767 RepoIdSet::iterator itEnd = readers_.end(); 00768 int i = 0; 00769 00770 for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) { 00771 readers[i ++] = *it; 00772 } 00773 00774 itEnd = pending_readers_.end(); 00775 00776 for (RepoIdSet::iterator it = pending_readers_.begin(); it != itEnd; ++it) { 00777 readers[i ++] = *it; 00778 } 00779 00780 if (num_pending_readers > 0) { 00781 ACE_DEBUG((LM_WARNING, 00782 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ") 00783 ACE_TEXT("%d subscribers were pending and never fully associated.\n"), 00784 num_pending_readers)); 00785 } 00786 } 00787 00788 try { 00789 if (0 < size) { 00790 CORBA::Boolean dont_notify_lost = false; 00791 00792 this->remove_associations(readers, dont_notify_lost); 00793 } 00794 00795 } catch (const CORBA::Exception&) { 00796 ACE_DEBUG((LM_WARNING, 00797 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ") 00798 ACE_TEXT("caught exception from remove_associations.\n"))); 00799 } 00800 }
virtual void OpenDDS::DCPS::DataWriterImpl::remove_associations | ( | const ReaderIdSeq & | readers, | |
bool | callback | |||
) | [virtual] |
void OpenDDS::DCPS::DataWriterImpl::reschedule_deadline | ( | ) |
Definition at line 2585 of file DataWriterImpl.cpp.
References data_container_, and OpenDDS::DCPS::WriteDataContainer::reschedule_deadline().
Referenced by OpenDDS::DCPS::OfferedDeadlineWatchdog::reschedule_deadline().
02586 { 02587 if (this->watchdog_ != 0) { 02588 this->data_container_->reschedule_deadline(); 02589 } 02590 }
void OpenDDS::DCPS::DataWriterImpl::retrieve_inline_qos_data | ( | TransportSendListener::InlineQosData & | qos_data | ) | const [virtual] |
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 2625 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::TransportSendListener::InlineQosData::dw_qos, OpenDDS::DCPS::PublisherImpl::get_qos(), OpenDDS::DCPS::TransportSendListener::InlineQosData::pub_qos, publisher_servant_, qos_, OpenDDS::DCPS::TransportSendListener::InlineQosData::topic_name, and topic_name_.
02626 { 02627 this->publisher_servant_->get_qos(qos_data.pub_qos); 02628 qos_data.dw_qos = this->qos_; 02629 qos_data.topic_name = this->topic_name_.in(); 02630 }
void OpenDDS::DCPS::DataWriterImpl::send_all_to_flush_control | ( | ACE_Guard< ACE_Recursive_Thread_Mutex > & | guard | ) |
Definition at line 1452 of file DataWriterImpl.cpp.
References controlTracker, DBG_ENTRY_LVL, get_unsent_data(), OpenDDS::DCPS::MessageTracker::message_sent(), and OpenDDS::DCPS::TransportClient::send().
Referenced by dispose(), dispose_and_unregister(), register_instance_from_durable_data(), and unregister_instance_i().
01453 { 01454 DBG_ENTRY_LVL("DataWriterImpl","send_all_to_flush_control",6); 01455 01456 SendStateDataSampleList list; 01457 01458 ACE_UINT64 transaction_id = this->get_unsent_data(list); 01459 01460 controlTracker.message_sent(); 01461 01462 //need to release guard to call down to transport 01463 guard.release(); 01464 01465 this->send(list, transaction_id); 01466 }
SendControlStatus OpenDDS::DCPS::DataWriterImpl::send_control | ( | const DataSampleHeader & | header, | |
ACE_Message_Block * | msg | |||
) | [protected, virtual] |
Reimplemented from OpenDDS::DCPS::TransportClient.
Definition at line 2653 of file DataWriterImpl.cpp.
References controlTracker, header, OpenDDS::DCPS::MessageTracker::message_dropped(), OpenDDS::DCPS::MessageTracker::message_sent(), OpenDDS::DCPS::TransportClient::send_control(), and OpenDDS::DCPS::SEND_CONTROL_OK.
02655 { 02656 controlTracker.message_sent(); 02657 02658 SendControlStatus status = TransportClient::send_control(header, msg); 02659 02660 if (status != SEND_CONTROL_OK) { 02661 controlTracker.message_dropped(); 02662 } 02663 02664 return status; 02665 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::send_end_historic_samples | ( | const RepoId & | readerId | ) | [private] |
bool OpenDDS::DCPS::DataWriterImpl::send_liveliness | ( | const ACE_Time_Value & | now | ) | [private] |
Send the liveliness message.
Definition at line 2380 of file DataWriterImpl.cpp.
References create_control_message(), OpenDDS::DCPS::DATAWRITER_LIVELINESS, domain_id_, header, last_liveliness_activity_time_, DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS, OpenDDS::DCPS::SEND_CONTROL_ERROR, TheServiceParticipant, and OpenDDS::DCPS::time_value_to_time().
02381 { 02382 if (this->qos_.liveliness.kind == DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS || 02383 !TheServiceParticipant->get_discovery(domain_id_)->supports_liveliness()) { 02384 DDS::Time_t t = time_value_to_time(now); 02385 DataSampleHeader header; 02386 ACE_Message_Block* liveliness_msg = 02387 this->create_control_message(DATAWRITER_LIVELINESS, header, 0, t); 02388 02389 if (this->send_control(header, liveliness_msg) == SEND_CONTROL_ERROR) { 02390 ACE_ERROR_RETURN((LM_ERROR, 02391 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::send_liveliness: ") 02392 ACE_TEXT(" send_control failed. \n")), 02393 false); 02394 02395 } else { 02396 last_liveliness_activity_time_ = now; 02397 return true; 02398 } 02399 } else { 02400 last_liveliness_activity_time_ = now; 02401 return true; 02402 } 02403 }
void OpenDDS::DCPS::DataWriterImpl::send_suspended_data | ( | ) |
Called by the PublisherImpl to indicate that the Publisher is now resumed and any data collected while it was suspended should now be sent.
Definition at line 1822 of file DataWriterImpl.cpp.
References available_data_list_, max_suspended_transaction_id_, min_suspended_transaction_id_, OpenDDS::DCPS::SendStateDataSampleList::reset(), and OpenDDS::DCPS::TransportClient::send().
01823 { 01824 //this serves to get TransportClient's max_transaction_id_seen_ 01825 //to the correct value for this list of transactions 01826 if (max_suspended_transaction_id_ != 0) { 01827 this->send(this->available_data_list_, max_suspended_transaction_id_); 01828 max_suspended_transaction_id_ = 0; 01829 } 01830 01831 //this serves to actually have the send proceed in 01832 //sending the samples to the datalinks by passing it 01833 //the min_suspended_transaction_id_ which should be the 01834 //TransportClient's expected_transaction_id_ 01835 this->send(this->available_data_list_, min_suspended_transaction_id_); 01836 min_suspended_transaction_id_ = 0; 01837 this->available_data_list_.reset(); 01838 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::set_listener | ( | DDS::DataWriterListener_ptr | a_listener, | |
DDS::StatusMask | mask | |||
) | [virtual] |
Definition at line 972 of file DataWriterImpl.cpp.
References listener_, listener_mask_, and DDS::RETCODE_OK.
00974 { 00975 listener_mask_ = mask; 00976 //note: OK to duplicate a nil object ref 00977 listener_ = DDS::DataWriterListener::_duplicate(a_listener); 00978 return DDS::RETCODE_OK; 00979 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::set_qos | ( | const DDS::DataWriterQos & | qos | ) | [virtual] |
Definition at line 892 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::Watchdog::cancel_all(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), OpenDDS::DCPS::ReactorInterceptor::destroy(), domain_id_, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::duration_to_time_value(), OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::PublisherImpl::get_qos(), last_deadline_missed_total_count_, offered_deadline_missed_status_, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK, participant_servant_, publication_id_, publisher_servant_, qos_, OpenDDS::DCPS::Watchdog::reset_interval(), DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, TheServiceParticipant, OpenDDS::DCPS::Qos_Helper::valid(), and watchdog_.
00893 { 00894 00895 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00896 OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00897 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00898 OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00899 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00900 00901 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) { 00902 if (qos_ == qos) 00903 return DDS::RETCODE_OK; 00904 00905 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) { 00906 return DDS::RETCODE_IMMUTABLE_POLICY; 00907 00908 } else { 00909 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_); 00910 DDS::PublisherQos publisherQos; 00911 this->publisher_servant_->get_qos(publisherQos); 00912 const bool status 00913 = disco->update_publication_qos(this->participant_servant_->get_domain_id(), 00914 this->participant_servant_->get_id(), 00915 this->publication_id_, 00916 qos, 00917 publisherQos); 00918 00919 if (!status) { 00920 ACE_ERROR_RETURN((LM_ERROR, 00921 ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ") 00922 ACE_TEXT("qos not updated. \n")), 00923 DDS::RETCODE_ERROR); 00924 } 00925 } 00926 00927 if (!(qos_ == qos)) { 00928 // Reset the deadline timer if the period has changed. 
00929 if (qos_.deadline.period.sec != qos.deadline.period.sec 00930 || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) { 00931 if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC 00932 && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) { 00933 this->watchdog_ = 00934 new OfferedDeadlineWatchdog( 00935 this->lock_, 00936 qos.deadline, 00937 this, 00938 this->dw_local_objref_.in(), 00939 this->offered_deadline_missed_status_, 00940 this->last_deadline_missed_total_count_); 00941 00942 } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC 00943 && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) { 00944 this->watchdog_->cancel_all(); 00945 this->watchdog_->destroy(); 00946 this->watchdog_ = 0; 00947 00948 } else { 00949 this->watchdog_->reset_interval( 00950 duration_to_time_value(qos.deadline.period)); 00951 } 00952 } 00953 00954 qos_ = qos; 00955 } 00956 00957 return DDS::RETCODE_OK; 00958 00959 } else { 00960 return DDS::RETCODE_INCONSISTENT_POLICY; 00961 } 00962 }
bool OpenDDS::DCPS::DataWriterImpl::should_ack | ( | ) | const |
Does this writer have samples to be acknowledged?
Definition at line 994 of file DataWriterImpl.cpp.
00995 { 00996 // N.B. It may be worthwhile to investigate a more efficient 00997 // heuristic for determining if a writer should send SAMPLE_ACK 00998 // control samples. Perhaps based on a sequence number delta? 00999 return this->readers_.size() != 0; 01000 }
void OpenDDS::DCPS::DataWriterImpl::track_sequence_number | ( | GUIDSeq * | filter_out | ) | [private] |
Definition at line 1789 of file DataWriterImpl.cpp.
References reader_info_, and sequence_number_.
Referenced by write().
01790 { 01791 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_); 01792 01793 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 01794 // Track individual expected sequence numbers in ReaderInfo 01795 RepoIdSet excluded; 01796 01797 if (filter_out && !reader_info_.empty()) { 01798 const GUID_t* buf = filter_out->get_buffer(); 01799 excluded.insert(buf, buf + filter_out->length()); 01800 } 01801 01802 for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(), 01803 end = reader_info_.end(); iter != end; ++iter) { 01804 // If not excluding this reader, update expected sequence 01805 if (excluded.count(iter->first) == 0) { 01806 iter->second.expected_sequence_ = sequence_number_; 01807 } 01808 } 01809 01810 #else 01811 ACE_UNUSED_ARG(filter_out); 01812 for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(), 01813 end = reader_info_.end(); iter != end; ++iter) { 01814 iter->second.expected_sequence_ = sequence_number_; 01815 } 01816 01817 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC 01818 01819 }
void OpenDDS::DCPS::DataWriterImpl::transport_assoc_done | ( | int | flags, | |
const RepoId & | remote_id | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::TransportClient.
Definition at line 279 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::TransportClient::ASSOC_ACTIVE, assoc_complete_readers_, OpenDDS::DCPS::TransportClient::ASSOC_OK, association_complete_i(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, domain_id_, OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::insert(), lock_, OPENDDS_STRING, participant_servant_, pending_readers_, publication_id_, and TheServiceParticipant.
00280 { 00281 DBG_ENTRY_LVL("DataWriterImpl", "transport_assoc_done", 6); 00282 00283 if (!(flags & ASSOC_OK)) { 00284 if (DCPS_debug_level) { 00285 const GuidConverter conv(remote_id); 00286 ACE_DEBUG((LM_ERROR, 00287 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ") 00288 ACE_TEXT("ERROR: transport layer failed to associate %C\n"), 00289 OPENDDS_STRING(conv).c_str())); 00290 } 00291 00292 return; 00293 } 00294 if (DCPS_debug_level) { 00295 const GuidConverter writer_conv(publication_id_); 00296 const GuidConverter conv(remote_id); 00297 ACE_DEBUG((LM_INFO, 00298 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ") 00299 ACE_TEXT(" writer %C succeeded in associating with reader %C\n"), 00300 OPENDDS_STRING(writer_conv).c_str(), 00301 OPENDDS_STRING(conv).c_str())); 00302 } 00303 if (flags & ASSOC_ACTIVE) { 00304 00305 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_); 00306 00307 // Have we already received an association_complete() callback? 00308 if (assoc_complete_readers_.count(remote_id)) { 00309 if (DCPS_debug_level) { 00310 const GuidConverter writer_conv(publication_id_); 00311 const GuidConverter converter(remote_id); 00312 ACE_DEBUG((LM_DEBUG, 00313 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ") 00314 ACE_TEXT("writer %C found assoc_complete_reader %C, continue with association_complete_i\n"), 00315 OPENDDS_STRING(writer_conv).c_str(), 00316 OPENDDS_STRING(converter).c_str())); 00317 } 00318 assoc_complete_readers_.erase(remote_id); 00319 association_complete_i(remote_id); 00320 00321 // Add to pending_readers_ -> pending means we are waiting 00322 // for the association_complete() callback. 
00323 00324 } else if (OpenDDS::DCPS::insert(pending_readers_, remote_id) == -1) { 00325 const GuidConverter converter(remote_id); 00326 ACE_ERROR((LM_ERROR, 00327 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::transport_assoc_done: ") 00328 ACE_TEXT("failed to mark %C as pending.\n"), 00329 OPENDDS_STRING(converter).c_str())); 00330 00331 } else { 00332 if (DCPS_debug_level) { 00333 const GuidConverter converter(remote_id); 00334 ACE_DEBUG((LM_DEBUG, 00335 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ") 00336 ACE_TEXT("marked %C as pending.\n"), 00337 OPENDDS_STRING(converter).c_str())); 00338 } 00339 } 00340 00341 } else { 00342 // In the current implementation, DataWriter is always active, so this 00343 // code will not be applicable. 00344 if (DCPS_debug_level) { 00345 const GuidConverter conv(publication_id_); 00346 ACE_DEBUG((LM_ERROR, 00347 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ") 00348 ACE_TEXT("ERROR: DataWriter (%C) should always be active in current implementation\n"), 00349 OPENDDS_STRING(conv).c_str())); 00350 } 00351 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_); 00352 disco->association_complete(domain_id_, participant_servant_->get_id(), 00353 publication_id_, remote_id); 00354 } 00355 }
void OpenDDS::DCPS::DataWriterImpl::unregister_all | ( | ) |
Delegate to WriteDataContainer to unregister all instances.
Definition at line 1907 of file DataWriterImpl.cpp.
References cancel_timer_, data_container_, reactor_, and OpenDDS::DCPS::WriteDataContainer::unregister_all().
Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().
01908 { 01909 if (cancel_timer_) { 01910 // The cancel_timer will call handle_close to remove_ref. 01911 (void) reactor_->cancel_timer(this, 0); 01912 cancel_timer_ = false; 01913 } 01914 01915 data_container_->unregister_all(); 01916 }
void OpenDDS::DCPS::DataWriterImpl::unregister_for_reader | ( | const RepoId & | participant, | |
const RepoId & | writerid, | |||
const RepoId & | readerid | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 813 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::TransportClient::unregister_for_reader().
00816 { 00817 TransportClient::unregister_for_reader(participant, writerid, readerid); 00818 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::unregister_instance_i | ( | DDS::InstanceHandle_t | handle, | |
const DDS::Time_t & | source_timestamp | |||
) |
Delegate to the WriteDataContainer to unregister and tell the transport to broadcast the unregistered instance.
Definition at line 1554 of file DataWriterImpl.cpp.
References create_control_message(), data_container_, DBG_ENTRY_LVL, dispose_and_unregister(), OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::WriteDataContainer::enqueue_control(), OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), OpenDDS::DCPS::WriteDataContainer::obtain_buffer_for_control(), DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, send_all_to_flush_control(), OpenDDS::DCPS::DataSampleElement::set_sample(), OpenDDS::DCPS::WriteDataContainer::unregister(), and OpenDDS::DCPS::UNREGISTER_INSTANCE.
Referenced by OpenDDS::DCPS::DataWriterImpl_T< MessageType >::unregister_instance_w_timestamp().
01556 { 01557 DBG_ENTRY_LVL("DataWriterImpl","unregister_instance_i",6); 01558 01559 if (enabled_ == false) { 01560 ACE_ERROR_RETURN((LM_ERROR, 01561 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::unregister_instance_i: ") 01562 ACE_TEXT(" Entity is not enabled.\n")), 01563 DDS::RETCODE_NOT_ENABLED); 01564 } 01565 01566 // According to spec 1.2, autodispose_unregistered_instances true causes 01567 // dispose on the instance prior to calling unregister operation. 01568 if (this->qos_.writer_data_lifecycle.autodispose_unregistered_instances) { 01569 return this->dispose_and_unregister(handle, source_timestamp); 01570 } 01571 01572 DDS::ReturnCode_t ret = DDS::RETCODE_ERROR; 01573 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret); 01574 DataSample* unregistered_sample_data = 0; 01575 ret = this->data_container_->unregister(handle, unregistered_sample_data); 01576 01577 if (ret != DDS::RETCODE_OK) { 01578 ACE_ERROR_RETURN((LM_ERROR, 01579 ACE_TEXT("(%P|%t) ERROR: ") 01580 ACE_TEXT("DataWriterImpl::unregister_instance_i: ") 01581 ACE_TEXT(" unregister with container failed. 
\n")), 01582 ret); 01583 } 01584 01585 DataSampleElement* element = 0; 01586 ret = this->data_container_->obtain_buffer_for_control(element); 01587 01588 if (ret != DDS::RETCODE_OK) { 01589 ACE_ERROR_RETURN((LM_ERROR, 01590 ACE_TEXT("(%P|%t) ERROR: ") 01591 ACE_TEXT("DataWriterImpl::unregister_instance_i: ") 01592 ACE_TEXT("obtain_buffer_for_control returned %d.\n"), 01593 ret), 01594 ret); 01595 } 01596 01597 element->set_sample(create_control_message(UNREGISTER_INSTANCE, 01598 element->get_header(), 01599 unregistered_sample_data, 01600 source_timestamp)); 01601 ret = this->data_container_->enqueue_control(element); 01602 01603 if (ret != DDS::RETCODE_OK) { 01604 ACE_ERROR_RETURN((LM_ERROR, 01605 ACE_TEXT("(%P|%t) ERROR: ") 01606 ACE_TEXT("DataWriterImpl::unregister_instance_i: ") 01607 ACE_TEXT("enqueue_control failed.\n")), 01608 ret); 01609 } 01610 01611 send_all_to_flush_control(guard); 01612 return DDS::RETCODE_OK; 01613 }
void OpenDDS::DCPS::DataWriterImpl::unregister_instances | ( | const DDS::Time_t & | source_timestamp | ) |
Unregister all registered instances and tell the transport to broadcast the unregistered instances.
Definition at line 1677 of file DataWriterImpl.cpp.
References data_container_, OpenDDS::DCPS::WriteDataContainer::instances_, and sync_unreg_rem_assocs_lock_.
Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().
01678 { 01679 { 01680 ACE_GUARD(ACE_Thread_Mutex, guard, sync_unreg_rem_assocs_lock_); 01681 01682 PublicationInstanceMapType::iterator it = 01683 this->data_container_->instances_.begin(); 01684 01685 while (it != this->data_container_->instances_.end()) { 01686 DDS::InstanceHandle_t handle = it->first; 01687 ++it; // avoid mangling the iterator 01688 01689 this->unregister_instance_i(handle, source_timestamp); 01690 } 01691 } 01692 }
virtual void OpenDDS::DCPS::DataWriterImpl::unregistered | ( | DDS::InstanceHandle_t | instance_handle | ) | [pure virtual] |
This method is called when an instance is unregistered from the WriteDataContainer.
The subclass must provide the implementation to unregister the instance from its own map.
Referenced by OpenDDS::DCPS::WriteDataContainer::unregister().
void OpenDDS::DCPS::DataWriterImpl::update_incompatible_qos | ( | const IncompatibleQosStatus & | status | ) | [virtual] |
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 821 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, dw_local_objref_, OpenDDS::DCPS::IncompatibleQosStatus::last_policy_id, DDS::OfferedIncompatibleQosStatus::last_policy_id, listener_for(), OpenDDS::DCPS::EntityImpl::notify_status_condition(), DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, offered_incompatible_qos_status_, OpenDDS::DCPS::IncompatibleQosStatus::policies, DDS::OfferedIncompatibleQosStatus::policies, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), DDS::OfferedIncompatibleQosStatus::total_count, OpenDDS::DCPS::IncompatibleQosStatus::total_count, and DDS::OfferedIncompatibleQosStatus::total_count_change.
00822 { 00823 DDS::DataWriterListener_var listener = 00824 listener_for(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS); 00825 00826 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00827 00828 #if 0 00829 00830 if (this->offered_incompatible_qos_status_.total_count == status.total_count) { 00831 // This test should make the method idempotent. 00832 return; 00833 } 00834 00835 #endif 00836 00837 set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, true); 00838 00839 // copy status and increment change 00840 offered_incompatible_qos_status_.total_count = status.total_count; 00841 offered_incompatible_qos_status_.total_count_change += 00842 status.count_since_last_send; 00843 offered_incompatible_qos_status_.last_policy_id = status.last_policy_id; 00844 offered_incompatible_qos_status_.policies = status.policies; 00845 00846 if (!CORBA::is_nil(listener.in())) { 00847 listener->on_offered_incompatible_qos(dw_local_objref_.in(), 00848 offered_incompatible_qos_status_); 00849 00850 // TBD - Why does the spec say to change this but not change the 00851 // ChangeFlagStatus after a listener call? 00852 offered_incompatible_qos_status_.total_count_change = 0; 00853 } 00854 00855 notify_status_condition(); 00856 }
void OpenDDS::DCPS::DataWriterImpl::update_subscription_params | ( | const RepoId & | readerId, | |
const DDS::StringSeq & | params | |||
) | [virtual] |
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 859 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OPENDDS_STRING, reader_info_, and TheServiceParticipant.
00861 { 00862 #ifdef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00863 ACE_UNUSED_ARG(readerId); 00864 ACE_UNUSED_ARG(params); 00865 #else 00866 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00867 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_); 00868 RepoIdToReaderInfoMap::iterator iter = reader_info_.find(readerId); 00869 00870 if (iter != reader_info_.end()) { 00871 iter->second.expression_params_ = params; 00872 00873 } else if (DCPS_debug_level > 4 && 00874 TheServiceParticipant->publisher_content_filter()) { 00875 GuidConverter pubConv(this->publication_id_), subConv(readerId); 00876 ACE_DEBUG((LM_WARNING, 00877 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::update_subscription_params()") 00878 ACE_TEXT(" - writer: %C has no info about reader: %C\n"), 00879 OPENDDS_STRING(pubConv).c_str(), OPENDDS_STRING(subConv).c_str())); 00880 } 00881 00882 #endif 00883 }
void OpenDDS::DCPS::DataWriterImpl::wait_control_pending | ( | ) |
Wait until pending control elements have either been delivered or dropped.
Definition at line 2599 of file DataWriterImpl.cpp.
References controlTracker, OPENDDS_STRING, and OpenDDS::DCPS::MessageTracker::wait_messages_pending().
Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().
02600 { 02601 OPENDDS_STRING caller_string("DataWriterImpl::wait_control_pending"); 02602 controlTracker.wait_messages_pending(caller_string); 02603 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::wait_for_acknowledgments | ( | const DDS::Duration_t & | max_wait | ) | [virtual] |
Definition at line 1015 of file DataWriterImpl.cpp.
References create_ack_token(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SequenceNumber::getValue(), DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_OK, OpenDDS::DCPS::DataWriterImpl::AckToken::sequence_, and wait_for_specific_ack().
01016 { 01017 if (this->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS) 01018 return DDS::RETCODE_OK; 01019 DataWriterImpl::AckToken token = create_ack_token(max_wait); 01020 if (DCPS_debug_level) { 01021 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::wait_for_acknowledgments") 01022 ACE_TEXT(" waiting for acknowledgment of sequence %q at %T\n"), 01023 token.sequence_.getValue())); 01024 } 01025 return wait_for_specific_ack(token); 01026 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::wait_for_specific_ack | ( | const AckToken & | token | ) | [protected] |
Definition at line 1029 of file DataWriterImpl.cpp.
References data_container_, OpenDDS::DCPS::DataWriterImpl::AckToken::deadline(), OpenDDS::DCPS::DataWriterImpl::AckToken::sequence_, and OpenDDS::DCPS::WriteDataContainer::wait_ack_of_seq().
Referenced by wait_for_acknowledgments().
01030 { 01031 return this->data_container_->wait_ack_of_seq(token.deadline(), token.sequence_); 01032 }
void OpenDDS::DCPS::DataWriterImpl::wait_pending | ( | ) |
Wait for pending samples to drain.
Definition at line 2606 of file DataWriterImpl.cpp.
References data_container_, and OpenDDS::DCPS::WriteDataContainer::wait_pending().
Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().
02607 { 02608 this->data_container_->wait_pending(); 02609 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::write | ( | DataSample * | sample, | |
DDS::InstanceHandle_t | handle, | |||
const DDS::Time_t & | source_timestamp, | |||
GUIDSeq * | filter_out | |||
) |
Delegate to the WriteDataContainer to queue the instance sample and finally tell the transport to send the sample.
filter_out | can either be null (if the writer can't or won't evaluate the filters), or a list of associated reader RepoIds that should NOT get the data sample due to content filtering. |
Definition at line 1695 of file DataWriterImpl.cpp.
References available_data_list_, coherent_samples_, create_sample_data_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::WriteDataContainer::enqueue(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), get_unsent_data(), last_liveliness_activity_time_, max_suspended_transaction_id_, min_suspended_transaction_id_, OpenDDS::DCPS::WriteDataContainer::obtain_buffer(), DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, OpenDDS::DCPS::TransportClient::send(), OpenDDS::DCPS::DataSampleElement::set_filter_out(), OpenDDS::DCPS::DataSampleElement::set_sample(), and track_sequence_number().
Referenced by OpenDDS::DCPS::DataDurabilityCache::get_data(), and OpenDDS::DCPS::DataWriterImpl_T< MessageType >::write_w_timestamp().
01699 { 01700 DBG_ENTRY_LVL("DataWriterImpl","write",6); 01701 01702 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 01703 guard, 01704 get_lock (), 01705 ::DDS::RETCODE_ERROR); 01706 01707 // take ownership of sequence allocated in FooDWImpl::write_w_timestamp() 01708 GUIDSeq_var filter_out_var(filter_out); 01709 01710 if (enabled_ == false) { 01711 ACE_ERROR_RETURN((LM_ERROR, 01712 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::write: ") 01713 ACE_TEXT(" Entity is not enabled. \n")), 01714 DDS::RETCODE_NOT_ENABLED); 01715 } 01716 01717 DataSampleElement* element = 0; 01718 DDS::ReturnCode_t ret = this->data_container_->obtain_buffer(element, handle); 01719 01720 if (ret == DDS::RETCODE_TIMEOUT) { 01721 return ret; // silent for timeout 01722 01723 } else if (ret != DDS::RETCODE_OK) { 01724 ACE_ERROR_RETURN((LM_ERROR, 01725 ACE_TEXT("(%P|%t) ERROR: ") 01726 ACE_TEXT("DataWriterImpl::write: ") 01727 ACE_TEXT("obtain_buffer returned %d.\n"), 01728 ret), 01729 ret); 01730 } 01731 01732 DataSample* temp; 01733 ret = create_sample_data_message(data, 01734 handle, 01735 element->get_header(), 01736 temp, 01737 source_timestamp, 01738 (filter_out != 0)); 01739 element->set_sample(temp); 01740 01741 if (ret != DDS::RETCODE_OK) { 01742 return ret; 01743 } 01744 01745 element->set_filter_out(filter_out_var._retn()); // ownership passed to element 01746 01747 ret = this->data_container_->enqueue(element, handle); 01748 01749 if (ret != DDS::RETCODE_OK) { 01750 ACE_ERROR_RETURN((LM_ERROR, 01751 ACE_TEXT("(%P|%t) ERROR: ") 01752 ACE_TEXT("DataWriterImpl::write: ") 01753 ACE_TEXT("enqueue failed.\n")), 01754 ret); 01755 } 01756 this->last_liveliness_activity_time_ = ACE_OS::gettimeofday(); 01757 01758 track_sequence_number(filter_out); 01759 01760 if (this->coherent_) { 01761 ++this->coherent_samples_; 01762 } 01763 SendStateDataSampleList list; 01764 01765 ACE_UINT64 transaction_id = this->get_unsent_data(list); 01766 01767 if (this->publisher_servant_->is_suspended()) { 01768 if 
(min_suspended_transaction_id_ == 0) { 01769 //provides transaction id for lower bound of suspended transactions 01770 //or transaction id for single suspended write transaction 01771 min_suspended_transaction_id_ = transaction_id; 01772 } else { 01773 //when multiple write transactions have suspended, provides the upper bound 01774 //for suspended transactions. 01775 max_suspended_transaction_id_ = transaction_id; 01776 } 01777 this->available_data_list_.enqueue_tail(list); 01778 01779 } else { 01780 guard.release(); 01781 01782 this->send(list, transaction_id); 01783 } 01784 01785 return DDS::RETCODE_OK; 01786 }
friend class ::DDS_TEST [friend] |
Reimplemented from OpenDDS::DCPS::TransportClient.
Reimplemented in OpenDDS::DCPS::DataWriterImpl_T< MessageType >.
Definition at line 584 of file DataWriterImpl.h.
friend class PublisherImpl [friend] |
Definition at line 85 of file DataWriterImpl.h.
friend class WriteDataContainer [friend] |
RepoIdSet OpenDDS::DCPS::DataWriterImpl::assoc_complete_readers_ [private] |
Definition at line 683 of file DataWriterImpl.h.
Referenced by association_complete(), and transport_assoc_done().
size_t OpenDDS::DCPS::DataWriterImpl::association_chunk_multiplier_ [protected] |
The multiplier for allocators affected by associations.
Definition at line 489 of file DataWriterImpl.h.
Referenced by enable().
bool OpenDDS::DCPS::DataWriterImpl::cancel_timer_ [private] |
Flag indicating whether the liveliness timer is scheduled and needs to be cancelled.
Definition at line 674 of file DataWriterImpl.h.
Referenced by cleanup(), enable(), and unregister_all().
bool OpenDDS::DCPS::DataWriterImpl::coherent_ [private] |
Flag indicating that the DataWriter currently belongs to a coherent change set.
Definition at line 612 of file DataWriterImpl.h.
Referenced by begin_coherent_changes(), coherent_changes_pending(), create_sample_data_message(), and end_coherent_changes().
ACE_UINT32 OpenDDS::DCPS::DataWriterImpl::coherent_samples_ [private] |
The number of samples belonging to the current coherent change set.
Definition at line 615 of file DataWriterImpl.h.
Referenced by end_coherent_changes(), and write().
Definition at line 423 of file DataWriterImpl.h.
Referenced by association_complete_i(), control_delivered(), control_dropped(), OpenDDS::DCPS::WriteDataContainer::data_delivered(), OpenDDS::DCPS::WriteDataContainer::data_dropped(), pending_control(), send_all_to_flush_control(), send_control(), and wait_control_pending().
The sample data container.
Definition at line 617 of file DataWriterImpl.h.
Referenced by association_complete_i(), create_sample_data_message(), data_delivered(), data_dropped(), dispose(), dispose_and_unregister(), enable(), get_handle_instance(), get_instance_handles(), num_samples(), persist_data(), register_instance_i(), reschedule_deadline(), unregister_all(), unregister_instance_i(), unregister_instances(), wait_for_specific_ack(), wait_pending(), write(), and ~DataWriterImpl().
The data block allocator.
Reimplemented in OpenDDS::DCPS::DataWriterImpl_T< MessageType >.
Definition at line 653 of file DataWriterImpl.h.
Referenced by create_control_message(), create_sample_data_message(), enable(), and ~DataWriterImpl().
Definition at line 697 of file DataWriterImpl.h.
Referenced by DataWriterImpl(), and ~DataWriterImpl().
The domain id.
Definition at line 601 of file DataWriterImpl.h.
Referenced by enable(), init(), send_liveliness(), set_qos(), and transport_assoc_done().
DDS::DataWriter_var OpenDDS::DCPS::DataWriterImpl::dw_local_objref_ [private] |
the object reference of the local datawriter
Definition at line 605 of file DataWriterImpl.h.
Referenced by association_complete_i(), cleanup(), init(), and update_incompatible_qos().
The header data allocator.
Definition at line 655 of file DataWriterImpl.h.
Referenced by create_sample_data_message(), enable(), and ~DataWriterImpl().
RepoIdToHandleMap OpenDDS::DCPS::DataWriterImpl::id_to_handle_map_ [private] |
Definition at line 624 of file DataWriterImpl.h.
Referenced by association_complete_i(), and get_matched_subscriptions().
bool OpenDDS::DCPS::DataWriterImpl::initialized_ [private] |
Flag indicating that init() has been called.
Definition at line 681 of file DataWriterImpl.h.
Referenced by init(), and ~DataWriterImpl().
bool OpenDDS::DCPS::DataWriterImpl::is_bit_ [private] |
Flag indicates that this datawriter is a builtin topic datawriter.
Definition at line 678 of file DataWriterImpl.h.
Referenced by add_association(), association_complete(), association_complete_i(), init(), notify_publication_disconnected(), notify_publication_lost(), and notify_publication_reconnected().
CORBA::Long OpenDDS::DCPS::DataWriterImpl::last_deadline_missed_total_count_ [private] |
Total number of offered deadlines missed during last offered deadline status check.
Definition at line 668 of file DataWriterImpl.h.
Referenced by enable(), get_offered_deadline_missed_status(), and set_qos().
ACE_Time_Value OpenDDS::DCPS::DataWriterImpl::last_liveliness_activity_time_ [private] |
Timestamp of last write/dispose/assert_liveliness.
Definition at line 663 of file DataWriterImpl.h.
Referenced by handle_timeout(), participant_liveliness_activity_after(), send_liveliness(), and write().
ACE_Time_Value OpenDDS::DCPS::DataWriterImpl::last_liveliness_check_time_ [private] |
Timestamp of the last time liveliness was checked.
Definition at line 665 of file DataWriterImpl.h.
Referenced by enable(), and handle_timeout().
DDS::DataWriterListener_var OpenDDS::DCPS::DataWriterImpl::listener_ [private] |
Used to notify the entity for relevant events.
Definition at line 599 of file DataWriterImpl.h.
Referenced by get_listener(), init(), listener_for(), and set_listener().
The StatusKind bit mask indicating which status condition changes can be notified through the listener of this entity.
Definition at line 597 of file DataWriterImpl.h.
Referenced by init(), listener_for(), and set_listener().
bool OpenDDS::DCPS::DataWriterImpl::liveliness_asserted_ [private] |
Definition at line 706 of file DataWriterImpl.h.
Referenced by assert_liveliness_by_participant(), and handle_timeout().
ACE_Time_Value OpenDDS::DCPS::DataWriterImpl::liveliness_check_interval_ [private] |
The time interval for sending liveliness message.
Definition at line 661 of file DataWriterImpl.h.
Referenced by enable(), handle_timeout(), and liveliness_check_interval().
bool OpenDDS::DCPS::DataWriterImpl::liveliness_lost_ [private] |
True if the writer failed to actively signal its liveliness within its offered liveliness period.
Definition at line 636 of file DataWriterImpl.h.
Referenced by handle_timeout().
Status conditions.
Definition at line 629 of file DataWriterImpl.h.
Referenced by DataWriterImpl(), get_liveliness_lost_status(), and handle_timeout().
ACE_Recursive_Thread_Mutex OpenDDS::DCPS::DataWriterImpl::lock_ [private] |
The lock to protect the active subscriptions and status changes.
Reimplemented from OpenDDS::DCPS::EntityImpl.
Definition at line 620 of file DataWriterImpl.h.
Referenced by remove_all_associations(), and transport_assoc_done().
ACE_UINT64 OpenDDS::DCPS::DataWriterImpl::max_suspended_transaction_id_ [private] |
The message block allocator.
Reimplemented in OpenDDS::DCPS::DataWriterImpl_T< MessageType >.
Definition at line 651 of file DataWriterImpl.h.
Referenced by create_control_message(), create_sample_data_message(), enable(), and ~DataWriterImpl().
ACE_UINT64 OpenDDS::DCPS::DataWriterImpl::min_suspended_transaction_id_ [private] |
The cached available data while suspending and associated transaction ids.
Definition at line 686 of file DataWriterImpl.h.
Referenced by send_suspended_data(), and write().
Monitor* OpenDDS::DCPS::DataWriterImpl::monitor_ [private] |
Monitor object for this entity.
Definition at line 691 of file DataWriterImpl.h.
Referenced by association_complete_i(), DataWriterImpl(), enable(), and register_instance_i().
size_t OpenDDS::DCPS::DataWriterImpl::n_chunks_ [protected] |
The number of chunks for the cached allocator.
Definition at line 486 of file DataWriterImpl.h.
Referenced by DataWriterImpl(), and enable().
DDS::OfferedDeadlineMissedStatus OpenDDS::DCPS::DataWriterImpl::offered_deadline_missed_status_ [private] |
Definition at line 630 of file DataWriterImpl.h.
Referenced by DataWriterImpl(), enable(), get_offered_deadline_missed_status(), and set_qos().
DDS::OfferedIncompatibleQosStatus OpenDDS::DCPS::DataWriterImpl::offered_incompatible_qos_status_ [private] |
Definition at line 631 of file DataWriterImpl.h.
Referenced by DataWriterImpl(), get_offered_incompatible_qos_status(), and update_incompatible_qos().
The participant servant which creates the publisher that creates this datawriter.
Definition at line 505 of file DataWriterImpl.h.
Referenced by add_association(), assert_liveliness(), association_complete_i(), enable(), get_dp_id(), get_instance_handle(), get_matched_subscription_data(), get_next_handle(), init(), set_qos(), and transport_assoc_done().
RepoIdSet OpenDDS::DCPS::DataWriterImpl::pending_readers_ [private] |
Definition at line 683 of file DataWriterImpl.h.
Referenced by association_complete(), and transport_assoc_done().
Periodic Monitor object for this entity.
Definition at line 694 of file DataWriterImpl.h.
Referenced by DataWriterImpl().
The repository id of this datawriter/publication.
Definition at line 607 of file DataWriterImpl.h.
Referenced by add_association(), create_control_message(), create_sample_data_message(), data_delivered(), enable(), get_instance_handle(), get_publication_id(), set_qos(), and transport_assoc_done().
Definition at line 632 of file DataWriterImpl.h.
Referenced by association_complete_i(), DataWriterImpl(), and get_publication_matched_status().
The publisher servant which creates this datawriter.
Definition at line 603 of file DataWriterImpl.h.
Referenced by create_control_message(), create_sample_data_message(), enable(), end_coherent_changes(), get_publisher(), init(), listener_for(), parent(), retrieve_inline_qos_data(), and set_qos().
The qos policy list of this datawriter.
Definition at line 501 of file DataWriterImpl.h.
Referenced by add_association(), create_sample_data_message(), enable(), get_qos(), handle_timeout(), init(), retrieve_inline_qos_data(), and set_qos().
ACE_Reactor_Timer_Interface* OpenDDS::DCPS::DataWriterImpl::reactor_ [private] |
The ORB's reactor to be used to register the liveliness timer.
Definition at line 659 of file DataWriterImpl.h.
Referenced by cleanup(), enable(), handle_timeout(), init(), and unregister_all().
RepoIdToReaderInfoMap OpenDDS::DCPS::DataWriterImpl::reader_info_ [protected] |
Definition at line 526 of file DataWriterImpl.h.
Referenced by add_association(), association_complete_i(), create_control_message(), need_sequence_repair_i(), track_sequence_number(), and update_subscription_params().
ACE_Thread_Mutex OpenDDS::DCPS::DataWriterImpl::reader_info_lock_ [protected] |
Definition at line 508 of file DataWriterImpl.h.
RepoIdSet OpenDDS::DCPS::DataWriterImpl::readers_ [private] |
The sequence number unique in DataWriter scope.
Definition at line 609 of file DataWriterImpl.h.
Referenced by create_control_message(), create_sample_data_message(), end_coherent_changes(), need_sequence_repair_i(), and track_sequence_number().
ACE_Thread_Mutex OpenDDS::DCPS::DataWriterImpl::sync_unreg_rem_assocs_lock_ [private] |
The associated topic repository id.
Definition at line 589 of file DataWriterImpl.h.
Referenced by init().
CORBA::String_var OpenDDS::DCPS::DataWriterImpl::topic_name_ [private] |
The name of associated topic.
Definition at line 587 of file DataWriterImpl.h.
Referenced by enable(), get_topic_name(), init(), and retrieve_inline_qos_data().
DDS::Topic_var OpenDDS::DCPS::DataWriterImpl::topic_objref_ [private] |
The object reference of the associated topic.
Definition at line 591 of file DataWriterImpl.h.
Referenced by cleanup(), get_topic(), and init().
The topic servant.
Definition at line 593 of file DataWriterImpl.h.
Referenced by cleanup(), enable(), filter_out(), inconsistent_topic(), and init().
CORBA::String_var OpenDDS::DCPS::DataWriterImpl::type_name_ [protected] |
The type name of associated topic.
Definition at line 498 of file DataWriterImpl.h.
Referenced by get_type_name(), and init().
Watchdog responsible for reporting missed offered deadlines.
Definition at line 671 of file DataWriterImpl.h.
Referenced by OpenDDS::DCPS::WriteDataContainer::dispose(), enable(), OpenDDS::DCPS::WriteDataContainer::enqueue(), OpenDDS::DCPS::WriteDataContainer::register_instance(), set_qos(), and OpenDDS::DCPS::WriteDataContainer::unregister().