OpenDDS::DCPS::DataWriterImpl Class Reference

Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces. More...

#include <DataWriterImpl.h>

Inheritance diagram for OpenDDS::DCPS::DataWriterImpl:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::DataWriterImpl:

Collaboration graph
[legend]
List of all members.

Public Member Functions

typedef OPENDDS_MAP_CMP (RepoId, SequenceNumber, GUID_tKeyLessThan) RepoIdToSequenceMap
 DataWriterImpl ()
 Constructor.
virtual ~DataWriterImpl ()
 Destructor.
virtual DDS::InstanceHandle_t get_instance_handle ()
virtual DDS::ReturnCode_t set_qos (const DDS::DataWriterQos &qos)
virtual DDS::ReturnCode_t get_qos (DDS::DataWriterQos &qos)
virtual DDS::ReturnCode_t set_listener (DDS::DataWriterListener_ptr a_listener, DDS::StatusMask mask)
virtual DDS::DataWriterListener_ptr get_listener ()
virtual DDS::Topic_ptr get_topic ()
virtual DDS::ReturnCode_t wait_for_acknowledgments (const DDS::Duration_t &max_wait)
virtual DDS::Publisher_ptr get_publisher ()
virtual DDS::ReturnCode_t get_liveliness_lost_status (DDS::LivelinessLostStatus &status)
virtual DDS::ReturnCode_t get_offered_deadline_missed_status (DDS::OfferedDeadlineMissedStatus &status)
virtual DDS::ReturnCode_t get_offered_incompatible_qos_status (DDS::OfferedIncompatibleQosStatus &status)
virtual DDS::ReturnCode_t get_publication_matched_status (DDS::PublicationMatchedStatus &status)
ACE_Time_Value liveliness_check_interval (DDS::LivelinessQosPolicyKind kind)
bool participant_liveliness_activity_after (const ACE_Time_Value &tv)
virtual DDS::ReturnCode_t assert_liveliness ()
virtual DDS::ReturnCode_t assert_liveliness_by_participant ()
typedef OPENDDS_VECTOR (DDS::InstanceHandle_t) InstanceHandleVec
void get_instance_handles (InstanceHandleVec &instance_handles)
void get_readers (RepoIdSet &readers)
virtual DDS::ReturnCode_t get_matched_subscriptions (DDS::InstanceHandleSeq &subscription_handles)
virtual DDS::ReturnCode_t get_matched_subscription_data (DDS::SubscriptionBuiltinTopicData &subscription_data, DDS::InstanceHandle_t subscription_handle)
virtual DDS::ReturnCode_t enable ()
virtual void add_association (const RepoId &yourId, const ReaderAssociation &reader, bool active)
virtual void transport_assoc_done (int flags, const RepoId &remote_id)
virtual void association_complete (const RepoId &remote_id)
virtual void remove_associations (const ReaderIdSeq &readers, bool callback)
virtual void update_incompatible_qos (const IncompatibleQosStatus &status)
virtual void update_subscription_params (const RepoId &readerId, const DDS::StringSeq &params)
virtual void inconsistent_topic ()
void cleanup ()
virtual void init (DDS::Topic_ptr topic, TopicImpl *topic_servant, const DDS::DataWriterQos &qos, DDS::DataWriterListener_ptr a_listener, const DDS::StatusMask &mask, OpenDDS::DCPS::DomainParticipantImpl *participant_servant, OpenDDS::DCPS::PublisherImpl *publisher_servant, DDS::DataWriter_ptr dw_local)
void send_all_to_flush_control (ACE_Guard< ACE_Recursive_Thread_Mutex > &guard)
DDS::ReturnCode_t register_instance_i (DDS::InstanceHandle_t &handle, DataSample *data, const DDS::Time_t &source_timestamp)
DDS::ReturnCode_t register_instance_from_durable_data (DDS::InstanceHandle_t &handle, DataSample *data, const DDS::Time_t &source_timestamp)
DDS::ReturnCode_t unregister_instance_i (DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp)
void unregister_instances (const DDS::Time_t &source_timestamp)
DDS::ReturnCode_t write (DataSample *sample, DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp, GUIDSeq *filter_out)
DDS::ReturnCode_t dispose (DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp)
DDS::ReturnCode_t num_samples (DDS::InstanceHandle_t handle, size_t &size)
ACE_UINT64 get_unsent_data (SendStateDataSampleList &list)
SendStateDataSampleList get_resend_data ()
RepoId get_publication_id ()
RepoId get_dp_id ()
void unregister_all ()
void data_delivered (const DataSampleElement *sample)
void control_delivered (ACE_Message_Block *sample)
bool should_ack () const
 Does this writer have samples to be acknowledged?
AckToken create_ack_token (DDS::Duration_t max_wait) const
 Create an AckToken for ack operations.
virtual void retrieve_inline_qos_data (TransportSendListener::InlineQosData &qos_data) const
virtual bool check_transport_qos (const TransportInst &inst)
bool coherent_changes_pending ()
 Are coherent changes pending?
void begin_coherent_changes ()
 Starts a coherent change set; should only be called once.
void end_coherent_changes (const GroupCoherentSamples &group_samples)
 Ends a coherent change set; should only be called once.
const char * get_topic_name ()
char const * get_type_name () const
void data_dropped (const DataSampleElement *element, bool dropped_by_transport)
void control_dropped (ACE_Message_Block *sample, bool dropped_by_transport)
ACE_INLINE ACE_Recursive_Thread_Mutex & get_lock ()
virtual void unregistered (DDS::InstanceHandle_t instance_handle)=0
DDS::DataWriterListener_ptr listener_for (DDS::StatusKind kind)
virtual int handle_timeout (const ACE_Time_Value &tv, const void *arg)
 Handle the assert liveliness timeout.
virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask)
void send_suspended_data ()
void remove_all_associations ()
virtual void register_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
virtual void unregister_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid)
void notify_publication_disconnected (const ReaderIdSeq &subids)
void notify_publication_reconnected (const ReaderIdSeq &subids)
void notify_publication_lost (const ReaderIdSeq &subids)
virtual void notify_connection_deleted (const RepoId &peerId)
DDS::ReturnCode_t create_sample_data_message (DataSample *data, DDS::InstanceHandle_t instance_handle, DataSampleHeader &header_data, ACE_Message_Block *&message, const DDS::Time_t &source_timestamp, bool content_filter)
bool persist_data ()
void reschedule_deadline ()
void wait_pending ()
 Wait for pending samples to drain.
DDS::InstanceHandle_t get_next_handle ()
virtual EntityImplparent () const
bool filter_out (const DataSampleElement &elt, const OPENDDS_STRING &filterClassName, const FilterEvaluator &evaluator, const DDS::StringSeq &expression_params) const
void wait_control_pending ()
DataBlockLockPool::DataBlockLockget_db_lock ()

Public Attributes

int data_dropped_count_
 Statistics counter.
int data_delivered_count_
MessageTracker controlTracker

Protected Member Functions

DDS::ReturnCode_t wait_for_specific_ack (const AckToken &token)
void prepare_to_delete ()
virtual DDS::ReturnCode_t enable_specific ()=0
PublicationInstanceget_handle_instance (DDS::InstanceHandle_t handle)
typedef OPENDDS_MAP_CMP (RepoId, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap
virtual SendControlStatus send_control (const DataSampleHeader &header, ACE_Message_Block *msg)
bool pending_control ()

Protected Attributes

size_t n_chunks_
 The number of chunks for the cached allocator.
size_t association_chunk_multiplier_
 The multiplier for allocators affected by associations.
CORBA::String_var type_name_
 The type name of associated topic.
DDS::DataWriterQos qos_
 The qos policy list of this datawriter.
DomainParticipantImplparticipant_servant_
ACE_Thread_Mutex reader_info_lock_
RepoIdToReaderInfoMap reader_info_

Private Member Functions

void track_sequence_number (GUIDSeq *filter_out)
void notify_publication_lost (const DDS::InstanceHandleSeq &handles)
DDS::ReturnCode_t dispose_and_unregister (DDS::InstanceHandle_t handle, const DDS::Time_t &timestamp)
ACE_Message_Block * create_control_message (MessageId message_id, DataSampleHeader &header, ACE_Message_Block *data, const DDS::Time_t &source_timestamp)
bool send_liveliness (const ACE_Time_Value &now)
 Send the liveliness message.
bool lookup_instance_handles (const ReaderIdSeq &ids, DDS::InstanceHandleSeq &hdls)
 Lookup the instance handles by the subscription repo ids.
const RepoIdget_repo_id () const
DDS::DomainId_t domain_id () const
CORBA::Long get_priority_value (const AssociationData &) const
void association_complete_i (const RepoId &remote_id)
typedef OPENDDS_MAP_CMP (RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap
bool need_sequence_repair ()
bool need_sequence_repair_i () const
DDS::ReturnCode_t send_end_historic_samples (const RepoId &readerId)

Private Attributes

CORBA::String_var topic_name_
 The name of associated topic.
RepoId topic_id_
 The associated topic repository id.
DDS::Topic_var topic_objref_
 The object reference of the associated topic.
TopicImpltopic_servant_
 The topic servant.
DDS::StatusMask listener_mask_
DDS::DataWriterListener_var listener_
 Used to notify the entity for relevant events.
DDS::DomainId_t domain_id_
 The domain id.
PublisherImplpublisher_servant_
 The publisher servant which creates this datawriter.
DDS::DataWriter_var dw_local_objref_
 the object reference of the local datawriter
PublicationId publication_id_
 The repository id of this datawriter/publication.
SequenceNumber sequence_number_
 The sequence number unique in DataWriter scope.
bool coherent_
ACE_UINT32 coherent_samples_
WriteDataContainerdata_container_
 The sample data container.
ACE_Recursive_Thread_Mutex lock_
RepoIdToHandleMap id_to_handle_map_
RepoIdSet readers_
DDS::LivelinessLostStatus liveliness_lost_status_
 Status conditions.
DDS::OfferedDeadlineMissedStatus offered_deadline_missed_status_
DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_
DDS::PublicationMatchedStatus publication_match_status_
bool liveliness_lost_
MessageBlockAllocatormb_allocator_
 The message block allocator.
DataBlockAllocatordb_allocator_
 The data block allocator.
DataSampleHeaderAllocatorheader_allocator_
 The header data allocator.
ACE_Reactor_Timer_Interface * reactor_
ACE_Time_Value liveliness_check_interval_
 The time interval for sending liveliness message.
ACE_Time_Value last_liveliness_activity_time_
 Timestamp of last write/dispose/assert_liveliness.
ACE_Time_Value last_liveliness_check_time_
 Timestamp of the last time liveliness was checked.
CORBA::Long last_deadline_missed_total_count_
OfferedDeadlineWatchdogwatchdog_
bool cancel_timer_
bool is_bit_
bool initialized_
 Flag indicates that the init() is called.
RepoIdSet pending_readers_
RepoIdSet assoc_complete_readers_
ACE_UINT64 min_suspended_transaction_id_
 The cached available data while suspending and associated transaction ids.
ACE_UINT64 max_suspended_transaction_id_
SendStateDataSampleList available_data_list_
Monitormonitor_
 Monitor object for this entity.
Monitorperiodic_monitor_
 Periodic Monitor object for this entity.
DataBlockLockPooldb_lock_pool_
bool liveliness_asserted_
ACE_Thread_Mutex sync_unreg_rem_assocs_lock_

Friends

class WriteDataContainer
class PublisherImpl
class ::DDS_TEST

Classes

struct  AckCustomization
struct  AckToken
struct  ReaderInfo

Detailed Description

Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.

See the DDS specification, OMG formal/04-12-02, for a description of the interface this class is implementing.

This class must be inherited by the type-specific datawriter which is specific to the data-type associated with the topic.

Note:
: This class is responsible for allocating memory for the header message block (MessageBlock + DataBlock + DataSampleHeader) and the DataSampleElement. The data-type datawriter is responsible for allocating memory for the sample data message block. (e.g. MessageBlock + DataBlock + Foo data). But it gives up ownership to this WriteDataContainer.

Definition at line 76 of file DataWriterImpl.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::DataWriterImpl::DataWriterImpl (  ) 

Constructor.

Definition at line 56 of file DataWriterImpl.cpp.

References DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, db_lock_pool_, DDS::HANDLE_NIL, DDS::OfferedDeadlineMissedStatus::last_instance_handle, DDS::OfferedIncompatibleQosStatus::last_policy_id, DDS::PublicationMatchedStatus::last_subscription_handle, liveliness_lost_status_, monitor_, n_chunks_, offered_deadline_missed_status_, offered_incompatible_qos_status_, periodic_monitor_, DDS::OfferedIncompatibleQosStatus::policies, publication_match_status_, TheServiceParticipant, DDS::PublicationMatchedStatus::total_count, DDS::OfferedIncompatibleQosStatus::total_count, DDS::OfferedDeadlineMissedStatus::total_count, DDS::LivelinessLostStatus::total_count, DDS::PublicationMatchedStatus::total_count_change, DDS::OfferedIncompatibleQosStatus::total_count_change, DDS::OfferedDeadlineMissedStatus::total_count_change, and DDS::LivelinessLostStatus::total_count_change.

00057   : data_dropped_count_(0),
00058     data_delivered_count_(0),
00059     controlTracker("DataWriterImpl"),
00060     n_chunks_(TheServiceParticipant->n_chunks()),
00061     association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()),
00062     qos_(TheServiceParticipant->initial_DataWriterQos()),
00063     participant_servant_(0),
00064     topic_id_(GUID_UNKNOWN),
00065     topic_servant_(0),
00066     listener_mask_(DEFAULT_STATUS_MASK),
00067     domain_id_(0),
00068     publisher_servant_(0),
00069     publication_id_(GUID_UNKNOWN),
00070     sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00071     coherent_(false),
00072     coherent_samples_(0),
00073     data_container_(0),
00074     liveliness_lost_(false),
00075     mb_allocator_(0),
00076     db_allocator_(0),
00077     header_allocator_(0),
00078     reactor_(0),
00079     liveliness_check_interval_(ACE_Time_Value::max_time),
00080     last_liveliness_activity_time_(ACE_Time_Value::zero),
00081     last_deadline_missed_total_count_(0),
00082     watchdog_(),
00083     cancel_timer_(false),
00084     is_bit_(false),
00085     initialized_(false),
00086     min_suspended_transaction_id_(0),
00087     max_suspended_transaction_id_(0),
00088     monitor_(0),
00089     periodic_monitor_(0),
00090     db_lock_pool_(0),
00091     liveliness_asserted_(false)
00092 {
00093   liveliness_lost_status_.total_count = 0;
00094   liveliness_lost_status_.total_count_change = 0;
00095 
00096   offered_deadline_missed_status_.total_count = 0;
00097   offered_deadline_missed_status_.total_count_change = 0;
00098   offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL;
00099 
00100   offered_incompatible_qos_status_.total_count = 0;
00101   offered_incompatible_qos_status_.total_count_change = 0;
00102   offered_incompatible_qos_status_.last_policy_id = 0;
00103   offered_incompatible_qos_status_.policies.length(0);
00104 
00105   publication_match_status_.total_count = 0;
00106   publication_match_status_.total_count_change = 0;
00107   publication_match_status_.current_count = 0;
00108   publication_match_status_.current_count_change = 0;
00109   publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL;
00110 
00111   monitor_ =
00112     TheServiceParticipant->monitor_factory_->create_data_writer_monitor(this);
00113   periodic_monitor_ =
00114     TheServiceParticipant->monitor_factory_->create_data_writer_periodic_monitor(this);
00115 
00116   db_lock_pool_ = new DataBlockLockPool((unsigned long)n_chunks_);
00117 }

OpenDDS::DCPS::DataWriterImpl::~DataWriterImpl (  )  [virtual]

Destructor.

Definition at line 121 of file DataWriterImpl.cpp.

References data_container_, db_allocator_, db_lock_pool_, DBG_ENTRY_LVL, header_allocator_, initialized_, and mb_allocator_.

00122 {
00123   DBG_ENTRY_LVL("DataWriterImpl","~DataWriterImpl",6);
00124 
00125   if (initialized_) {
00126     delete data_container_;
00127     delete mb_allocator_;
00128     delete db_allocator_;
00129     delete header_allocator_;
00130   }
00131   delete db_lock_pool_;
00132 }


Member Function Documentation

void OpenDDS::DCPS::DataWriterImpl::add_association ( const RepoId yourId,
const ReaderAssociation reader,
bool  active 
) [virtual]

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 215 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::TransportClient::associate(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, DDS::DataReaderQos::durability, OpenDDS::DCPS::EntityImpl::entity_deleted_, OpenDDS::DCPS::ReaderAssociation::exprParams, OpenDDS::DCPS::ReaderAssociation::filterClassName, OpenDDS::DCPS::ReaderAssociation::filterExpression, get_publication_id(), OpenDDS::DCPS::GUID_UNKNOWN, is_bit_, OPENDDS_STRING, participant_servant_, publication_id_, qos_, reader_info_, OpenDDS::DCPS::ReaderAssociation::readerId, OpenDDS::DCPS::ReaderAssociation::readerQos, OpenDDS::DCPS::ReaderAssociation::readerTransInfo, DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, TheServiceParticipant, DDS::DataWriterQos::transport_priority, and DDS::VOLATILE_DURABILITY_QOS.

00218 {
00219   DBG_ENTRY_LVL("DataWriterImpl", "add_association", 6);
00220 
00221   if (DCPS_debug_level) {
00222     GuidConverter writer_converter(yourId);
00223     GuidConverter reader_converter(reader.readerId);
00224     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association - ")
00225                ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
00226                OPENDDS_STRING(writer_converter).c_str(),
00227                OPENDDS_STRING(reader_converter).c_str()));
00228   }
00229 
00230   if (entity_deleted_.value()) {
00231     if (DCPS_debug_level)
00232       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association")
00233                  ACE_TEXT(" This is a deleted datawriter, ignoring add.\n")));
00234 
00235     return;
00236   }
00237 
00238   if (GUID_UNKNOWN == publication_id_) {
00239     publication_id_ = yourId;
00240   }
00241 
00242   {
00243     ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00244     reader_info_.insert(std::make_pair(reader.readerId,
00245                                        ReaderInfo(reader.filterClassName,
00246                                                   TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
00247                                                   reader.exprParams, participant_servant_,
00248                                                   reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS)));
00249   }
00250 
00251   if (DCPS_debug_level > 4) {
00252     GuidConverter converter(get_publication_id());
00253     ACE_DEBUG((LM_DEBUG,
00254                ACE_TEXT("(%P|%t) DataWriterImpl::add_association(): ")
00255                ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
00256                OPENDDS_STRING(converter).c_str(),
00257                qos_.transport_priority.value));
00258   }
00259 
00260   AssociationData data;
00261   data.remote_id_ = reader.readerId;
00262   data.remote_data_ = reader.readerTransInfo;
00263   data.remote_reliable_ =
00264     (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00265   data.remote_durable_ =
00266     (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00267 
00268   if (!associate(data, active)) {
00269     //FUTURE: inform inforepo and try again as passive peer
00270     if (DCPS_debug_level) {
00271       ACE_DEBUG((LM_ERROR,
00272                  ACE_TEXT("(%P|%t) DataWriterImpl::add_association: ")
00273                  ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00274     }
00275   }
00276 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::assert_liveliness (  )  [virtual]

Implements DDS::DataWriter.

Definition at line 1110 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::DomainParticipantImpl::assert_liveliness(), DDS::AUTOMATIC_LIVELINESS_QOS, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS, participant_servant_, DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

01111 {
01112   switch (this->qos_.liveliness.kind) {
01113   case DDS::AUTOMATIC_LIVELINESS_QOS:
01114     // Do nothing.
01115     break;
01116   case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
01117     return participant_servant_->assert_liveliness();
01118   case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
01119     if (this->send_liveliness(ACE_OS::gettimeofday()) == false) {
01120       return DDS::RETCODE_ERROR;
01121     }
01122     break;
01123   }
01124 
01125   return DDS::RETCODE_OK;
01126 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::assert_liveliness_by_participant (  )  [virtual]

Definition at line 1129 of file DataWriterImpl.cpp.

References liveliness_asserted_, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, and DDS::RETCODE_OK.

01130 {
01131   // This operation is called by participant.
01132 
01133   if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) {
01134     // Set a flag indicating that we should send a liveliness message on the timer if necessary.
01135     liveliness_asserted_ = true;
01136   }
01137 
01138   return DDS::RETCODE_OK;
01139 }

void OpenDDS::DCPS::DataWriterImpl::association_complete ( const RepoId remote_id  )  [virtual]

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 395 of file DataWriterImpl.cpp.

References assoc_complete_readers_, association_complete_i(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, is_bit_, OPENDDS_STRING, pending_readers_, and OpenDDS::DCPS::remove().

00396 {
00397   DBG_ENTRY_LVL("DataWriterImpl", "association_complete", 6);
00398 
00399   if (DCPS_debug_level >= 1) {
00400     GuidConverter writer_converter(this->publication_id_);
00401     GuidConverter reader_converter(remote_id);
00402     ACE_DEBUG((LM_DEBUG,
00403                ACE_TEXT("(%P|%t) DataWriterImpl::association_complete - ")
00404                ACE_TEXT("bit %d local %C remote %C\n"),
00405                is_bit_,
00406                OPENDDS_STRING(writer_converter).c_str(),
00407                OPENDDS_STRING(reader_converter).c_str()));
00408   }
00409 
00410   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00411 
00412   if (OpenDDS::DCPS::remove(pending_readers_, remote_id) == -1) {
00413     if (DCPS_debug_level) {
00414       GuidConverter writer_converter(this->publication_id_);
00415       GuidConverter reader_converter(remote_id);
00416       ACE_DEBUG((LM_DEBUG,
00417                  ACE_TEXT("(%P|%t) DataWriterImpl::association_complete - ")
00418                  ACE_TEXT("bit %d local %C did not find pending reader: %C")
00419                  ACE_TEXT("defer association_complete_i until add_association resumes\n"),
00420                  is_bit_,
00421                  OPENDDS_STRING(writer_converter).c_str(),
00422                  OPENDDS_STRING(reader_converter).c_str()));
00423     }
00424     // Not found in pending_readers_, defer calling association_complete_i()
00425     // until add_association() resumes and sees this ID in assoc_complete_readers_.
00426     assoc_complete_readers_.insert(remote_id);
00427 
00428   } else {
00429     association_complete_i(remote_id);
00430   }
00431 }

void OpenDDS::DCPS::DataWriterImpl::association_complete_i ( const RepoId remote_id  )  [private]

Definition at line 434 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::bind(), controlTracker, create_control_message(), DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, dw_local_objref_, OpenDDS::DCPS::END_HISTORIC_SAMPLES, OpenDDS::DCPS::gen_find_size(), get_db_lock(), get_resend_data(), header, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), id_to_handle_map_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::insert(), is_bit_, DDS::PublicationMatchedStatus::last_subscription_handle, listener_for(), OpenDDS::DCPS::MessageTracker::message_dropped(), OpenDDS::DCPS::MessageTracker::message_sent(), monitor_, OpenDDS::DCPS::EntityImpl::notify_status_condition(), OPENDDS_STRING, participant_servant_, publication_match_status_, DDS::PUBLICATION_MATCHED_STATUS, reader_info_, readers_, OpenDDS::DCPS::WriteDataContainer::reenqueue_all(), OpenDDS::DCPS::Monitor::report(), OpenDDS::DCPS::SEND_CONTROL_ERROR, OpenDDS::DCPS::TransportClient::send_w_control(), OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::time_value_to_time(), DDS::PublicationMatchedStatus::total_count, and DDS::PublicationMatchedStatus::total_count_change.

Referenced by association_complete(), and transport_assoc_done().

00435 {
00436   DBG_ENTRY_LVL("DataWriterImpl", "association_complete_i", 6);
00437 
00438   if (DCPS_debug_level >= 1) {
00439     GuidConverter writer_converter(this->publication_id_);
00440     GuidConverter reader_converter(remote_id);
00441     ACE_DEBUG((LM_DEBUG,
00442                ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i - ")
00443                ACE_TEXT("bit %d local %C remote %C\n"),
00444                is_bit_,
00445                OPENDDS_STRING(writer_converter).c_str(),
00446                OPENDDS_STRING(reader_converter).c_str()));
00447   }
00448 
00449   bool reader_durable = false;
00450 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00451   OPENDDS_STRING filterClassName;
00452   RcHandle<FilterEvaluator> eval;
00453   DDS::StringSeq expression_params;
00454 #endif
00455   {
00456     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00457 
00458     if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) {
00459       GuidConverter converter(remote_id);
00460       ACE_ERROR((LM_ERROR,
00461                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::association_complete_i: ")
00462                  ACE_TEXT("insert %C from pending failed.\n"),
00463                  OPENDDS_STRING(converter).c_str()));
00464     }
00465   }
00466   {
00467     ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00468     RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
00469 
00470     if (it != reader_info_.end()) {
00471       reader_durable = it->second.durable_;
00472 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00473       filterClassName = it->second.filter_class_name_;
00474       eval = it->second.eval_;
00475       expression_params = it->second.expression_params_;
00476 #endif
00477     }
00478   }
00479 
00480   if (this->monitor_) {
00481     this->monitor_->report();
00482   }
00483 
00484   if (!is_bit_) {
00485 
00486     DDS::InstanceHandle_t handle =
00487       this->participant_servant_->id_to_handle(remote_id);
00488 
00489     {
00490       // protect publication_match_status_ and status changed flags.
00491       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00492 
00493       // update the publication_match_status_
00494       ++publication_match_status_.total_count;
00495       ++publication_match_status_.total_count_change;
00496       ++publication_match_status_.current_count;
00497       ++publication_match_status_.current_count_change;
00498 
00499       if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
00500         GuidConverter converter(remote_id);
00501         ACE_DEBUG((LM_WARNING,
00502                    ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::association_complete_i: ")
00503                    ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
00504                    OPENDDS_STRING(converter).c_str(),
00505                    handle));
00506         return;
00507 
00508       } else if (DCPS_debug_level > 4) {
00509         GuidConverter converter(remote_id);
00510         ACE_DEBUG((LM_DEBUG,
00511                    ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i: ")
00512                    ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
00513                    OPENDDS_STRING(converter).c_str(),
00514                    handle));
00515       }
00516 
00517       publication_match_status_.last_subscription_handle = handle;
00518 
00519       set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, true);
00520     }
00521 
00522     DDS::DataWriterListener_var listener =
00523       listener_for(DDS::PUBLICATION_MATCHED_STATUS);
00524 
00525     if (!CORBA::is_nil(listener.in())) {
00526 
00527       listener->on_publication_matched(dw_local_objref_.in(),
00528                                        publication_match_status_);
00529 
00530       // TBD - why does the spec say to change this but not
00531       // change the ChangeFlagStatus after a listener call?
00532       publication_match_status_.total_count_change = 0;
00533       publication_match_status_.current_count_change = 0;
00534     }
00535 
00536     notify_status_condition();
00537   }
00538 
00539   // Support DURABILITY QoS
00540   if (reader_durable) {
00541     // Tell the WriteDataContainer to resend all sending/sent
00542     // samples.
00543     this->data_container_->reenqueue_all(remote_id, this->qos_.lifespan
00544 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00545                                          , filterClassName, eval.in(), expression_params
00546 #endif
00547                                         );
00548 
00549     // Acquire the data writer container lock to avoid deadlock. The
00550     // thread calling association_complete() has to acquire lock in the
00551     // same order as the write()/register() operation.
00552 
00553     // Since the thread calling association_complete() is the ORB
00554     // thread, it may have some performance penalty. If the
00555     // performance is an issue, we may need a new thread to handle the
00556     // data_available() calls.
00557     ACE_GUARD(ACE_Recursive_Thread_Mutex,
00558               guard,
00559               this->get_lock());
00560 
00561     SendStateDataSampleList list = this->get_resend_data();
00562     {
00563       ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00564       // Update the reader's expected sequence
00565       SequenceNumber& seq =
00566         reader_info_.find(remote_id)->second.expected_sequence_;
00567 
00568       for (SendStateDataSampleList::iterator list_el = list.begin();
00569            list_el != list.end(); ++list_el) {
00570         list_el->get_header().historic_sample_ = true;
00571 
00572         if (list_el->get_header().sequence_ > seq) {
00573           seq = list_el->get_header().sequence_;
00574         }
00575       }
00576     }
00577 
00578     if (this->publisher_servant_->is_suspended()) {
00579       this->available_data_list_.enqueue_tail(list);
00580 
00581     } else {
00582       if (DCPS_debug_level >= 4) {
00583         ACE_DEBUG((LM_INFO, "(%P|%t) Sending historic samples\n"));
00584       }
00585 
00586       size_t size = 0, padding = 0;
00587       gen_find_size(remote_id, size, padding);
00588       ACE_Message_Block* const data =
00589         new ACE_Message_Block(size, ACE_Message_Block::MB_DATA, 0, 0, 0,
00590                               get_db_lock());
00591       Serializer ser(data);
00592       ser << remote_id;
00593 
00594       const DDS::Time_t timestamp = time_value_to_time(ACE_OS::gettimeofday());
00595       DataSampleHeader header;
00596       ACE_Message_Block* const end_historic_samples =
00597         create_control_message(END_HISTORIC_SAMPLES, header, data, timestamp);
00598 
00599       this->controlTracker.message_sent();
00600       guard.release();
00601       SendControlStatus ret = send_w_control(list, header, end_historic_samples, remote_id);
00602       if (ret == SEND_CONTROL_ERROR) {
00603         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00604                              ACE_TEXT("DataWriterImpl::association_complete_i: ")
00605                              ACE_TEXT("send_w_control failed.\n")));
00606         this->controlTracker.message_dropped();
00607       }
00608     }
00609   }
00610 }

void OpenDDS::DCPS::DataWriterImpl::begin_coherent_changes (  ) 

Starts a coherent change set; should only be called once.

Definition at line 2198 of file DataWriterImpl.cpp.

References coherent_, and get_lock().

02199 {
02200   ACE_GUARD(ACE_Recursive_Thread_Mutex,
02201             guard,
02202             get_lock());
02203 
02204   this->coherent_ = true;
02205 }

bool OpenDDS::DCPS::DataWriterImpl::check_transport_qos ( const TransportInst inst  )  [virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 2177 of file DataWriterImpl.cpp.

02178 {
02179   // DataWriter does not impose any constraints on which transports
02180   // may be used based on QoS.
02181   return true;
02182 }

void OpenDDS::DCPS::DataWriterImpl::cleanup (  ) 

cleanup the DataWriter.

Definition at line 136 of file DataWriterImpl.cpp.

References cancel_timer_, dw_local_objref_, reactor_, OpenDDS::DCPS::TopicImpl::remove_entity_ref(), topic_objref_, and topic_servant_.

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().

00137 {
00138   if (cancel_timer_) {
00139     // The cancel_timer will call handle_close to
00140     // remove_ref.
00141     (void) reactor_->cancel_timer(this, 0);
00142     cancel_timer_ = false;
00143   }
00144 
00145   // release our Topic_var
00146   topic_objref_ = DDS::Topic::_nil();
00147   topic_servant_->remove_entity_ref();
00148   topic_servant_->_remove_ref();
00149   topic_servant_ = 0;
00150 
00151   dw_local_objref_ = DDS::DataWriter::_nil();
00152 }

bool OpenDDS::DCPS::DataWriterImpl::coherent_changes_pending (  ) 

Are coherent changes pending?

Definition at line 2187 of file DataWriterImpl.cpp.

References coherent_, and get_lock().

02188 {
02189   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02190                    guard,
02191                    get_lock(),
02192                    false);
02193 
02194   return this->coherent_;
02195 }

void OpenDDS::DCPS::DataWriterImpl::control_delivered ( ACE_Message_Block *  sample  )  [virtual]

This is called by transport to notify that the control message is delivered.

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 2135 of file DataWriterImpl.cpp.

References controlTracker, DBG_ENTRY_LVL, and OpenDDS::DCPS::MessageTracker::message_delivered().

02136 {
02137   DBG_ENTRY_LVL("DataWriterImpl","control_delivered",6);
02138   sample->release();
02139   controlTracker.message_delivered();
02140 }

void OpenDDS::DCPS::DataWriterImpl::control_dropped ( ACE_Message_Block *  sample,
bool  dropped_by_transport 
) [virtual]

This is called by transport to notify that the control message is dropped.

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 2276 of file DataWriterImpl.cpp.

References controlTracker, DBG_ENTRY_LVL, and OpenDDS::DCPS::MessageTracker::message_dropped().

02278 {
02279   DBG_ENTRY_LVL("DataWriterImpl","control_dropped",6);
02280   sample->release();
02281   controlTracker.message_dropped();
02282 }

DataWriterImpl::AckToken OpenDDS::DCPS::DataWriterImpl::create_ack_token ( DDS::Duration_t  max_wait  )  const

Create an AckToken for ack operations.

Definition at line 1003 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level.

Referenced by wait_for_acknowledgments().

01004 {
01005   if (DCPS_debug_level > 0) {
01006     ACE_DEBUG((LM_DEBUG,
01007                ACE_TEXT("(%P|%t) DataWriterImpl::create_ack_token() - ")
01008                ACE_TEXT("for sequence %q \n"),
01009                this->sequence_number_.getValue()));
01010   }
01011   return AckToken(max_wait, this->sequence_number_);
01012 }

ACE_Message_Block * OpenDDS::DCPS::DataWriterImpl::create_control_message ( MessageId  message_id,
DataSampleHeader header,
ACE_Message_Block *  data,
const DDS::Time_t source_timestamp 
) [private]

This method create a header message block and chain with the registered sample. The header contains the information needed. e.g. message id, length of whole message... The fast allocator is not used for the header.

Definition at line 1943 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::coherent_change_, db_allocator_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, get_db_lock(), OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), mb_allocator_, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, DDS::Time_t::nanosec, need_sequence_repair(), OPENDDS_STRING, publication_id_, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::PublisherImpl::publisher_id_, OpenDDS::DCPS::DataSampleHeader::publisher_id_, publisher_servant_, reader_info_, DDS::Time_t::sec, OpenDDS::DCPS::DataSampleHeader::sequence_, sequence_number_, OpenDDS::DCPS::DataSampleHeader::sequence_repair_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::RTPS::SEQUENCENUMBER_UNKNOWN, OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, OpenDDS::DCPS::TransportClient::swap_bytes(), OpenDDS::DCPS::to_string(), and OpenDDS::DCPS::UNREGISTER_INSTANCE.

Referenced by association_complete_i(), dispose(), dispose_and_unregister(), end_coherent_changes(), register_instance_i(), send_liveliness(), and unregister_instance_i().

01947 {
01948   header_data.message_id_ = message_id;
01949   header_data.byte_order_ =
01950     this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER;
01951   header_data.coherent_change_ = 0;
01952 
01953   if (data) {
01954     header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
01955 
01956     if (header_data.message_length_ == 0) {
01957       data->release();
01958     }
01959   }
01960 
01961   header_data.sequence_ = SequenceNumber::SEQUENCENUMBER_UNKNOWN();
01962   header_data.sequence_repair_ = false; // set below
01963   header_data.source_timestamp_sec_ = source_timestamp.sec;
01964   header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
01965   header_data.publication_id_ = publication_id_;
01966   header_data.publisher_id_ = this->publisher_servant_->publisher_id_;
01967 
01968   if (message_id == INSTANCE_REGISTRATION
01969       || message_id == DISPOSE_INSTANCE
01970       || message_id == UNREGISTER_INSTANCE
01971       || message_id == DISPOSE_UNREGISTER_INSTANCE) {
01972 
01973     header_data.sequence_repair_ = need_sequence_repair();
01974 
01975     // Use the sequence number here for the sake of RTPS (where these
01976     // control messages map onto the Data Submessage).
01977     if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
01978       this->sequence_number_ = SequenceNumber();
01979 
01980     } else {
01981       ++this->sequence_number_;
01982     }
01983 
01984     header_data.sequence_ = this->sequence_number_;
01985     header_data.key_fields_only_ = true;
01986   }
01987 
01988   ACE_Message_Block* message = 0;
01989   ACE_NEW_MALLOC_RETURN(message,
01990                         static_cast<ACE_Message_Block*>(
01991                           mb_allocator_->malloc(sizeof(ACE_Message_Block))),
01992                         ACE_Message_Block(
01993                           DataSampleHeader::max_marshaled_size(),
01994                           ACE_Message_Block::MB_DATA,
01995                           header_data.message_length_ ? data : 0, //cont
01996                           0, //data
01997                           0, //allocator_strategy
01998                           get_db_lock(), //locking_strategy
01999                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
02000                           ACE_Time_Value::zero,
02001                           ACE_Time_Value::max_time,
02002                           db_allocator_,
02003                           mb_allocator_),
02004                         0);
02005 
02006   *message << header_data;
02007 
02008   // If we incremented sequence number for this control message
02009   if (header_data.sequence_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
02010     ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, 0);
02011     // Update the expected sequence number for all readers
02012     RepoIdToReaderInfoMap::iterator reader;
02013 
02014     for (reader = reader_info_.begin(); reader != reader_info_.end(); ++reader) {
02015       reader->second.expected_sequence_ = sequence_number_;
02016     }
02017   }
02018   if (DCPS_debug_level >= 4) {
02019     const GuidConverter converter(publication_id_);
02020     ACE_DEBUG((LM_DEBUG,
02021                ACE_TEXT("(%P|%t) DataWriterImpl::create_control_message: ")
02022                ACE_TEXT("from publication %C sending control sample: %C .\n"),
02023                OPENDDS_STRING(converter).c_str(),
02024                to_string(header_data).c_str()));
02025   }
02026   return message;
02027 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::create_sample_data_message ( DataSample data,
DDS::InstanceHandle_t  instance_handle,
DataSampleHeader header_data,
ACE_Message_Block *&  message,
const DDS::Time_t source_timestamp,
bool  content_filter 
)

This method create a header message block and chain with the sample data. The header contains the information needed. e.g. message id, length of whole message... The fast allocator is used to allocate the message block, data block and header.

Definition at line 2030 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::TransportClient::cdr_encapsulation(), OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, coherent_, OpenDDS::DCPS::DataSampleHeader::coherent_change_, OpenDDS::DCPS::DataSampleHeader::content_filter_, data_container_, db_allocator_, OpenDDS::DCPS::DCPS_debug_level, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, get_db_lock(), OpenDDS::DCPS::WriteDataContainer::get_handle_instance(), OpenDDS::DCPS::DataSampleHeader::group_coherent_, DDS::GROUP_PRESENTATION_QOS, header_allocator_, DDS::DataWriterQos::lifespan, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_nanosec_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_sec_, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), max_marshaled_size(), mb_allocator_, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, DDS::Time_t::nanosec, need_sequence_repair(), OPENDDS_STRING, DDS::PublisherQos::presentation, publication_id_, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::PublisherImpl::publisher_id_, OpenDDS::DCPS::DataSampleHeader::publisher_id_, publisher_servant_, qos_, OpenDDS::DCPS::PublisherImpl::qos_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::SAMPLE_DATA, DDS::Time_t::sec, OpenDDS::DCPS::DataSampleHeader::sequence_, sequence_number_, OpenDDS::DCPS::DataSampleHeader::sequence_repair_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, OpenDDS::DCPS::TransportClient::swap_bytes(), and OpenDDS::DCPS::to_string().

Referenced by write().

02036 {
02037   PublicationInstance* const instance =
02038     data_container_->get_handle_instance(instance_handle);
02039 
02040   if (0 == instance) {
02041     ACE_ERROR_RETURN((LM_ERROR,
02042                       ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message ")
02043                       ACE_TEXT("failed to find instance for handle %d\n"),
02044                       instance_handle),
02045                      DDS::RETCODE_ERROR);
02046   }
02047 
02048   header_data.message_id_ = SAMPLE_DATA;
02049   header_data.byte_order_ =
02050     this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER;
02051   header_data.coherent_change_ = this->coherent_;
02052 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
02053   header_data.group_coherent_ =
02054     this->publisher_servant_->qos_.presentation.access_scope
02055     == DDS::GROUP_PRESENTATION_QOS;
02056 #endif
02057   header_data.content_filter_ = content_filter;
02058   header_data.cdr_encapsulation_ = this->cdr_encapsulation();
02059   header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
02060   header_data.sequence_repair_ = need_sequence_repair();
02061 
02062   if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
02063     this->sequence_number_ = SequenceNumber();
02064 
02065   } else {
02066     ++this->sequence_number_;
02067   }
02068 
02069   header_data.sequence_ = this->sequence_number_;
02070   header_data.source_timestamp_sec_ = source_timestamp.sec;
02071   header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
02072 
02073   if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
02074       || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
02075     header_data.lifespan_duration_ = true;
02076     header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec;
02077     header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec;
02078   }
02079 
02080   header_data.publication_id_ = publication_id_;
02081   header_data.publisher_id_ = this->publisher_servant_->publisher_id_;
02082   size_t max_marshaled_size = header_data.max_marshaled_size();
02083 
02084   ACE_NEW_MALLOC_RETURN(message,
02085                         static_cast<ACE_Message_Block*>(
02086                           mb_allocator_->malloc(sizeof(ACE_Message_Block))),
02087                         ACE_Message_Block(max_marshaled_size,
02088                                           ACE_Message_Block::MB_DATA,
02089                                           data, //cont
02090                                           0, //data
02091                                           header_allocator_, //alloc_strategy
02092                                           get_db_lock(), //locking_strategy
02093                                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
02094                                           ACE_Time_Value::zero,
02095                                           ACE_Time_Value::max_time,
02096                                           db_allocator_,
02097                                           mb_allocator_),
02098                         DDS::RETCODE_ERROR);
02099 
02100   *message << header_data;
02101   if (DCPS_debug_level >= 4) {
02102     const GuidConverter converter(publication_id_);
02103     ACE_DEBUG((LM_DEBUG,
02104                ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message: ")
02105                ACE_TEXT("from publication %C sending data sample: %C .\n"),
02106                OPENDDS_STRING(converter).c_str(),
02107                to_string(header_data).c_str()));
02108   }
02109   return DDS::RETCODE_OK;
02110 }

void OpenDDS::DCPS::DataWriterImpl::data_delivered ( const DataSampleElement sample  )  [virtual]

This is called by transport to notify that the sample is delivered and it is delegated to WriteDataContainer to adjust the internal data sample threads.

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 2113 of file DataWriterImpl.cpp.

References data_container_, OpenDDS::DCPS::WriteDataContainer::data_delivered(), data_delivered_count_, DBG_ENTRY_LVL, OpenDDS::DCPS::DataSampleElement::get_pub_id(), OPENDDS_STRING, and publication_id_.

02114 {
02115   DBG_ENTRY_LVL("DataWriterImpl","data_delivered",6);
02116 
02117   if (!(sample->get_pub_id() == this->publication_id_)) {
02118     GuidConverter sample_converter(sample->get_pub_id());
02119     GuidConverter writer_converter(publication_id_);
02120     ACE_ERROR((LM_ERROR,
02121                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::data_delivered: ")
02122                ACE_TEXT(" The publication id %C from delivered element ")
02123                ACE_TEXT("does not match the datawriter's id %C\n"),
02124                OPENDDS_STRING(sample_converter).c_str(),
02125                OPENDDS_STRING(writer_converter).c_str()));
02126     return;
02127   }
02128   //provided for statistics tracking in tests
02129   ++data_delivered_count_;
02130 
02131   this->data_container_->data_delivered(sample);
02132 }

void OpenDDS::DCPS::DataWriterImpl::data_dropped ( const DataSampleElement element,
bool  dropped_by_transport 
) [virtual]

This mothod is called by transport to notify the instance sample is dropped and it delegates to WriteDataContainer to update the internal list.

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 2264 of file DataWriterImpl.cpp.

References data_container_, OpenDDS::DCPS::WriteDataContainer::data_dropped(), data_dropped_count_, and DBG_ENTRY_LVL.

02266 {
02267   DBG_ENTRY_LVL("DataWriterImpl","data_dropped",6);
02268 
02269   //provided for statistics tracking in tests
02270   ++data_dropped_count_;
02271 
02272   this->data_container_->data_dropped(element, dropped_by_transport);
02273 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::dispose ( DDS::InstanceHandle_t  handle,
const DDS::Time_t source_timestamp 
)

Delegate to the WriteDataContainer to dispose all data samples for a given instance and tell the transport to broadcast the disposed instance.

Definition at line 1841 of file DataWriterImpl.cpp.

References create_control_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::WriteDataContainer::dispose(), OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::WriteDataContainer::enqueue_control(), OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), OpenDDS::DCPS::WriteDataContainer::obtain_buffer_for_control(), DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, send_all_to_flush_control(), and OpenDDS::DCPS::DataSampleElement::set_sample().

Referenced by OpenDDS::DCPS::DataWriterImpl_T< MessageType >::dispose_w_timestamp().

01843 {
01844   DBG_ENTRY_LVL("DataWriterImpl","dispose",6);
01845 
01846   if (enabled_ == false) {
01847     ACE_ERROR_RETURN((LM_ERROR,
01848                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::dispose: ")
01849                       ACE_TEXT(" Entity is not enabled. \n")),
01850                      DDS::RETCODE_NOT_ENABLED);
01851   }
01852 
01853   DDS::ReturnCode_t ret = ::DDS::RETCODE_ERROR;
01854 
01855   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01856 
01857   DataSample* registered_sample_data = 0;
01858   ret = this->data_container_->dispose(handle, registered_sample_data);
01859 
01860   if (ret != DDS::RETCODE_OK) {
01861     ACE_ERROR_RETURN((LM_ERROR,
01862                       ACE_TEXT("(%P|%t) ERROR: ")
01863                       ACE_TEXT("DataWriterImpl::dispose: ")
01864                       ACE_TEXT("dispose failed.\n")),
01865                      ret);
01866   }
01867 
01868   DataSampleElement* element = 0;
01869   ret = this->data_container_->obtain_buffer_for_control(element);
01870 
01871   if (ret != DDS::RETCODE_OK) {
01872     ACE_ERROR_RETURN((LM_ERROR,
01873                       ACE_TEXT("(%P|%t) ERROR: ")
01874                       ACE_TEXT("DataWriterImpl::dispose: ")
01875                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01876                       ret),
01877                      ret);
01878   }
01879 
01880   element->set_sample(create_control_message(DISPOSE_INSTANCE,
01881                                              element->get_header(),
01882                                              registered_sample_data,
01883                                              source_timestamp));
01884   ret = this->data_container_->enqueue_control(element);
01885 
01886   if (ret != DDS::RETCODE_OK) {
01887     ACE_ERROR_RETURN((LM_ERROR,
01888                       ACE_TEXT("(%P|%t) ERROR: ")
01889                       ACE_TEXT("DataWriterImpl::dispose: ")
01890                       ACE_TEXT("enqueue_control failed.\n")),
01891                      ret);
01892   }
01893 
01894   send_all_to_flush_control(guard);
01895 
01896   return DDS::RETCODE_OK;
01897 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::dispose_and_unregister ( DDS::InstanceHandle_t  handle,
const DDS::Time_t timestamp 
) [private]

Definition at line 1616 of file DataWriterImpl.cpp.

References create_control_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::WriteDataContainer::dispose(), OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::WriteDataContainer::enqueue_control(), OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), OpenDDS::DCPS::WriteDataContainer::obtain_buffer_for_control(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, send_all_to_flush_control(), OpenDDS::DCPS::DataSampleElement::set_sample(), and OpenDDS::DCPS::WriteDataContainer::unregister().

Referenced by unregister_instance_i().

01618 {
01619   DBG_ENTRY_LVL("DataWriterImpl", "dispose_and_unregister", 6);
01620 
01621   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01622   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01623 
01624   DataSample* data_sample = 0;
01625   ret = this->data_container_->dispose(handle, data_sample);
01626 
01627   if (ret != DDS::RETCODE_OK) {
01628     ACE_ERROR_RETURN((LM_ERROR,
01629                       ACE_TEXT("(%P|%t) ERROR: ")
01630                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01631                       ACE_TEXT("dispose on container failed. \n")),
01632                      ret);
01633   }
01634 
01635   ret = this->data_container_->unregister(handle, data_sample, false);
01636 
01637   if (ret != DDS::RETCODE_OK) {
01638     ACE_ERROR_RETURN((LM_ERROR,
01639                       ACE_TEXT("(%P|%t) ERROR: ")
01640                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01641                       ACE_TEXT("unregister with container failed. \n")),
01642                      ret);
01643   }
01644 
01645   DataSampleElement* element = 0;
01646   ret = this->data_container_->obtain_buffer_for_control(element);
01647 
01648   if (ret != DDS::RETCODE_OK) {
01649     ACE_ERROR_RETURN((LM_ERROR,
01650                       ACE_TEXT("(%P|%t) ERROR: ")
01651                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01652                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01653                       ret),
01654                      ret);
01655   }
01656 
01657   element->set_sample(create_control_message(DISPOSE_UNREGISTER_INSTANCE,
01658                                              element->get_header(),
01659                                              data_sample,
01660                                              source_timestamp));
01661 
01662   ret = this->data_container_->enqueue_control(element);
01663 
01664   if (ret != DDS::RETCODE_OK) {
01665     ACE_ERROR_RETURN((LM_ERROR,
01666                       ACE_TEXT("(%P|%t) ERROR: ")
01667                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01668                       ACE_TEXT("enqueue_control failed.\n")),
01669                      ret);
01670   }
01671 
01672   send_all_to_flush_control(guard);
01673   return DDS::RETCODE_OK;
01674 }

DDS::DomainId_t OpenDDS::DCPS::DataWriterImpl::domain_id (  )  const [inline, private, virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 574 of file DataWriterImpl.h.

00574                                   {
00575     return this->domain_id_;
00576   }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::enable (  )  [virtual]

Implements DDS::Entity.

Definition at line 1228 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::DomainParticipantImpl::add_adjust_liveliness_timers(), association_chunk_multiplier_, cancel_timer_, OpenDDS::DCPS::TransportClient::connection_info(), data_container_, db_allocator_, OpenDDS::DCPS::DCPS_debug_level, DDS::DataWriterQos::deadline, domain_id_, DDS::DataWriterQos::durability, DDS::DataWriterQos::durability_service, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::duration_to_time_value(), enable_specific(), OpenDDS::DCPS::TransportClient::enable_transport(), OpenDDS::DCPS::TopicImpl::get_id(), OpenDDS::DCPS::get_instance_sample_list_depth(), OpenDDS::DCPS::PublisherImpl::get_qos(), get_topic_name(), get_type_name(), OpenDDS::DCPS::GUID_UNKNOWN, header_allocator_, DDS::DataWriterQos::history, last_deadline_missed_total_count_, last_liveliness_check_time_, DDS::LENGTH_UNLIMITED, DDS::DataWriterQos::liveliness, liveliness_check_interval_, mb_allocator_, monitor_, n_chunks_, DDS::Duration_t::nanosec, offered_deadline_missed_status_, participant_servant_, OpenDDS::DCPS::WriteDataContainer::publication_id_, publication_id_, publisher_servant_, qos_, reactor_, DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::Monitor::report(), DDS::DataWriterQos::resource_limits, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, DDS::Duration_t::sec, OpenDDS::DCPS::EntityImpl::set_enabled(), TheServiceParticipant, topic_name_, topic_servant_, DDS::VOLATILE_DURABILITY_QOS, watchdog_, WriteDataContainer, and OpenDDS::DCPS::PublisherImpl::writer_enabled().

Referenced by OpenDDS::DCPS::PublisherImpl::create_datawriter().

01229 {
01230   //According spec:
01231   // - Calling enable on an already enabled Entity returns OK and has no
01232   // effect.
01233   // - Calling enable on an Entity whose factory is not enabled will fail
01234   // and return PRECONDITION_NOT_MET.
01235 
01236   if (this->is_enabled()) {
01237     return DDS::RETCODE_OK;
01238   }
01239 
01240   if (this->publisher_servant_->is_enabled() == false) {
01241     return DDS::RETCODE_PRECONDITION_NOT_MET;
01242   }
01243 
01244   // Note: do configuration based on QoS in enable() because
01245   //       before enable is called the QoS can be changed -- even
01246   //       for Changeable=NO
01247 
01248   // Configure WriteDataContainer constructor parameters from qos.
01249 
01250   const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
01251 
01252   CORBA::Long const depth =
01253     get_instance_sample_list_depth(
01254       qos_.history.kind,
01255       qos_.history.depth,
01256       qos_.resource_limits.max_samples_per_instance);
01257 
01258   bool resource_blocking = false;
01259 
01260   if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
01261     n_chunks_ = qos_.resource_limits.max_samples;
01262 
01263     if (qos_.resource_limits.max_instances == DDS::LENGTH_UNLIMITED) {
01264       resource_blocking = true;
01265 
01266     } else {
01267       resource_blocking =
01268         (qos_.resource_limits.max_samples < qos_.resource_limits.max_instances)
01269         || (qos_.resource_limits.max_samples <
01270             (qos_.resource_limits.max_instances * depth));
01271     }
01272   }
01273 
01274   //else using value from Service_Participant
01275 
01276   // enable the type specific part of this DataWriter
01277   this->enable_specific();
01278 
01279   CORBA::Long max_instances = 0;
01280 
01281   if (reliable && qos_.resource_limits.max_instances != DDS::LENGTH_UNLIMITED)
01282     max_instances = qos_.resource_limits.max_instances;
01283 
01284   CORBA::Long max_total_samples = 0;
01285 
01286   if (reliable && resource_blocking)
01287     max_total_samples = qos_.resource_limits.max_samples;
01288 
01289 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01290   // Get data durability cache if DataWriter QoS requires durable
01291   // samples.  Publisher servant retains ownership of the cache.
01292   DataDurabilityCache* const durability_cache =
01293     TheServiceParticipant->get_data_durability_cache(qos_.durability);
01294 #endif
01295 
01296   //Note: the QoS used to set n_chunks_ is Changable=No so
01297   // it is OK that we cannot change the size of our allocators.
01298   data_container_ = new WriteDataContainer(this,
01299                                            depth,
01300                                            qos_.reliability.max_blocking_time,
01301                                            n_chunks_,
01302                                            domain_id_,
01303                                            get_topic_name(),
01304                                            get_type_name(),
01305 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01306                                            durability_cache,
01307                                            qos_.durability_service,
01308 #endif
01309                                            max_instances,
01310                                            max_total_samples);
01311 
01312   // +1 because we might allocate one before releasing another
01313   // TBD - see if this +1 can be removed.
01314   mb_allocator_ = new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_);
01315   db_allocator_ = new DataBlockAllocator(n_chunks_+1);
01316   header_allocator_ = new DataSampleHeaderAllocator(n_chunks_+1);
01317 
01318   if (DCPS_debug_level >= 2) {
01319     ACE_DEBUG((LM_DEBUG,
01320                "(%P|%t) DataWriterImpl::enable-mb"
01321                " Cached_Allocator_With_Overflow %x with %d chunks\n",
01322                mb_allocator_,
01323                n_chunks_));
01324 
01325     ACE_DEBUG((LM_DEBUG,
01326                "(%P|%t) DataWriterImpl::enable-db"
01327                " Cached_Allocator_With_Overflow %x with %d chunks\n",
01328                db_allocator_,
01329                n_chunks_));
01330 
01331     ACE_DEBUG((LM_DEBUG,
01332                "(%P|%t) DataWriterImpl::enable-header"
01333                " Cached_Allocator_With_Overflow %x with %d chunks\n",
01334                header_allocator_,
01335                n_chunks_));
01336   }
01337 
01338   if (qos_.liveliness.lease_duration.sec != DDS::DURATION_INFINITE_SEC &&
01339       qos_.liveliness.lease_duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
01340     liveliness_check_interval_ = duration_to_time_value(qos_.liveliness.lease_duration);
01341     liveliness_check_interval_ *= TheServiceParticipant->liveliness_factor()/100.0;
01342     // Must be at least 1 micro second.
01343     if (liveliness_check_interval_ == ACE_Time_Value::zero) {
01344       liveliness_check_interval_ = ACE_Time_Value (0, 1);
01345     }
01346 
01347     last_liveliness_check_time_ = ACE_OS::gettimeofday();
01348 
01349     if (reactor_->schedule_timer(this,
01350                                  0,
01351                                  liveliness_check_interval_,
01352                                  liveliness_check_interval_) == -1) {
01353       ACE_ERROR((LM_ERROR,
01354                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: %p.\n"),
01355                  ACE_TEXT("schedule_timer")));
01356 
01357     } else {
01358       cancel_timer_ = true;
01359       this->_add_ref();
01360     }
01361   }
01362 
01363   participant_servant_->add_adjust_liveliness_timers(this);
01364 
01365   // Setup the offered deadline watchdog if the configured deadline
01366   // period is not the default (infinite).
01367   DDS::Duration_t const deadline_period = this->qos_.deadline.period;
01368 
01369   if (deadline_period.sec != DDS::DURATION_INFINITE_SEC
01370       || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC) {
01371     this->watchdog_ = new OfferedDeadlineWatchdog(
01372                          this->lock_,
01373                          this->qos_.deadline,
01374                          this,
01375                          this->dw_local_objref_.in(),
01376                          this->offered_deadline_missed_status_,
01377                          this->last_deadline_missed_total_count_);
01378   }
01379 
01380   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
01381   disco->pre_writer(this);
01382 
01383   this->set_enabled();
01384 
01385   try {
01386     this->enable_transport(reliable,
01387                            this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
01388 
01389   } catch (const Transport::Exception&) {
01390     ACE_ERROR((LM_ERROR,
01391                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable, ")
01392                ACE_TEXT("Transport Exception.\n")));
01393     return DDS::RETCODE_ERROR;
01394 
01395   }
01396 
01397   const TransportLocatorSeq& trans_conf_info = connection_info();
01398 
01399   DDS::PublisherQos pub_qos;
01400   this->publisher_servant_->get_qos(pub_qos);
01401 
01402   this->publication_id_ =
01403     disco->add_publication(this->domain_id_,
01404                            this->participant_servant_->get_id(),
01405                            this->topic_servant_->get_id(),
01406                            this,
01407                            this->qos_,
01408                            trans_conf_info,
01409                            pub_qos);
01410 
01411   if (this->publication_id_ == GUID_UNKNOWN) {
01412     ACE_ERROR((LM_ERROR,
01413                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable, ")
01414                ACE_TEXT("add_publication returned invalid id. \n")));
01415     return DDS::RETCODE_ERROR;
01416   }
01417 
01418   this->data_container_->publication_id_ = this->publication_id_;
01419 
01420   const DDS::ReturnCode_t writer_enabled_result =
01421     publisher_servant_->writer_enabled(topic_name_.in(), this);
01422 
01423   if (this->monitor_) {
01424     this->monitor_->report();
01425   }
01426 
01427 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01428 
01429   // Move cached data from the durability cache to the unsent data
01430   // queue.
01431   if (durability_cache != 0) {
01432 
01433     if (!durability_cache->get_data(this->domain_id_,
01434                                     get_topic_name(),
01435                                     get_type_name(),
01436                                     this,
01437                                     this->mb_allocator_,
01438                                     this->db_allocator_,
01439                                     this->qos_.lifespan)) {
01440       ACE_ERROR((LM_ERROR,
01441                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: ")
01442                  ACE_TEXT("unable to retrieve durable data\n")));
01443     }
01444   }
01445 
01446 #endif
01447 
01448   return writer_enabled_result;
01449 }

virtual DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::enable_specific (  )  [protected, pure virtual]

Implemented in OpenDDS::DCPS::DataWriterImpl_T< MessageType >.

Referenced by enable().

void OpenDDS::DCPS::DataWriterImpl::end_coherent_changes ( const GroupCoherentSamples &  group_samples  ) 

Ends a coherent change set; should only be called once.

Definition at line 2208 of file DataWriterImpl.cpp.

References coherent_, coherent_samples_, OpenDDS::DCPS::CoherentChangeControl::coherent_samples_, create_control_message(), OpenDDS::DCPS::END_COHERENT_CHANGES, get_db_lock(), get_lock(), OpenDDS::DCPS::CoherentChangeControl::group_coherent_, OpenDDS::DCPS::CoherentChangeControl::group_coherent_samples_, DDS::GROUP_PRESENTATION_QOS, header, OpenDDS::DCPS::WriterCoherentSample::last_sample_, OpenDDS::DCPS::CoherentChangeControl::max_marshaled_size(), max_marshaled_size(), OpenDDS::DCPS::WriterCoherentSample::num_samples_, DDS::PublisherQos::presentation, OpenDDS::DCPS::PublisherImpl::publisher_id_, OpenDDS::DCPS::CoherentChangeControl::publisher_id_, publisher_servant_, OpenDDS::DCPS::PublisherImpl::qos_, OpenDDS::DCPS::SEND_CONTROL_ERROR, sequence_number_, and OpenDDS::DCPS::time_value_to_time().

02209 {
02210   // PublisherImpl::pi_lock_ should be held.
02211   ACE_GUARD(ACE_Recursive_Thread_Mutex,
02212             guard,
02213             get_lock());
02214 
02215   CoherentChangeControl end_msg;
02216   end_msg.coherent_samples_.num_samples_ = this->coherent_samples_;
02217   end_msg.coherent_samples_.last_sample_ = this->sequence_number_;
02218   end_msg.group_coherent_
02219     = this->publisher_servant_->qos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS;
02220 
02221   if (end_msg.group_coherent_) {
02222     end_msg.publisher_id_ = this->publisher_servant_->publisher_id_;
02223     end_msg.group_coherent_samples_ = group_samples;
02224   }
02225 
02226   ACE_Message_Block* data = 0;
02227   size_t max_marshaled_size = end_msg.max_marshaled_size();
02228 
02229   ACE_NEW(data, ACE_Message_Block(max_marshaled_size,
02230                                   ACE_Message_Block::MB_DATA,
02231                                   0, //cont
02232                                   0, //data
02233                                   0, //alloc_strategy
02234                                   get_db_lock()));
02235 
02236   Serializer serializer(
02237     data,
02238     this->swap_bytes());
02239 
02240   serializer << end_msg;
02241 
02242   DDS::Time_t source_timestamp =
02243     time_value_to_time(ACE_OS::gettimeofday());
02244 
02245   DataSampleHeader header;
02246   ACE_Message_Block* control =
02247     create_control_message(END_COHERENT_CHANGES, header, data, source_timestamp);
02248 
02249 
02250   this->coherent_ = false;
02251   this->coherent_samples_ = 0;
02252 
02253   guard.release();
02254   if (this->send_control(header, control) == SEND_CONTROL_ERROR) {
02255     ACE_ERROR((LM_ERROR,
02256                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::end_coherent_changes:")
02257                ACE_TEXT(" unable to send END_COHERENT_CHANGES control message!\n")));
02258   }
02259 }

bool OpenDDS::DCPS::DataWriterImpl::filter_out ( const DataSampleElement elt,
const OPENDDS_STRING &  filterClassName,
const FilterEvaluator evaluator,
const DDS::StringSeq expression_params 
) const

Definition at line 2150 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, OpenDDS::DCPS::FilterEvaluator::eval(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::TopicDescriptionImpl::get_type_support(), OpenDDS::DCPS::TypeSupportImpl::getMetaStructForType(), and topic_servant_.

Referenced by OpenDDS::DCPS::WriteDataContainer::copy_and_append().

02154 {
02155   TypeSupportImpl* const typesupport =
02156     dynamic_cast<TypeSupportImpl*>(topic_servant_->get_type_support());
02157 
02158   if (!typesupport) {
02159     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR DataWriterImpl::filter_out - Could not cast type support, not filtering\n"));
02160     return false;
02161   }
02162 
02163   if (filterClassName == "DDSSQL" ||
02164       filterClassName == "OPENDDSSQL") {
02165     return !evaluator.eval(elt.get_sample()->cont(),
02166                            elt.get_header().byte_order_ != ACE_CDR_BYTE_ORDER,
02167                            elt.get_header().cdr_encapsulation_, typesupport->getMetaStructForType(),
02168                            expression_params);
02169   }
02170   else {
02171     return false;
02172   }
02173 }

DataBlockLockPool::DataBlockLock* OpenDDS::DCPS::DataWriterImpl::get_db_lock (  )  [inline]

Definition at line 472 of file DataWriterImpl.h.

Referenced by association_complete_i(), create_control_message(), create_sample_data_message(), end_coherent_changes(), and OpenDDS::DCPS::DataDurabilityCache::get_data().

00472                                                 {
00473     return db_lock_pool_->get_lock();
00474   }

RepoId OpenDDS::DCPS::DataWriterImpl::get_dp_id (  ) 

Accessor of the repository id of the domain participant.

Definition at line 1925 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::DomainParticipantImpl::get_id(), and participant_servant_.

Referenced by OpenDDS::DCPS::DWMonitorImpl::report().

01926 {
01927   return participant_servant_->get_id();
01928 }

PublicationInstance * OpenDDS::DCPS::DataWriterImpl::get_handle_instance ( DDS::InstanceHandle_t  handle  )  [protected]

Attempt to locate an existing instance for the given handle.

Definition at line 2413 of file DataWriterImpl.cpp.

References data_container_, and OpenDDS::DCPS::WriteDataContainer::get_handle_instance().

02414 {
02415   PublicationInstance* instance = 0;
02416 
02417   if (0 != data_container_) {
02418     instance = data_container_->get_handle_instance(handle);
02419   }
02420 
02421   return instance;
02422 }

DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl::get_instance_handle (  )  [virtual]

Implements OpenDDS::DCPS::EntityImpl.

Definition at line 203 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), participant_servant_, and publication_id_.

00204 {
00205   return this->participant_servant_->id_to_handle(publication_id_);
00206 }

void OpenDDS::DCPS::DataWriterImpl::get_instance_handles ( InstanceHandleVec &  instance_handles  ) 

Definition at line 2612 of file DataWriterImpl.cpp.

References data_container_, and OpenDDS::DCPS::WriteDataContainer::get_instance_handles().

Referenced by OpenDDS::DCPS::DWMonitorImpl::report().

02613 {
02614   this->data_container_->get_instance_handles(instance_handles);
02615 }

DDS::DataWriterListener_ptr OpenDDS::DCPS::DataWriterImpl::get_listener (  )  [virtual]

Implements DDS::DataWriter.

Definition at line 982 of file DataWriterImpl.cpp.

References listener_.

00983 {
00984   return DDS::DataWriterListener::_duplicate(listener_.in());
00985 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_liveliness_lost_status ( DDS::LivelinessLostStatus status  )  [virtual]

Definition at line 1041 of file DataWriterImpl.cpp.

References DDS::LIVELINESS_LOST_STATUS, liveliness_lost_status_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::LivelinessLostStatus::total_count_change.

01043 {
01044   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01045                    guard,
01046                    this->lock_,
01047                    DDS::RETCODE_ERROR);
01048   set_status_changed_flag(DDS::LIVELINESS_LOST_STATUS, false);
01049   status = liveliness_lost_status_;
01050   liveliness_lost_status_.total_count_change = 0;
01051   return DDS::RETCODE_OK;
01052 }

ACE_INLINE ACE_Recursive_Thread_Mutex& OpenDDS::DCPS::DataWriterImpl::get_lock (  )  [inline]

Accessor of the WriterDataContainer's lock.

Definition at line 366 of file DataWriterImpl.h.

Referenced by begin_coherent_changes(), coherent_changes_pending(), dispose(), dispose_and_unregister(), end_coherent_changes(), register_instance_from_durable_data(), unregister_instance_i(), and write().

00366                                          {
00367     return data_container_->lock_;
00368   }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_matched_subscription_data ( DDS::SubscriptionBuiltinTopicData subscription_data,
DDS::InstanceHandle_t  subscription_handle 
) [virtual]

Definition at line 1195 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, OpenDDS::DCPS::EntityImpl::enabled_, participant_servant_, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.

01198 {
01199   if (enabled_ == false) {
01200     ACE_ERROR_RETURN((LM_ERROR,
01201                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::")
01202                       ACE_TEXT("get_matched_subscription_data: ")
01203                       ACE_TEXT("Entity is not enabled. \n")),
01204                      DDS::RETCODE_NOT_ENABLED);
01205   }
01206 
01207   BIT_Helper_1 < DDS::SubscriptionBuiltinTopicDataDataReader,
01208                DDS::SubscriptionBuiltinTopicDataDataReader_var,
01209                DDS::SubscriptionBuiltinTopicDataSeq > hh;
01210 
01211   DDS::SubscriptionBuiltinTopicDataSeq data;
01212 
01213   DDS::ReturnCode_t ret =
01214     hh.instance_handle_to_bit_data(participant_servant_,
01215                                    BUILT_IN_SUBSCRIPTION_TOPIC,
01216                                    subscription_handle,
01217                                    data);
01218 
01219   if (ret == DDS::RETCODE_OK) {
01220     subscription_data = data[0];
01221   }
01222 
01223   return ret;
01224 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_matched_subscriptions ( DDS::InstanceHandleSeq subscription_handles  )  [virtual]

Definition at line 1162 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::EntityImpl::enabled_, id_to_handle_map_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.

01164 {
01165   if (enabled_ == false) {
01166     ACE_ERROR_RETURN((LM_ERROR,
01167                       ACE_TEXT("(%P|%t) ERROR: ")
01168                       ACE_TEXT("DataWriterImpl::get_matched_subscriptions: ")
01169                       ACE_TEXT(" Entity is not enabled. \n")),
01170                      DDS::RETCODE_NOT_ENABLED);
01171   }
01172 
01173   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01174                    guard,
01175                    this->lock_,
01176                    DDS::RETCODE_ERROR);
01177 
01178   // Copy out the handles for the current set of subscriptions.
01179   int index = 0;
01180   subscription_handles.length(
01181     static_cast<CORBA::ULong>(this->id_to_handle_map_.size()));
01182 
01183   for (RepoIdToHandleMap::iterator
01184        current = this->id_to_handle_map_.begin();
01185        current != this->id_to_handle_map_.end();
01186        ++current, ++index) {
01187     subscription_handles[index] = current->second;
01188   }
01189 
01190   return DDS::RETCODE_OK;
01191 }

DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl::get_next_handle (  ) 

Get an instance handle for a new instance.

Definition at line 209 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), and participant_servant_.

Referenced by OpenDDS::DCPS::WriteDataContainer::register_instance().

00210 {
00211   return this->participant_servant_->id_to_handle(GUID_UNKNOWN);
00212 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_offered_deadline_missed_status ( DDS::OfferedDeadlineMissedStatus status  )  [virtual]

Definition at line 1055 of file DataWriterImpl.cpp.

References last_deadline_missed_total_count_, DDS::OFFERED_DEADLINE_MISSED_STATUS, offered_deadline_missed_status_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), DDS::OfferedDeadlineMissedStatus::total_count, and DDS::OfferedDeadlineMissedStatus::total_count_change.

01057 {
01058   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01059                    guard,
01060                    this->lock_,
01061                    DDS::RETCODE_ERROR);
01062 
01063   set_status_changed_flag(DDS::OFFERED_DEADLINE_MISSED_STATUS, false);
01064 
01065   this->offered_deadline_missed_status_.total_count_change =
01066     this->offered_deadline_missed_status_.total_count
01067     - this->last_deadline_missed_total_count_;
01068 
01069   // Update for next status check.
01070   this->last_deadline_missed_total_count_ =
01071     this->offered_deadline_missed_status_.total_count;
01072 
01073   status = offered_deadline_missed_status_;
01074 
01075   this->offered_deadline_missed_status_.total_count_change = 0;
01076 
01077   return DDS::RETCODE_OK;
01078 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_offered_incompatible_qos_status ( DDS::OfferedIncompatibleQosStatus status  )  [virtual]

Definition at line 1081 of file DataWriterImpl.cpp.

References DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, offered_incompatible_qos_status_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::OfferedIncompatibleQosStatus::total_count_change.

01083 {
01084   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01085                    guard,
01086                    this->lock_,
01087                    DDS::RETCODE_ERROR);
01088   set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, false);
01089   status = offered_incompatible_qos_status_;
01090   offered_incompatible_qos_status_.total_count_change = 0;
01091   return DDS::RETCODE_OK;
01092 }

CORBA::Long OpenDDS::DCPS::DataWriterImpl::get_priority_value ( const AssociationData  )  const [inline, private, virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 578 of file DataWriterImpl.h.

00578                                                              {
00579     return this->qos_.transport_priority.value;
00580   }

RepoId OpenDDS::DCPS::DataWriterImpl::get_publication_id (  ) 

Accessor of the repository id of this datawriter/publication.

Definition at line 1919 of file DataWriterImpl.cpp.

References publication_id_.

Referenced by add_association(), OpenDDS::DCPS::PublisherImpl::delete_contained_entities(), OpenDDS::DCPS::PublisherImpl::delete_datawriter(), OpenDDS::Federator::ManagerImpl::initialize(), OpenDDS::DCPS::DWPeriodicMonitorImpl::report(), OpenDDS::DCPS::DWMonitorImpl::report(), and OpenDDS::DCPS::PublisherImpl::writer_enabled().

01920 {
01921   return publication_id_;
01922 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_publication_matched_status ( DDS::PublicationMatchedStatus status  )  [virtual]

Definition at line 1095 of file DataWriterImpl.cpp.

References DDS::PublicationMatchedStatus::current_count_change, publication_match_status_, DDS::PUBLICATION_MATCHED_STATUS, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::PublicationMatchedStatus::total_count_change.

01097 {
01098   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01099                    guard,
01100                    this->lock_,
01101                    DDS::RETCODE_ERROR);
01102   set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, false);
01103   status = publication_match_status_;
01104   publication_match_status_.total_count_change = 0;
01105   publication_match_status_.current_count_change = 0;
01106   return DDS::RETCODE_OK;
01107 }

DDS::Publisher_ptr OpenDDS::DCPS::DataWriterImpl::get_publisher (  )  [virtual]

Implements DDS::DataWriter.

Definition at line 1035 of file DataWriterImpl.cpp.

References publisher_servant_.

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter(), OpenDDS::DCPS::StaticDiscovery::pre_writer(), and OpenDDS::DCPS::DWMonitorImpl::report().

01036 {
01037   return DDS::Publisher::_duplicate(publisher_servant_);
01038 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_qos ( DDS::DataWriterQos qos  )  [virtual]

Definition at line 965 of file DataWriterImpl.cpp.

References qos_, and DDS::RETCODE_OK.

Referenced by OpenDDS::DCPS::StaticDiscovery::pre_writer().

00966 {
00967   qos = qos_;
00968   return DDS::RETCODE_OK;
00969 }

void OpenDDS::DCPS::DataWriterImpl::get_readers ( RepoIdSet &  readers  ) 

Definition at line 2618 of file DataWriterImpl.cpp.

Referenced by OpenDDS::DCPS::DWMonitorImpl::report().

02619 {
02620   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
02621   readers = this->readers_;
02622 }

const RepoId& OpenDDS::DCPS::DataWriterImpl::get_repo_id (  )  const [inline, private, virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 571 of file DataWriterImpl.h.

00571                                     {
00572     return this->publication_id_;
00573   }

SendStateDataSampleList OpenDDS::DCPS::DataWriterImpl::get_resend_data (  )  [inline]

Definition at line 280 of file DataWriterImpl.h.

Referenced by association_complete_i().

00280                                             {
00281     return data_container_->get_resend_data();
00282   }

DDS::Topic_ptr OpenDDS::DCPS::DataWriterImpl::get_topic (  )  [virtual]

Implements DDS::DataWriter.

Definition at line 988 of file DataWriterImpl.cpp.

References topic_objref_.

Referenced by OpenDDS::DCPS::DWMonitorImpl::report().

00989 {
00990   return DDS::Topic::_duplicate(topic_objref_.in());
00991 }

const char * OpenDDS::DCPS::DataWriterImpl::get_topic_name (  ) 

Accessor of the associated topic name.

Definition at line 1931 of file DataWriterImpl.cpp.

References topic_name_.

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter(), and enable().

01932 {
01933   return topic_name_.in();
01934 }

char const * OpenDDS::DCPS::DataWriterImpl::get_type_name (  )  const

Get associated topic type name.

Definition at line 1937 of file DataWriterImpl.cpp.

References type_name_.

Referenced by enable().

01938 {
01939   return type_name_.in();
01940 }

ACE_UINT64 OpenDDS::DCPS::DataWriterImpl::get_unsent_data ( SendStateDataSampleList list  )  [inline]

Retrieve the unsent data from the WriteDataContainer.

Definition at line 276 of file DataWriterImpl.h.

Referenced by send_all_to_flush_control(), and write().

00276                                                              {
00277     return data_container_->get_unsent_data(list);
00278   }

int OpenDDS::DCPS::DataWriterImpl::handle_close ( ACE_HANDLE  ,
ACE_Reactor_Mask   
) [virtual]

Definition at line 2372 of file DataWriterImpl.cpp.

02374 {
02375   this->_remove_ref();
02376   return 0;
02377 }

int OpenDDS::DCPS::DataWriterImpl::handle_timeout ( const ACE_Time_Value &  tv,
const void *  arg 
) [virtual]

Handle the assert liveliness timeout.

Definition at line 2299 of file DataWriterImpl.cpp.

References DDS::AUTOMATIC_LIVELINESS_QOS, OpenDDS::DCPS::duration_to_time_value(), last_liveliness_activity_time_, last_liveliness_check_time_, listener_for(), DDS::DataWriterQos::liveliness, liveliness_asserted_, liveliness_check_interval_, liveliness_lost_, DDS::LIVELINESS_LOST_STATUS, liveliness_lost_status_, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS, qos_, reactor_, DDS::LivelinessLostStatus::total_count, and DDS::LivelinessLostStatus::total_count_change.

02301 {
02302   const ACE_Time_Value delta = tv - last_liveliness_check_time_;
02303   if (delta < liveliness_check_interval_) {
02304     // Too early.  Reschedule.
02305     if (reactor_->cancel_timer(this) == -1) {
02306       ACE_ERROR((LM_ERROR,
02307                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
02308                  ACE_TEXT("cancel_timer")));
02309     }
02310     if (reactor_->schedule_timer(this, 0, liveliness_check_interval_ - delta, liveliness_check_interval_) == -1) {
02311       ACE_ERROR((LM_ERROR,
02312                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
02313                  ACE_TEXT("schedule_timer")));
02314     }
02315     return 0;
02316   }
02317 
02318   bool liveliness_lost = false;
02319 
02320   ACE_Time_Value elapsed = tv - last_liveliness_activity_time_;
02321 
02322   // Do we need to send a liveliness message?
02323   if (elapsed >= liveliness_check_interval_) {
02324     switch (this->qos_.liveliness.kind) {
02325     case DDS::AUTOMATIC_LIVELINESS_QOS:
02326       if (this->send_liveliness(tv) == false) {
02327         liveliness_lost = true;
02328       }
02329       break;
02330 
02331     case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
02332       if (liveliness_asserted_) {
02333         if (this->send_liveliness(tv) == false) {
02334           liveliness_lost = true;
02335         }
02336       }
02337       break;
02338 
02339     case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
02340       // Do nothing.
02341       break;
02342     }
02343   }
02344 
02345   liveliness_asserted_ = false;
02346   last_liveliness_check_time_ = tv;
02347   elapsed = tv - last_liveliness_activity_time_;
02348 
02349   // Have we lost liveliness?
02350   if (elapsed >= duration_to_time_value(qos_.liveliness.lease_duration)) {
02351     liveliness_lost = true;
02352   }
02353 
02354   if (!this->liveliness_lost_ && liveliness_lost) {
02355     ++ this->liveliness_lost_status_.total_count;
02356     ++ this->liveliness_lost_status_.total_count_change;
02357 
02358     DDS::DataWriterListener_var listener =
02359       listener_for(DDS::LIVELINESS_LOST_STATUS);
02360 
02361     if (!CORBA::is_nil(listener.in())) {
02362       listener->on_liveliness_lost(this->dw_local_objref_.in(),
02363                                    this->liveliness_lost_status_);
02364     }
02365   }
02366 
02367   this->liveliness_lost_ = liveliness_lost;
02368   return 0;
02369 }

void OpenDDS::DCPS::DataWriterImpl::inconsistent_topic (  )  [virtual]

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 886 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::TopicImpl::inconsistent_topic(), and topic_servant_.

00887 {
00888   topic_servant_->inconsistent_topic();
00889 }

void OpenDDS::DCPS::DataWriterImpl::init ( DDS::Topic_ptr  topic,
TopicImpl topic_servant,
const DDS::DataWriterQos qos,
DDS::DataWriterListener_ptr  a_listener,
const DDS::StatusMask mask,
OpenDDS::DCPS::DomainParticipantImpl participant_servant,
OpenDDS::DCPS::PublisherImpl publisher_servant,
DDS::DataWriter_ptr  dw_local 
) [virtual]

Initialize the data members.

Definition at line 155 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::TopicImpl::add_entity_ref(), OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC, OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, OpenDDS::DCPS::BUILT_IN_TOPIC_TOPIC, DBG_ENTRY_LVL, domain_id_, dw_local_objref_, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), OpenDDS::DCPS::TopicImpl::get_id(), OpenDDS::DCPS::TopicDescriptionImpl::get_name(), OpenDDS::DCPS::TopicDescriptionImpl::get_type_name(), initialized_, is_bit_, listener_, listener_mask_, participant_servant_, publisher_servant_, qos_, reactor_, TheServiceParticipant, topic_id_, topic_name_, topic_objref_, topic_servant_, and type_name_.

Referenced by OpenDDS::DCPS::PublisherImpl::create_datawriter(), and OpenDDS::DCPS::DataWriterImpl_T< MessageType >::init().

00164 {
00165   DBG_ENTRY_LVL("DataWriterImpl","init",6);
00166   topic_objref_ = DDS::Topic::_duplicate(topic);
00167   topic_servant_ = topic_servant;
00168   topic_servant_->_add_ref();
00169   topic_servant_->add_entity_ref();
00170   topic_name_    = topic_servant_->get_name();
00171   topic_id_      = topic_servant_->get_id();
00172   type_name_     = topic_servant_->get_type_name();
00173 
00174 #if !defined (DDS_HAS_MINIMUM_BIT)
00175   is_bit_ = ACE_OS::strcmp(topic_name_.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0
00176             || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_TOPIC_TOPIC) == 0
00177             || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0
00178             || ACE_OS::strcmp(topic_name_.in(), BUILT_IN_PUBLICATION_TOPIC) == 0;
00179 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00180 
00181   qos_ = qos;
00182 
00183   //Note: OK to _duplicate(nil).
00184   listener_ = DDS::DataWriterListener::_duplicate(a_listener);
00185   listener_mask_ = mask;
00186 
00187   // Only store the participant pointer, since it is our "grand"
00188   // parent, we will exist as long as it does.
00189   participant_servant_ = participant_servant;
00190   domain_id_ = participant_servant_->get_domain_id();
00191 
00192   // Only store the publisher pointer, since it is our parent, we will
00193   // exist as long as it does.
00194   publisher_servant_ = publisher_servant;
00195   dw_local_objref_   = DDS::DataWriter::_duplicate(dw_local);
00196 
00197   this->reactor_ = TheServiceParticipant->timer();
00198 
00199   initialized_ = true;
00200 }

DDS::DataWriterListener_ptr OpenDDS::DCPS::DataWriterImpl::listener_for ( DDS::StatusKind  kind  ) 

This is used to retrieve the listener for a certain status change.

If this datawriter has a registered listener and the status kind is in the listener mask then the listener is returned. Otherwise, the query for the listener is propagated up to the factory/publisher.

Definition at line 2285 of file DataWriterImpl.cpp.

References listener_, OpenDDS::DCPS::PublisherImpl::listener_for(), listener_mask_, and publisher_servant_.

Referenced by association_complete_i(), OpenDDS::DCPS::OfferedDeadlineWatchdog::execute(), handle_timeout(), and update_incompatible_qos().

02286 {
02287   // per 2.1.4.3.1 Listener Access to Plain Communication Status
02288   // use this entities factory if listener is mask not enabled
02289   // for this kind.
02290   if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
02291     return publisher_servant_->listener_for(kind);
02292 
02293   } else {
02294     return DDS::DataWriterListener::_duplicate(listener_.in());
02295   }
02296 }

ACE_Time_Value OpenDDS::DCPS::DataWriterImpl::liveliness_check_interval ( DDS::LivelinessQosPolicyKind  kind  ) 

Definition at line 1142 of file DataWriterImpl.cpp.

References liveliness_check_interval_.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::LivelinessTimer::add_adjust().

01143 {
01144   if (this->qos_.liveliness.kind == kind) {
01145     return liveliness_check_interval_;
01146   } else {
01147     return ACE_Time_Value::max_time;
01148   }
01149 }

bool OpenDDS::DCPS::DataWriterImpl::lookup_instance_handles ( const ReaderIdSeq ids,
DDS::InstanceHandleSeq hdls 
) [private]

Lookup the instance handles by the subscription repo ids.

Definition at line 2547 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level, and OPENDDS_STRING.

Referenced by notify_publication_disconnected(), and notify_publication_lost().

02549 {
02550   if (DCPS_debug_level > 9) {
02551     CORBA::ULong const size = ids.length();
02552     OPENDDS_STRING separator;
02553     OPENDDS_STRING buffer;
02554 
02555     for (unsigned long i = 0; i < size; ++i) {
02556       buffer += separator + OPENDDS_STRING(GuidConverter(ids[i]));
02557       separator = ", ";
02558     }
02559 
02560     ACE_DEBUG((LM_DEBUG,
02561                ACE_TEXT("(%P|%t) DataWriterImpl::lookup_instance_handles: ")
02562                ACE_TEXT("searching for handles for reader Ids: %C.\n"),
02563                buffer.c_str()));
02564   }
02565 
02566   CORBA::ULong const num_rds = ids.length();
02567   hdls.length(num_rds);
02568 
02569   for (CORBA::ULong i = 0; i < num_rds; ++i) {
02570     hdls[i] = this->participant_servant_->id_to_handle(ids[i]);
02571   }
02572 
02573   return true;
02574 }

bool OpenDDS::DCPS::DataWriterImpl::need_sequence_repair (  )  [private]

Definition at line 2633 of file DataWriterImpl.cpp.

References need_sequence_repair_i().

Referenced by create_control_message(), and create_sample_data_message().

02634 {
02635   ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, false);
02636   return need_sequence_repair_i();
02637 }

bool OpenDDS::DCPS::DataWriterImpl::need_sequence_repair_i (  )  const [private]

Definition at line 2640 of file DataWriterImpl.cpp.

References reader_info_, and sequence_number_.

Referenced by need_sequence_repair().

02641 {
02642   for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
02643        end = reader_info_.end(); it != end; ++it) {
02644     if (it->second.expected_sequence_ != sequence_number_) {
02645       return true;
02646     }
02647   }
02648 
02649   return false;
02650 }

void OpenDDS::DCPS::DataWriterImpl::notify_connection_deleted ( const RepoId peerId  )  [virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 2533 of file DataWriterImpl.cpp.

References DBG_ENTRY_LVL, and OpenDDS::DCPS::TransportClient::on_notification_of_connection_deletion().

02534 {
02535   DBG_ENTRY_LVL("DataWriterImpl","notify_connection_deleted",6);
02536   on_notification_of_connection_deletion(peerId);
02537   // Narrow to DDS::DCPS::DataWriterListener. If a DDS::DataWriterListener
02538   // is given to this DataWriter then narrow() fails.
02539   DataWriterListener_var the_listener =
02540     DataWriterListener::_narrow(this->listener_.in());
02541 
02542   if (!CORBA::is_nil(the_listener.in()))
02543     the_listener->on_connection_deleted(this->dw_local_objref_.in());
02544 }

void OpenDDS::DCPS::DataWriterImpl::notify_publication_disconnected ( const ReaderIdSeq subids  )  [virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 2425 of file DataWriterImpl.cpp.

References DBG_ENTRY_LVL, is_bit_, lookup_instance_handles(), and OpenDDS::DCPS::PublicationLostStatus::subscription_handles.

02426 {
02427   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_disconnected",6);
02428 
02429   if (!is_bit_) {
02430     // Narrow to DDS::DCPS::DataWriterListener. If a DDS::DataWriterListener
02431     // is given to this DataWriter then narrow() fails.
02432     DataWriterListener_var the_listener =
02433       DataWriterListener::_narrow(this->listener_.in());
02434 
02435     if (!CORBA::is_nil(the_listener.in())) {
02436       PublicationDisconnectedStatus status;
02437       // Since this callback may come after remove_association which
02438       // removes the reader from id_to_handle map, we can ignore this
02439       // error.
02440       this->lookup_instance_handles(subids,
02441                                     status.subscription_handles);
02442       the_listener->on_publication_disconnected(this->dw_local_objref_.in(),
02443                                                 status);
02444     }
02445   }
02446 }

void OpenDDS::DCPS::DataWriterImpl::notify_publication_lost ( const DDS::InstanceHandleSeq handles  )  [private]

Definition at line 2505 of file DataWriterImpl.cpp.

References DBG_ENTRY_LVL, is_bit_, and OpenDDS::DCPS::PublicationLostStatus::subscription_handles.

02506 {
02507   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
02508 
02509   if (!is_bit_) {
02510     // Narrow to DDS::DCPS::DataWriterListener. If a
02511     // DDS::DataWriterListener is given to this DataWriter then
02512     // narrow() fails.
02513     DataWriterListener_var the_listener =
02514       DataWriterListener::_narrow(this->listener_.in());
02515 
02516     if (!CORBA::is_nil(the_listener.in())) {
02517       PublicationLostStatus status;
02518 
02519       CORBA::ULong len = handles.length();
02520       status.subscription_handles.length(len);
02521 
02522       for (CORBA::ULong i = 0; i < len; ++ i) {
02523         status.subscription_handles[i] = handles[i];
02524       }
02525 
02526       the_listener->on_publication_lost(this->dw_local_objref_.in(),
02527                                         status);
02528     }
02529   }
02530 }

void OpenDDS::DCPS::DataWriterImpl::notify_publication_lost ( const ReaderIdSeq subids  )  [virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 2480 of file DataWriterImpl.cpp.

References DBG_ENTRY_LVL, is_bit_, lookup_instance_handles(), and OpenDDS::DCPS::PublicationLostStatus::subscription_handles.

02481 {
02482   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
02483 
02484   if (!is_bit_) {
02485     // Narrow to DDS::DCPS::DataWriterListener. If a
02486     // DDS::DataWriterListener is given to this DataWriter then
02487     // narrow() fails.
02488     DataWriterListener_var the_listener =
02489       DataWriterListener::_narrow(this->listener_.in());
02490 
02491     if (!CORBA::is_nil(the_listener.in())) {
02492       PublicationLostStatus status;
02493 
02494       // Since this callback may come after remove_association which removes
02495       // the reader from id_to_handle map, we can ignore this error.
02496       this->lookup_instance_handles(subids,
02497                                     status.subscription_handles);
02498       the_listener->on_publication_lost(this->dw_local_objref_.in(),
02499                                         status);
02500     }
02501   }
02502 }

void OpenDDS::DCPS::DataWriterImpl::notify_publication_reconnected ( const ReaderIdSeq subids  )  [virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 2449 of file DataWriterImpl.cpp.

References DBG_ENTRY_LVL, is_bit_, and OpenDDS::DCPS::PublicationLostStatus::subscription_handles.

02450 {
02451   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_reconnected",6);
02452 
02453   if (!is_bit_) {
02454     // Narrow to DDS::DCPS::DataWriterListener. If a
02455     // DDS::DataWriterListener is given to this DataWriter then
02456     // narrow() fails.
02457     DataWriterListener_var the_listener =
02458       DataWriterListener::_narrow(this->listener_.in());
02459 
02460     if (!CORBA::is_nil(the_listener.in())) {
02461       PublicationDisconnectedStatus status;
02462 
02463       // If it's reconnected then the reader should be in id_to_handle
02464       // map otherwise log with an error.
02465       if (this->lookup_instance_handles(subids,
02466                                         status.subscription_handles) == false) {
02467         ACE_ERROR((LM_ERROR,
02468                    "(%P|%t) ERROR: DataWriterImpl::"
02469                    "notify_publication_reconnected: "
02470                    "lookup_instance_handles failed\n"));
02471       }
02472 
02473       the_listener->on_publication_reconnected(this->dw_local_objref_.in(),
02474                                                status);
02475     }
02476   }
02477 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::num_samples ( DDS::InstanceHandle_t  handle,
size_t &  size 
)

Return the number of samples for a given instance.

Definition at line 1900 of file DataWriterImpl.cpp.

References data_container_, and OpenDDS::DCPS::WriteDataContainer::num_samples().

01902 {
01903   return data_container_->num_samples(handle, size);
01904 }

typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_MAP_CMP ( RepoId  ,
DDS::InstanceHandle_t  ,
GUID_tKeyLessThan   
) [private]

typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_MAP_CMP ( RepoId  ,
ReaderInfo  ,
GUID_tKeyLessThan   
) [protected]

typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_MAP_CMP ( RepoId  ,
SequenceNumber  ,
GUID_tKeyLessThan   
)

typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_VECTOR ( DDS::InstanceHandle_t   ) 

EntityImpl * OpenDDS::DCPS::DataWriterImpl::parent (  )  const [virtual]

Reimplemented from OpenDDS::DCPS::EntityImpl.

Definition at line 2143 of file DataWriterImpl.cpp.

References publisher_servant_.

02144 {
02145   return this->publisher_servant_;
02146 }

bool OpenDDS::DCPS::DataWriterImpl::participant_liveliness_activity_after ( const ACE_Time_Value &  tv  ) 

Definition at line 1152 of file DataWriterImpl.cpp.

References last_liveliness_activity_time_, and DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS.

01153 {
01154   if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) {
01155     return last_liveliness_activity_time_ > tv;
01156   } else {
01157     return false;
01158   }
01159 }

bool OpenDDS::DCPS::DataWriterImpl::pending_control (  )  [protected]

Answer if transport of all control messages is pending.

Definition at line 2593 of file DataWriterImpl.cpp.

References controlTracker, and OpenDDS::DCPS::MessageTracker::pending_messages().

02594 {
02595   return controlTracker.pending_messages();
02596 }

bool OpenDDS::DCPS::DataWriterImpl::persist_data (  ) 

Make sent data available beyond the lifetime of this DataWriter.

Definition at line 2578 of file DataWriterImpl.cpp.

References data_container_, and OpenDDS::DCPS::WriteDataContainer::persist_data().

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().

02579 {
02580   return this->data_container_->persist_data();
02581 }

void OpenDDS::DCPS::DataWriterImpl::prepare_to_delete (  )  [protected]

Definition at line 2406 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::EntityImpl::set_deleted(), and OpenDDS::DCPS::TransportClient::stop_associating().

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().

02407 {
02408   this->set_deleted(true);
02409   this->stop_associating();
02410 }

void OpenDDS::DCPS::DataWriterImpl::register_for_reader ( const RepoId participant,
const RepoId writerid,
const RepoId readerid,
const TransportLocatorSeq locators,
DiscoveryListener listener 
) [virtual]

Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 803 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::TransportClient::register_for_reader().

00808 {
00809   TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
00810 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::register_instance_from_durable_data ( DDS::InstanceHandle_t handle,
DataSample data,
const DDS::Time_t source_timestamp 
)

Delegate to the WriteDataContainer to register and tell the transport to broadcast the registered instance.

Definition at line 1529 of file DataWriterImpl.cpp.

References DBG_ENTRY_LVL, get_lock(), register_instance_i(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, and send_all_to_flush_control().

Referenced by OpenDDS::DCPS::DataDurabilityCache::get_data().

01532 {
01533   DBG_ENTRY_LVL("DataWriterImpl","register_instance_from_durable_data",6);
01534 
01535   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01536                    guard,
01537                    get_lock(),
01538                    ::DDS::RETCODE_ERROR);
01539 
01540   DDS::ReturnCode_t ret = register_instance_i(handle, data, source_timestamp);
01541   if (ret != DDS::RETCODE_OK) {
01542     ACE_ERROR_RETURN((LM_ERROR,
01543                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_from_durable_data: ")
01544                       ACE_TEXT("register instance with container failed.\n")),
01545                       ret);
01546   }
01547 
01548   send_all_to_flush_control(guard);
01549 
01550   return ret;
01551 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::register_instance_i ( DDS::InstanceHandle_t handle,
DataSample data,
const DDS::Time_t source_timestamp 
)

Delegate to the WriteDataContainer to register Must tell the transport to broadcast the registered instance upon returning.

Definition at line 1469 of file DataWriterImpl.cpp.

References create_control_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::WriteDataContainer::enqueue_control(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::INSTANCE_REGISTRATION, monitor_, OpenDDS::DCPS::WriteDataContainer::obtain_buffer_for_control(), OpenDDS::DCPS::WriteDataContainer::register_instance(), OpenDDS::DCPS::Monitor::report(), DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, and OpenDDS::DCPS::DataSampleElement::set_sample().

Referenced by register_instance_from_durable_data().

01472 {
01473   DBG_ENTRY_LVL("DataWriterImpl","register_instance_i",6);
01474 
01475   if (enabled_ == false) {
01476     ACE_ERROR_RETURN((LM_ERROR,
01477                       ACE_TEXT("(%P|%t) ERROR: ")
01478                       ACE_TEXT("DataWriterImpl::register_instance_i: ")
01479                       ACE_TEXT(" Entity is not enabled. \n")),
01480                      DDS::RETCODE_NOT_ENABLED);
01481   }
01482 
01483   DDS::ReturnCode_t ret =
01484     this->data_container_->register_instance(handle, data);
01485 
01486   if (ret != DDS::RETCODE_OK) {
01487     ACE_ERROR_RETURN((LM_ERROR,
01488                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_i: ")
01489                       ACE_TEXT("register instance with container failed.\n")),
01490                      ret);
01491   }
01492 
01493   if (this->monitor_) {
01494     this->monitor_->report();
01495   }
01496 
01497   DataSampleElement* element = 0;
01498   ret = this->data_container_->obtain_buffer_for_control(element);
01499 
01500   if (ret != DDS::RETCODE_OK) {
01501     ACE_ERROR_RETURN((LM_ERROR,
01502                       ACE_TEXT("(%P|%t) ERROR: ")
01503                       ACE_TEXT("DataWriterImpl::register_instance_i: ")
01504                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01505                       ret),
01506                      ret);
01507   }
01508 
01509   // Add header with the registration sample data.
01510   element->set_sample(create_control_message(INSTANCE_REGISTRATION,
01511                                              element->get_header(),
01512                                              data,
01513                                              source_timestamp));
01514 
01515   ret = this->data_container_->enqueue_control(element);
01516 
01517   if (ret != DDS::RETCODE_OK) {
01518     ACE_ERROR_RETURN((LM_ERROR,
01519                       ACE_TEXT("(%P|%t) ERROR: ")
01520                       ACE_TEXT("DataWriterImpl::register_instance_i: ")
01521                       ACE_TEXT("enqueue_control failed.\n")),
01522                      ret);
01523   }
01524 
01525   return ret;
01526 }

void OpenDDS::DCPS::DataWriterImpl::remove_all_associations (  ) 

Definition at line 751 of file DataWriterImpl.cpp.

References DBG_ENTRY_LVL, lock_, remove_associations(), and OpenDDS::DCPS::TransportClient::stop_associating().

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().

00752 {
00753   DBG_ENTRY_LVL("DataWriterImpl", "remove_all_associations", 6);
00754   // stop pending associations
00755   this->stop_associating();
00756 
00757   OpenDDS::DCPS::ReaderIdSeq readers;
00758   CORBA::ULong size;
00759   CORBA::ULong num_pending_readers;
00760   {
00761     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00762 
00763     num_pending_readers = static_cast<CORBA::ULong>(pending_readers_.size());
00764     size = static_cast<CORBA::ULong>(readers_.size()) + num_pending_readers;
00765     readers.length(size);
00766 
00767     RepoIdSet::iterator itEnd = readers_.end();
00768     int i = 0;
00769 
00770     for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
00771       readers[i ++] = *it;
00772     }
00773 
00774     itEnd = pending_readers_.end();
00775 
00776     for (RepoIdSet::iterator it = pending_readers_.begin(); it != itEnd; ++it) {
00777       readers[i ++] = *it;
00778     }
00779 
00780     if (num_pending_readers > 0) {
00781       ACE_DEBUG((LM_WARNING,
00782                  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00783                  ACE_TEXT("%d subscribers were pending and never fully associated.\n"),
00784                  num_pending_readers));
00785     }
00786   }
00787 
00788   try {
00789     if (0 < size) {
00790       CORBA::Boolean dont_notify_lost = false;
00791 
00792       this->remove_associations(readers, dont_notify_lost);
00793     }
00794 
00795   } catch (const CORBA::Exception&) {
00796       ACE_DEBUG((LM_WARNING,
00797                  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00798                  ACE_TEXT("caught exception from remove_associations.\n")));
00799   }
00800 }

virtual void OpenDDS::DCPS::DataWriterImpl::remove_associations ( const ReaderIdSeq readers,
bool  callback 
) [virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Referenced by remove_all_associations().

void OpenDDS::DCPS::DataWriterImpl::reschedule_deadline (  ) 

Definition at line 2585 of file DataWriterImpl.cpp.

References data_container_, and OpenDDS::DCPS::WriteDataContainer::reschedule_deadline().

Referenced by OpenDDS::DCPS::OfferedDeadlineWatchdog::reschedule_deadline().

02586 {
02587   if (this->watchdog_ != 0) {
02588     this->data_container_->reschedule_deadline();
02589   }
02590 }

void OpenDDS::DCPS::DataWriterImpl::retrieve_inline_qos_data ( TransportSendListener::InlineQosData qos_data  )  const [virtual]

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 2625 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::TransportSendListener::InlineQosData::dw_qos, OpenDDS::DCPS::PublisherImpl::get_qos(), OpenDDS::DCPS::TransportSendListener::InlineQosData::pub_qos, publisher_servant_, qos_, OpenDDS::DCPS::TransportSendListener::InlineQosData::topic_name, and topic_name_.

02626 {
02627   this->publisher_servant_->get_qos(qos_data.pub_qos);
02628   qos_data.dw_qos = this->qos_;
02629   qos_data.topic_name = this->topic_name_.in();
02630 }

void OpenDDS::DCPS::DataWriterImpl::send_all_to_flush_control ( ACE_Guard< ACE_Recursive_Thread_Mutex > &  guard  ) 

Definition at line 1452 of file DataWriterImpl.cpp.

References controlTracker, DBG_ENTRY_LVL, get_unsent_data(), OpenDDS::DCPS::MessageTracker::message_sent(), and OpenDDS::DCPS::TransportClient::send().

Referenced by dispose(), dispose_and_unregister(), register_instance_from_durable_data(), and unregister_instance_i().

01453 {
01454   DBG_ENTRY_LVL("DataWriterImpl","send_all_to_flush_control",6);
01455 
01456   SendStateDataSampleList list;
01457 
01458   ACE_UINT64 transaction_id = this->get_unsent_data(list);
01459 
01460   controlTracker.message_sent();
01461 
01462   //need to release guard to call down to transport
01463   guard.release();
01464 
01465   this->send(list, transaction_id);
01466 }

SendControlStatus OpenDDS::DCPS::DataWriterImpl::send_control ( const DataSampleHeader header,
ACE_Message_Block *  msg 
) [protected, virtual]

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 2653 of file DataWriterImpl.cpp.

References controlTracker, header, OpenDDS::DCPS::MessageTracker::message_dropped(), OpenDDS::DCPS::MessageTracker::message_sent(), OpenDDS::DCPS::TransportClient::send_control(), and OpenDDS::DCPS::SEND_CONTROL_OK.

02655 {
02656   controlTracker.message_sent();
02657 
02658   SendControlStatus status = TransportClient::send_control(header, msg);
02659 
02660   if (status != SEND_CONTROL_OK) {
02661     controlTracker.message_dropped();
02662   }
02663 
02664   return status;
02665 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::send_end_historic_samples ( const RepoId readerId  )  [private]

bool OpenDDS::DCPS::DataWriterImpl::send_liveliness ( const ACE_Time_Value &  now  )  [private]

Send the liveliness message.

Definition at line 2380 of file DataWriterImpl.cpp.

References create_control_message(), OpenDDS::DCPS::DATAWRITER_LIVELINESS, domain_id_, header, last_liveliness_activity_time_, DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS, OpenDDS::DCPS::SEND_CONTROL_ERROR, TheServiceParticipant, and OpenDDS::DCPS::time_value_to_time().

02381 {
02382   if (this->qos_.liveliness.kind == DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS ||
02383       !TheServiceParticipant->get_discovery(domain_id_)->supports_liveliness()) {
02384     DDS::Time_t t = time_value_to_time(now);
02385     DataSampleHeader header;
02386     ACE_Message_Block* liveliness_msg =
02387       this->create_control_message(DATAWRITER_LIVELINESS, header, 0, t);
02388 
02389     if (this->send_control(header, liveliness_msg) == SEND_CONTROL_ERROR) {
02390       ACE_ERROR_RETURN((LM_ERROR,
02391                         ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::send_liveliness: ")
02392                         ACE_TEXT(" send_control failed. \n")),
02393                        false);
02394 
02395     } else {
02396       last_liveliness_activity_time_ = now;
02397       return true;
02398     }
02399   } else {
02400     last_liveliness_activity_time_ = now;
02401     return true;
02402   }
02403 }

void OpenDDS::DCPS::DataWriterImpl::send_suspended_data (  ) 

Called by the PublisherImpl to indicate that the Publisher is now resumed and any data collected while it was suspended should now be sent.

Definition at line 1822 of file DataWriterImpl.cpp.

References available_data_list_, max_suspended_transaction_id_, min_suspended_transaction_id_, OpenDDS::DCPS::SendStateDataSampleList::reset(), and OpenDDS::DCPS::TransportClient::send().

01823 {
01824   //this serves to get TransportClient's max_transaction_id_seen_
01825   //to the correct value for this list of transactions
01826   if (max_suspended_transaction_id_ != 0) {
01827     this->send(this->available_data_list_, max_suspended_transaction_id_);
01828     max_suspended_transaction_id_ = 0;
01829   }
01830 
01831   //this serves to actually have the send proceed in
01832   //sending the samples to the datalinks by passing it
01833   //the min_suspended_transaction_id_ which should be the
01834   //TransportClient's expected_transaction_id_
01835   this->send(this->available_data_list_, min_suspended_transaction_id_);
01836   min_suspended_transaction_id_ = 0;
01837   this->available_data_list_.reset();
01838 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::set_listener ( DDS::DataWriterListener_ptr  a_listener,
DDS::StatusMask  mask 
) [virtual]

Definition at line 972 of file DataWriterImpl.cpp.

References listener_, listener_mask_, and DDS::RETCODE_OK.

00974 {
00975   listener_mask_ = mask;
00976   //note: OK to duplicate  a nil object ref
00977   listener_ = DDS::DataWriterListener::_duplicate(a_listener);
00978   return DDS::RETCODE_OK;
00979 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::set_qos ( const DDS::DataWriterQos qos  )  [virtual]

Definition at line 892 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::Watchdog::cancel_all(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), OpenDDS::DCPS::ReactorInterceptor::destroy(), domain_id_, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::duration_to_time_value(), OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::PublisherImpl::get_qos(), last_deadline_missed_total_count_, offered_deadline_missed_status_, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK, participant_servant_, publication_id_, publisher_servant_, qos_, OpenDDS::DCPS::Watchdog::reset_interval(), DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, TheServiceParticipant, OpenDDS::DCPS::Qos_Helper::valid(), and watchdog_.

00893 {
00894 
00895   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00896   OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00897   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00898   OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00899   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00900 
00901   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00902     if (qos_ == qos)
00903       return DDS::RETCODE_OK;
00904 
00905     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00906       return DDS::RETCODE_IMMUTABLE_POLICY;
00907 
00908     } else {
00909       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00910       DDS::PublisherQos publisherQos;
00911       this->publisher_servant_->get_qos(publisherQos);
00912       const bool status
00913         = disco->update_publication_qos(this->participant_servant_->get_domain_id(),
00914                                         this->participant_servant_->get_id(),
00915                                         this->publication_id_,
00916                                         qos,
00917                                         publisherQos);
00918 
00919       if (!status) {
00920         ACE_ERROR_RETURN((LM_ERROR,
00921                           ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
00922                           ACE_TEXT("qos not updated. \n")),
00923                          DDS::RETCODE_ERROR);
00924       }
00925     }
00926 
00927     if (!(qos_ == qos)) {
00928       // Reset the deadline timer if the period has changed.
00929       if (qos_.deadline.period.sec != qos.deadline.period.sec
00930           || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
00931         if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00932             && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00933           this->watchdog_ =
00934                              new OfferedDeadlineWatchdog(
00935                                this->lock_,
00936                                qos.deadline,
00937                                this,
00938                                this->dw_local_objref_.in(),
00939                                this->offered_deadline_missed_status_,
00940                                this->last_deadline_missed_total_count_);
00941 
00942         } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00943                    && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00944           this->watchdog_->cancel_all();
00945           this->watchdog_->destroy();
00946           this->watchdog_ = 0;
00947 
00948         } else {
00949           this->watchdog_->reset_interval(
00950             duration_to_time_value(qos.deadline.period));
00951         }
00952       }
00953 
00954       qos_ = qos;
00955     }
00956 
00957     return DDS::RETCODE_OK;
00958 
00959   } else {
00960     return DDS::RETCODE_INCONSISTENT_POLICY;
00961   }
00962 }

bool OpenDDS::DCPS::DataWriterImpl::should_ack (  )  const

Does this writer have samples to be acknowledged?

Definition at line 994 of file DataWriterImpl.cpp.

00995 {
00996   // N.B. It may be worthwhile to investigate a more efficient
00997   // heuristic for determining if a writer should send SAMPLE_ACK
00998   // control samples. Perhaps based on a sequence number delta?
00999   return this->readers_.size() != 0;
01000 }

void OpenDDS::DCPS::DataWriterImpl::track_sequence_number ( GUIDSeq filter_out  )  [private]

Definition at line 1789 of file DataWriterImpl.cpp.

References reader_info_, and sequence_number_.

Referenced by write().

01790 {
01791   ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
01792 
01793 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01794   // Track individual expected sequence numbers in ReaderInfo
01795   RepoIdSet excluded;
01796 
01797   if (filter_out && !reader_info_.empty()) {
01798     const GUID_t* buf = filter_out->get_buffer();
01799     excluded.insert(buf, buf + filter_out->length());
01800   }
01801 
01802   for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01803        end = reader_info_.end(); iter != end; ++iter) {
01804     // If not excluding this reader, update expected sequence
01805     if (excluded.count(iter->first) == 0) {
01806       iter->second.expected_sequence_ = sequence_number_;
01807     }
01808   }
01809 
01810 #else
01811   ACE_UNUSED_ARG(filter_out);
01812   for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01813        end = reader_info_.end(); iter != end; ++iter) {
01814     iter->second.expected_sequence_ = sequence_number_;
01815   }
01816 
01817 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
01818 
01819 }

void OpenDDS::DCPS::DataWriterImpl::transport_assoc_done ( int  flags,
const RepoId remote_id 
) [virtual]

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 279 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::TransportClient::ASSOC_ACTIVE, assoc_complete_readers_, OpenDDS::DCPS::TransportClient::ASSOC_OK, association_complete_i(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, domain_id_, OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::insert(), lock_, OPENDDS_STRING, participant_servant_, pending_readers_, publication_id_, and TheServiceParticipant.

00280 {
00281   DBG_ENTRY_LVL("DataWriterImpl", "transport_assoc_done", 6);
00282 
00283   if (!(flags & ASSOC_OK)) {
00284     if (DCPS_debug_level) {
00285       const GuidConverter conv(remote_id);
00286       ACE_DEBUG((LM_ERROR,
00287                  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00288                  ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
00289                  OPENDDS_STRING(conv).c_str()));
00290     }
00291 
00292     return;
00293   }
00294   if (DCPS_debug_level) {
00295     const GuidConverter writer_conv(publication_id_);
00296     const GuidConverter conv(remote_id);
00297     ACE_DEBUG((LM_INFO,
00298                ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00299                ACE_TEXT(" writer %C succeeded in associating with reader %C\n"),
00300                OPENDDS_STRING(writer_conv).c_str(),
00301                OPENDDS_STRING(conv).c_str()));
00302   }
00303   if (flags & ASSOC_ACTIVE) {
00304 
00305     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00306 
00307     // Have we already received an association_complete() callback?
00308     if (assoc_complete_readers_.count(remote_id)) {
00309       if (DCPS_debug_level) {
00310         const GuidConverter writer_conv(publication_id_);
00311         const GuidConverter converter(remote_id);
00312         ACE_DEBUG((LM_DEBUG,
00313                    ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00314                    ACE_TEXT("writer %C found assoc_complete_reader %C, continue with association_complete_i\n"),
00315                    OPENDDS_STRING(writer_conv).c_str(),
00316                    OPENDDS_STRING(converter).c_str()));
00317       }
00318       assoc_complete_readers_.erase(remote_id);
00319       association_complete_i(remote_id);
00320 
00321       // Add to pending_readers_ -> pending means we are waiting
00322       // for the association_complete() callback.
00323 
00324     } else if (OpenDDS::DCPS::insert(pending_readers_, remote_id) == -1) {
00325       const GuidConverter converter(remote_id);
00326       ACE_ERROR((LM_ERROR,
00327                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::transport_assoc_done: ")
00328                  ACE_TEXT("failed to mark %C as pending.\n"),
00329                  OPENDDS_STRING(converter).c_str()));
00330 
00331     } else {
00332       if (DCPS_debug_level) {
00333         const GuidConverter converter(remote_id);
00334         ACE_DEBUG((LM_DEBUG,
00335                    ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00336                    ACE_TEXT("marked %C as pending.\n"),
00337                    OPENDDS_STRING(converter).c_str()));
00338       }
00339     }
00340 
00341   } else {
00342     // In the current implementation, DataWriter is always active, so this
00343     // code will not be applicable.
00344     if (DCPS_debug_level) {
00345       const GuidConverter conv(publication_id_);
00346       ACE_DEBUG((LM_ERROR,
00347                  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00348                  ACE_TEXT("ERROR: DataWriter (%C) should always be active in current implementation\n"),
00349                  OPENDDS_STRING(conv).c_str()));
00350     }
00351     Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00352     disco->association_complete(domain_id_, participant_servant_->get_id(),
00353                                 publication_id_, remote_id);
00354   }
00355 }

void OpenDDS::DCPS::DataWriterImpl::unregister_all (  ) 

Delegate to WriteDataContainer to unregister all instances.

Definition at line 1907 of file DataWriterImpl.cpp.

References cancel_timer_, data_container_, reactor_, and OpenDDS::DCPS::WriteDataContainer::unregister_all().

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().

01908 {
01909   if (cancel_timer_) {
01910     // The cancel_timer will call handle_close to remove_ref.
01911     (void) reactor_->cancel_timer(this, 0);
01912     cancel_timer_ = false;
01913   }
01914 
01915   data_container_->unregister_all();
01916 }

void OpenDDS::DCPS::DataWriterImpl::unregister_for_reader ( const RepoId participant,
const RepoId writerid,
const RepoId readerid 
) [virtual]

Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 813 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::TransportClient::unregister_for_reader().

00816 {
00817   TransportClient::unregister_for_reader(participant, writerid, readerid);
00818 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::unregister_instance_i ( DDS::InstanceHandle_t  handle,
const DDS::Time_t source_timestamp 
)

Delegate to the WriteDataContainer to unregister and tell the transport to broadcast the unregistered instance.

Definition at line 1554 of file DataWriterImpl.cpp.

References create_control_message(), data_container_, DBG_ENTRY_LVL, dispose_and_unregister(), OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::WriteDataContainer::enqueue_control(), OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), OpenDDS::DCPS::WriteDataContainer::obtain_buffer_for_control(), DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, send_all_to_flush_control(), OpenDDS::DCPS::DataSampleElement::set_sample(), OpenDDS::DCPS::WriteDataContainer::unregister(), and OpenDDS::DCPS::UNREGISTER_INSTANCE.

Referenced by OpenDDS::DCPS::DataWriterImpl_T< MessageType >::unregister_instance_w_timestamp().

01556 {
01557   DBG_ENTRY_LVL("DataWriterImpl","unregister_instance_i",6);
01558 
01559   if (enabled_ == false) {
01560     ACE_ERROR_RETURN((LM_ERROR,
01561                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::unregister_instance_i: ")
01562                       ACE_TEXT(" Entity is not enabled.\n")),
01563                      DDS::RETCODE_NOT_ENABLED);
01564   }
01565 
01566   // According to spec 1.2, autodispose_unregistered_instances true causes
01567   // dispose on the instance prior to calling unregister operation.
01568   if (this->qos_.writer_data_lifecycle.autodispose_unregistered_instances) {
01569     return this->dispose_and_unregister(handle, source_timestamp);
01570   }
01571 
01572   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01573   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01574   DataSample* unregistered_sample_data = 0;
01575   ret = this->data_container_->unregister(handle, unregistered_sample_data);
01576 
01577   if (ret != DDS::RETCODE_OK) {
01578     ACE_ERROR_RETURN((LM_ERROR,
01579                       ACE_TEXT("(%P|%t) ERROR: ")
01580                       ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01581                       ACE_TEXT(" unregister with container failed. \n")),
01582                      ret);
01583   }
01584 
01585   DataSampleElement* element = 0;
01586   ret = this->data_container_->obtain_buffer_for_control(element);
01587 
01588   if (ret != DDS::RETCODE_OK) {
01589     ACE_ERROR_RETURN((LM_ERROR,
01590                       ACE_TEXT("(%P|%t) ERROR: ")
01591                       ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01592                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01593                       ret),
01594                      ret);
01595   }
01596 
01597   element->set_sample(create_control_message(UNREGISTER_INSTANCE,
01598                                              element->get_header(),
01599                                              unregistered_sample_data,
01600                                              source_timestamp));
01601   ret = this->data_container_->enqueue_control(element);
01602 
01603   if (ret != DDS::RETCODE_OK) {
01604     ACE_ERROR_RETURN((LM_ERROR,
01605                       ACE_TEXT("(%P|%t) ERROR: ")
01606                       ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01607                       ACE_TEXT("enqueue_control failed.\n")),
01608                      ret);
01609   }
01610 
01611   send_all_to_flush_control(guard);
01612   return DDS::RETCODE_OK;
01613 }

void OpenDDS::DCPS::DataWriterImpl::unregister_instances ( const DDS::Time_t source_timestamp  ) 

Unregister all registered instances and tell the transport to broadcast the unregistered instances.

Definition at line 1677 of file DataWriterImpl.cpp.

References data_container_, OpenDDS::DCPS::WriteDataContainer::instances_, and sync_unreg_rem_assocs_lock_.

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().

01678 {
01679   {
01680     ACE_GUARD(ACE_Thread_Mutex, guard, sync_unreg_rem_assocs_lock_);
01681 
01682     PublicationInstanceMapType::iterator it =
01683       this->data_container_->instances_.begin();
01684 
01685     while (it != this->data_container_->instances_.end()) {
01686       DDS::InstanceHandle_t handle = it->first;
01687       ++it; // avoid mangling the iterator
01688 
01689       this->unregister_instance_i(handle, source_timestamp);
01690     }
01691   }
01692 }

virtual void OpenDDS::DCPS::DataWriterImpl::unregistered ( DDS::InstanceHandle_t  instance_handle  )  [pure virtual]

This method is called when an instance is unregistered from the WriteDataContainer.

The subclass must provide the implementation to unregister the instance from its own map.

Referenced by OpenDDS::DCPS::WriteDataContainer::unregister().

void OpenDDS::DCPS::DataWriterImpl::update_incompatible_qos ( const IncompatibleQosStatus status  )  [virtual]

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 821 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, dw_local_objref_, OpenDDS::DCPS::IncompatibleQosStatus::last_policy_id, DDS::OfferedIncompatibleQosStatus::last_policy_id, listener_for(), OpenDDS::DCPS::EntityImpl::notify_status_condition(), DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, offered_incompatible_qos_status_, OpenDDS::DCPS::IncompatibleQosStatus::policies, DDS::OfferedIncompatibleQosStatus::policies, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), DDS::OfferedIncompatibleQosStatus::total_count, OpenDDS::DCPS::IncompatibleQosStatus::total_count, and DDS::OfferedIncompatibleQosStatus::total_count_change.

00822 {
00823   DDS::DataWriterListener_var listener =
00824     listener_for(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS);
00825 
00826   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00827 
00828 #if 0
00829 
00830   if (this->offered_incompatible_qos_status_.total_count == status.total_count) {
00831     // This test should make the method idempotent.
00832     return;
00833   }
00834 
00835 #endif
00836 
00837   set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, true);
00838 
00839   // copy status and increment change
00840   offered_incompatible_qos_status_.total_count = status.total_count;
00841   offered_incompatible_qos_status_.total_count_change +=
00842     status.count_since_last_send;
00843   offered_incompatible_qos_status_.last_policy_id = status.last_policy_id;
00844   offered_incompatible_qos_status_.policies = status.policies;
00845 
00846   if (!CORBA::is_nil(listener.in())) {
00847     listener->on_offered_incompatible_qos(dw_local_objref_.in(),
00848                                           offered_incompatible_qos_status_);
00849 
00850     // TBD - Why does the spec say to change this but not change the
00851     //       ChangeFlagStatus after a listener call?
00852     offered_incompatible_qos_status_.total_count_change = 0;
00853   }
00854 
00855   notify_status_condition();
00856 }

void OpenDDS::DCPS::DataWriterImpl::update_subscription_params ( const RepoId readerId,
const DDS::StringSeq params 
) [virtual]

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 859 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OPENDDS_STRING, reader_info_, and TheServiceParticipant.

00861 {
00862 #ifdef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00863   ACE_UNUSED_ARG(readerId);
00864   ACE_UNUSED_ARG(params);
00865 #else
00866   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00867   ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00868   RepoIdToReaderInfoMap::iterator iter = reader_info_.find(readerId);
00869 
00870   if (iter != reader_info_.end()) {
00871     iter->second.expression_params_ = params;
00872 
00873   } else if (DCPS_debug_level > 4 &&
00874              TheServiceParticipant->publisher_content_filter()) {
00875     GuidConverter pubConv(this->publication_id_), subConv(readerId);
00876     ACE_DEBUG((LM_WARNING,
00877                ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::update_subscription_params()")
00878                ACE_TEXT(" - writer: %C has no info about reader: %C\n"),
00879                OPENDDS_STRING(pubConv).c_str(), OPENDDS_STRING(subConv).c_str()));
00880   }
00881 
00882 #endif
00883 }

void OpenDDS::DCPS::DataWriterImpl::wait_control_pending (  ) 

Wait until pending control elements have either been delivered or dropped.

Definition at line 2599 of file DataWriterImpl.cpp.

References controlTracker, OPENDDS_STRING, and OpenDDS::DCPS::MessageTracker::wait_messages_pending().

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().

02600 {
02601   OPENDDS_STRING caller_string("DataWriterImpl::wait_control_pending");
02602   controlTracker.wait_messages_pending(caller_string);
02603 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::wait_for_acknowledgments ( const DDS::Duration_t max_wait  )  [virtual]

Definition at line 1015 of file DataWriterImpl.cpp.

References create_ack_token(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SequenceNumber::getValue(), DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_OK, OpenDDS::DCPS::DataWriterImpl::AckToken::sequence_, and wait_for_specific_ack().

01016 {
01017   if (this->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS)
01018     return DDS::RETCODE_OK;
01019   DataWriterImpl::AckToken token = create_ack_token(max_wait);
01020   if (DCPS_debug_level) {
01021     ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::wait_for_acknowledgments")
01022                           ACE_TEXT(" waiting for acknowledgment of sequence %q at %T\n"),
01023                           token.sequence_.getValue()));
01024   }
01025   return wait_for_specific_ack(token);
01026 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::wait_for_specific_ack ( const AckToken token  )  [protected]

Definition at line 1029 of file DataWriterImpl.cpp.

References data_container_, OpenDDS::DCPS::DataWriterImpl::AckToken::deadline(), OpenDDS::DCPS::DataWriterImpl::AckToken::sequence_, and OpenDDS::DCPS::WriteDataContainer::wait_ack_of_seq().

Referenced by wait_for_acknowledgments().

01030 {
01031   return this->data_container_->wait_ack_of_seq(token.deadline(), token.sequence_);
01032 }

void OpenDDS::DCPS::DataWriterImpl::wait_pending (  ) 

Wait for pending samples to drain.

Definition at line 2606 of file DataWriterImpl.cpp.

References data_container_, and OpenDDS::DCPS::WriteDataContainer::wait_pending().

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().

02607 {
02608   this->data_container_->wait_pending();
02609 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::write ( DataSample sample,
DDS::InstanceHandle_t  handle,
const DDS::Time_t source_timestamp,
GUIDSeq filter_out 
)

Delegate to the WriteDataContainer to queue the instance sample and finally tell the transport to send the sample.

Parameters:
filter_out can either be null (if the writer can't or won't evaluate the filters), or a list of associated reader RepoIds that should NOT get the data sample due to content filtering.

Definition at line 1695 of file DataWriterImpl.cpp.

References available_data_list_, coherent_samples_, create_sample_data_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::WriteDataContainer::enqueue(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), get_unsent_data(), last_liveliness_activity_time_, max_suspended_transaction_id_, min_suspended_transaction_id_, OpenDDS::DCPS::WriteDataContainer::obtain_buffer(), DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, OpenDDS::DCPS::TransportClient::send(), OpenDDS::DCPS::DataSampleElement::set_filter_out(), OpenDDS::DCPS::DataSampleElement::set_sample(), and track_sequence_number().

Referenced by OpenDDS::DCPS::DataDurabilityCache::get_data(), and OpenDDS::DCPS::DataWriterImpl_T< MessageType >::write_w_timestamp().

01699 {
01700   DBG_ENTRY_LVL("DataWriterImpl","write",6);
01701 
01702   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
01703                     guard,
01704                     get_lock (),
01705                     ::DDS::RETCODE_ERROR);
01706 
01707   // take ownership of sequence allocated in FooDWImpl::write_w_timestamp()
01708   GUIDSeq_var filter_out_var(filter_out);
01709 
01710   if (enabled_ == false) {
01711     ACE_ERROR_RETURN((LM_ERROR,
01712                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::write: ")
01713                       ACE_TEXT(" Entity is not enabled. \n")),
01714                      DDS::RETCODE_NOT_ENABLED);
01715   }
01716 
01717   DataSampleElement* element = 0;
01718   DDS::ReturnCode_t ret = this->data_container_->obtain_buffer(element, handle);
01719 
01720   if (ret == DDS::RETCODE_TIMEOUT) {
01721     return ret; // silent for timeout
01722 
01723   } else if (ret != DDS::RETCODE_OK) {
01724     ACE_ERROR_RETURN((LM_ERROR,
01725                       ACE_TEXT("(%P|%t) ERROR: ")
01726                       ACE_TEXT("DataWriterImpl::write: ")
01727                       ACE_TEXT("obtain_buffer returned %d.\n"),
01728                       ret),
01729                      ret);
01730   }
01731 
01732   DataSample* temp;
01733   ret = create_sample_data_message(data,
01734                                    handle,
01735                                    element->get_header(),
01736                                    temp,
01737                                    source_timestamp,
01738                                    (filter_out != 0));
01739   element->set_sample(temp);
01740 
01741   if (ret != DDS::RETCODE_OK) {
01742     return ret;
01743   }
01744 
01745   element->set_filter_out(filter_out_var._retn()); // ownership passed to element
01746 
01747   ret = this->data_container_->enqueue(element, handle);
01748 
01749   if (ret != DDS::RETCODE_OK) {
01750     ACE_ERROR_RETURN((LM_ERROR,
01751                       ACE_TEXT("(%P|%t) ERROR: ")
01752                       ACE_TEXT("DataWriterImpl::write: ")
01753                       ACE_TEXT("enqueue failed.\n")),
01754                      ret);
01755   }
01756   this->last_liveliness_activity_time_ = ACE_OS::gettimeofday();
01757 
01758   track_sequence_number(filter_out);
01759 
01760   if (this->coherent_) {
01761     ++this->coherent_samples_;
01762   }
01763   SendStateDataSampleList list;
01764 
01765   ACE_UINT64 transaction_id = this->get_unsent_data(list);
01766 
01767   if (this->publisher_servant_->is_suspended()) {
01768     if (min_suspended_transaction_id_ == 0) {
01769       //provides transaction id for lower bound of suspended transactions
01770       //or transaction id for single suspended write transaction
01771       min_suspended_transaction_id_ = transaction_id;
01772     } else {
01773       //when multiple write transactions have suspended, provides the upper bound
01774       //for suspended transactions.
01775       max_suspended_transaction_id_ = transaction_id;
01776     }
01777     this->available_data_list_.enqueue_tail(list);
01778 
01779   } else {
01780     guard.release();
01781 
01782     this->send(list, transaction_id);
01783   }
01784 
01785   return DDS::RETCODE_OK;
01786 }


Friends And Related Function Documentation

friend class ::DDS_TEST [friend]

Reimplemented from OpenDDS::DCPS::TransportClient.

Reimplemented in OpenDDS::DCPS::DataWriterImpl_T< MessageType >.

Definition at line 584 of file DataWriterImpl.h.

friend class PublisherImpl [friend]

Definition at line 85 of file DataWriterImpl.h.

friend class WriteDataContainer [friend]

Definition at line 84 of file DataWriterImpl.h.

Referenced by enable().


Member Data Documentation

RepoIdSet OpenDDS::DCPS::DataWriterImpl::assoc_complete_readers_ [private]

Definition at line 683 of file DataWriterImpl.h.

Referenced by association_complete(), and transport_assoc_done().

size_t OpenDDS::DCPS::DataWriterImpl::association_chunk_multiplier_ [protected]

The multiplier for allocators affected by associations.

Definition at line 489 of file DataWriterImpl.h.

Referenced by enable().

SendStateDataSampleList OpenDDS::DCPS::DataWriterImpl::available_data_list_ [private]

Definition at line 688 of file DataWriterImpl.h.

Referenced by send_suspended_data(), and write().

bool OpenDDS::DCPS::DataWriterImpl::cancel_timer_ [private]

The flag indicates whether the liveliness timer is scheduled and needs be cancelled.

Definition at line 674 of file DataWriterImpl.h.

Referenced by cleanup(), enable(), and unregister_all().

bool OpenDDS::DCPS::DataWriterImpl::coherent_ [private]

Flag indicating DataWriter current belongs to a coherent change set.

Definition at line 612 of file DataWriterImpl.h.

Referenced by begin_coherent_changes(), coherent_changes_pending(), create_sample_data_message(), and end_coherent_changes().

ACE_UINT32 OpenDDS::DCPS::DataWriterImpl::coherent_samples_ [private]

The number of samples belonging to the current coherent change set.

Definition at line 615 of file DataWriterImpl.h.

Referenced by end_coherent_changes(), and write().

MessageTracker OpenDDS::DCPS::DataWriterImpl::controlTracker

Definition at line 423 of file DataWriterImpl.h.

Referenced by association_complete_i(), control_delivered(), control_dropped(), OpenDDS::DCPS::WriteDataContainer::data_delivered(), OpenDDS::DCPS::WriteDataContainer::data_dropped(), pending_control(), send_all_to_flush_control(), send_control(), and wait_control_pending().

WriteDataContainer* OpenDDS::DCPS::DataWriterImpl::data_container_ [private]

The sample data container.

Definition at line 617 of file DataWriterImpl.h.

Referenced by association_complete_i(), create_sample_data_message(), data_delivered(), data_dropped(), dispose(), dispose_and_unregister(), enable(), get_handle_instance(), get_instance_handles(), num_samples(), persist_data(), register_instance_i(), reschedule_deadline(), unregister_all(), unregister_instance_i(), unregister_instances(), wait_for_specific_ack(), wait_pending(), write(), and ~DataWriterImpl().

int OpenDDS::DCPS::DataWriterImpl::data_delivered_count_

Definition at line 421 of file DataWriterImpl.h.

Referenced by data_delivered().

int OpenDDS::DCPS::DataWriterImpl::data_dropped_count_

Statistics counter.

Definition at line 420 of file DataWriterImpl.h.

Referenced by data_dropped().

DataBlockAllocator* OpenDDS::DCPS::DataWriterImpl::db_allocator_ [private]

The data block allocator.

Reimplemented in OpenDDS::DCPS::DataWriterImpl_T< MessageType >.

Definition at line 653 of file DataWriterImpl.h.

Referenced by create_control_message(), create_sample_data_message(), enable(), and ~DataWriterImpl().

DataBlockLockPool* OpenDDS::DCPS::DataWriterImpl::db_lock_pool_ [private]

Definition at line 697 of file DataWriterImpl.h.

Referenced by DataWriterImpl(), and ~DataWriterImpl().

DDS::DomainId_t OpenDDS::DCPS::DataWriterImpl::domain_id_ [private]

The domain id.

Definition at line 601 of file DataWriterImpl.h.

Referenced by enable(), init(), send_liveliness(), set_qos(), and transport_assoc_done().

DDS::DataWriter_var OpenDDS::DCPS::DataWriterImpl::dw_local_objref_ [private]

the object reference of the local datawriter

Definition at line 605 of file DataWriterImpl.h.

Referenced by association_complete_i(), cleanup(), init(), and update_incompatible_qos().

DataSampleHeaderAllocator* OpenDDS::DCPS::DataWriterImpl::header_allocator_ [private]

The header data allocator.

Definition at line 655 of file DataWriterImpl.h.

Referenced by create_sample_data_message(), enable(), and ~DataWriterImpl().

RepoIdToHandleMap OpenDDS::DCPS::DataWriterImpl::id_to_handle_map_ [private]

Definition at line 624 of file DataWriterImpl.h.

Referenced by association_complete_i(), and get_matched_subscriptions().

bool OpenDDS::DCPS::DataWriterImpl::initialized_ [private]

Flag indicates that the init() is called.

Definition at line 681 of file DataWriterImpl.h.

Referenced by init(), and ~DataWriterImpl().

bool OpenDDS::DCPS::DataWriterImpl::is_bit_ [private]

Flag indicates that this datawriter is a builtin topic datawriter.

Definition at line 678 of file DataWriterImpl.h.

Referenced by add_association(), association_complete(), association_complete_i(), init(), notify_publication_disconnected(), notify_publication_lost(), and notify_publication_reconnected().

CORBA::Long OpenDDS::DCPS::DataWriterImpl::last_deadline_missed_total_count_ [private]

Total number of offered deadlines missed during last offered deadline status check.

Definition at line 668 of file DataWriterImpl.h.

Referenced by enable(), get_offered_deadline_missed_status(), and set_qos().

ACE_Time_Value OpenDDS::DCPS::DataWriterImpl::last_liveliness_activity_time_ [private]

Timestamp of last write/dispose/assert_liveliness.

Definition at line 663 of file DataWriterImpl.h.

Referenced by handle_timeout(), participant_liveliness_activity_after(), send_liveliness(), and write().

ACE_Time_Value OpenDDS::DCPS::DataWriterImpl::last_liveliness_check_time_ [private]

Timestamp of the last time liveliness was checked.

Definition at line 665 of file DataWriterImpl.h.

Referenced by enable(), and handle_timeout().

DDS::DataWriterListener_var OpenDDS::DCPS::DataWriterImpl::listener_ [private]

Used to notify the entity for relevant events.

Definition at line 599 of file DataWriterImpl.h.

Referenced by get_listener(), init(), listener_for(), and set_listener().

DDS::StatusMask OpenDDS::DCPS::DataWriterImpl::listener_mask_ [private]

The StatusKind bit mask indicates which status condition change can be notified by the listener of this entity.

Definition at line 597 of file DataWriterImpl.h.

Referenced by init(), listener_for(), and set_listener().

bool OpenDDS::DCPS::DataWriterImpl::liveliness_asserted_ [private]

Definition at line 706 of file DataWriterImpl.h.

Referenced by assert_liveliness_by_participant(), and handle_timeout().

ACE_Time_Value OpenDDS::DCPS::DataWriterImpl::liveliness_check_interval_ [private]

The time interval for sending liveliness message.

Definition at line 661 of file DataWriterImpl.h.

Referenced by enable(), handle_timeout(), and liveliness_check_interval().

bool OpenDDS::DCPS::DataWriterImpl::liveliness_lost_ [private]

True if the writer failed to actively signal its liveliness within its offered liveliness period.

Definition at line 636 of file DataWriterImpl.h.

Referenced by handle_timeout().

DDS::LivelinessLostStatus OpenDDS::DCPS::DataWriterImpl::liveliness_lost_status_ [private]

Status conditions.

Definition at line 629 of file DataWriterImpl.h.

Referenced by DataWriterImpl(), get_liveliness_lost_status(), and handle_timeout().

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::DataWriterImpl::lock_ [private]

The lock to protect the activate subscriptions and status changes.

Reimplemented from OpenDDS::DCPS::EntityImpl.

Definition at line 620 of file DataWriterImpl.h.

Referenced by remove_all_associations(), and transport_assoc_done().

ACE_UINT64 OpenDDS::DCPS::DataWriterImpl::max_suspended_transaction_id_ [private]

Definition at line 687 of file DataWriterImpl.h.

Referenced by send_suspended_data(), and write().

MessageBlockAllocator* OpenDDS::DCPS::DataWriterImpl::mb_allocator_ [private]

The message block allocator.

Todo:
The publication_lost_status_ and publication_reconnecting_status_ are left here for future use when we add get_publication_lost_status() and get_publication_reconnecting_status() methods.

Reimplemented in OpenDDS::DCPS::DataWriterImpl_T< MessageType >.

Definition at line 651 of file DataWriterImpl.h.

Referenced by create_control_message(), create_sample_data_message(), enable(), and ~DataWriterImpl().

ACE_UINT64 OpenDDS::DCPS::DataWriterImpl::min_suspended_transaction_id_ [private]

The cached available data while suspending and associated transaction ids.

Definition at line 686 of file DataWriterImpl.h.

Referenced by send_suspended_data(), and write().

Monitor* OpenDDS::DCPS::DataWriterImpl::monitor_ [private]

Monitor object for this entity.

Definition at line 691 of file DataWriterImpl.h.

Referenced by association_complete_i(), DataWriterImpl(), enable(), and register_instance_i().

size_t OpenDDS::DCPS::DataWriterImpl::n_chunks_ [protected]

The number of chunks for the cached allocator.

Definition at line 486 of file DataWriterImpl.h.

Referenced by DataWriterImpl(), and enable().

DDS::OfferedDeadlineMissedStatus OpenDDS::DCPS::DataWriterImpl::offered_deadline_missed_status_ [private]

Definition at line 630 of file DataWriterImpl.h.

Referenced by DataWriterImpl(), enable(), get_offered_deadline_missed_status(), and set_qos().

DDS::OfferedIncompatibleQosStatus OpenDDS::DCPS::DataWriterImpl::offered_incompatible_qos_status_ [private]

Definition at line 631 of file DataWriterImpl.h.

Referenced by DataWriterImpl(), get_offered_incompatible_qos_status(), and update_incompatible_qos().

DomainParticipantImpl* OpenDDS::DCPS::DataWriterImpl::participant_servant_ [protected]

The participant servant which creats the publisher that creates this datawriter.

Definition at line 505 of file DataWriterImpl.h.

Referenced by add_association(), assert_liveliness(), association_complete_i(), enable(), get_dp_id(), get_instance_handle(), get_matched_subscription_data(), get_next_handle(), init(), set_qos(), and transport_assoc_done().

RepoIdSet OpenDDS::DCPS::DataWriterImpl::pending_readers_ [private]

Definition at line 683 of file DataWriterImpl.h.

Referenced by association_complete(), and transport_assoc_done().

Monitor* OpenDDS::DCPS::DataWriterImpl::periodic_monitor_ [private]

Periodic Monitor object for this entity.

Definition at line 694 of file DataWriterImpl.h.

Referenced by DataWriterImpl().

PublicationId OpenDDS::DCPS::DataWriterImpl::publication_id_ [private]

The repository id of this datawriter/publication.

Definition at line 607 of file DataWriterImpl.h.

Referenced by add_association(), create_control_message(), create_sample_data_message(), data_delivered(), enable(), get_instance_handle(), get_publication_id(), set_qos(), and transport_assoc_done().

DDS::PublicationMatchedStatus OpenDDS::DCPS::DataWriterImpl::publication_match_status_ [private]

Definition at line 632 of file DataWriterImpl.h.

Referenced by association_complete_i(), DataWriterImpl(), and get_publication_matched_status().

PublisherImpl* OpenDDS::DCPS::DataWriterImpl::publisher_servant_ [private]

The publisher servant which creates this datawriter.

Definition at line 603 of file DataWriterImpl.h.

Referenced by create_control_message(), create_sample_data_message(), enable(), end_coherent_changes(), get_publisher(), init(), listener_for(), parent(), retrieve_inline_qos_data(), and set_qos().

DDS::DataWriterQos OpenDDS::DCPS::DataWriterImpl::qos_ [protected]

The qos policy list of this datawriter.

Definition at line 501 of file DataWriterImpl.h.

Referenced by add_association(), create_sample_data_message(), enable(), get_qos(), handle_timeout(), init(), retrieve_inline_qos_data(), and set_qos().

ACE_Reactor_Timer_Interface* OpenDDS::DCPS::DataWriterImpl::reactor_ [private]

The orb's reactor to be used to register the liveliness timer.

Definition at line 659 of file DataWriterImpl.h.

Referenced by cleanup(), enable(), handle_timeout(), init(), and unregister_all().

RepoIdToReaderInfoMap OpenDDS::DCPS::DataWriterImpl::reader_info_ [protected]

Definition at line 526 of file DataWriterImpl.h.

Referenced by add_association(), association_complete_i(), create_control_message(), need_sequence_repair_i(), track_sequence_number(), and update_subscription_params().

ACE_Thread_Mutex OpenDDS::DCPS::DataWriterImpl::reader_info_lock_ [protected]

Definition at line 508 of file DataWriterImpl.h.

RepoIdSet OpenDDS::DCPS::DataWriterImpl::readers_ [private]

Definition at line 626 of file DataWriterImpl.h.

Referenced by association_complete_i().

SequenceNumber OpenDDS::DCPS::DataWriterImpl::sequence_number_ [private]

The sequence number unique in DataWriter scope.

Definition at line 609 of file DataWriterImpl.h.

Referenced by create_control_message(), create_sample_data_message(), end_coherent_changes(), need_sequence_repair_i(), and track_sequence_number().

ACE_Thread_Mutex OpenDDS::DCPS::DataWriterImpl::sync_unreg_rem_assocs_lock_ [private]

Definition at line 710 of file DataWriterImpl.h.

Referenced by unregister_instances().

RepoId OpenDDS::DCPS::DataWriterImpl::topic_id_ [private]

The associated topic repository id.

Definition at line 589 of file DataWriterImpl.h.

Referenced by init().

CORBA::String_var OpenDDS::DCPS::DataWriterImpl::topic_name_ [private]

The name of associated topic.

Definition at line 587 of file DataWriterImpl.h.

Referenced by enable(), get_topic_name(), init(), and retrieve_inline_qos_data().

DDS::Topic_var OpenDDS::DCPS::DataWriterImpl::topic_objref_ [private]

The object reference of the associated topic.

Definition at line 591 of file DataWriterImpl.h.

Referenced by cleanup(), get_topic(), and init().

TopicImpl* OpenDDS::DCPS::DataWriterImpl::topic_servant_ [private]

The topic servant.

Definition at line 593 of file DataWriterImpl.h.

Referenced by cleanup(), enable(), filter_out(), inconsistent_topic(), and init().

CORBA::String_var OpenDDS::DCPS::DataWriterImpl::type_name_ [protected]

The type name of associated topic.

Definition at line 498 of file DataWriterImpl.h.

Referenced by get_type_name(), and init().

OfferedDeadlineWatchdog* OpenDDS::DCPS::DataWriterImpl::watchdog_ [private]

Watchdog responsible for reporting missed offered deadlines.

Definition at line 671 of file DataWriterImpl.h.

Referenced by OpenDDS::DCPS::WriteDataContainer::dispose(), enable(), OpenDDS::DCPS::WriteDataContainer::enqueue(), OpenDDS::DCPS::WriteDataContainer::register_instance(), set_qos(), and OpenDDS::DCPS::WriteDataContainer::unregister().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:14 2016 for OpenDDS by  doxygen 1.4.7