OpenDDS::DCPS::DataWriterImpl Class Reference

Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces. More...

#include <DataWriterImpl.h>

Inheritance diagram for OpenDDS::DCPS::DataWriterImpl:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::DataWriterImpl:
Collaboration graph
[legend]

List of all members.

Classes

struct  AckCustomization
struct  AckToken
struct  ReaderInfo

Public Member Functions

typedef OPENDDS_MAP_CMP (RepoId, SequenceNumber, GUID_tKeyLessThan) RepoIdToSequenceMap
 DataWriterImpl ()
virtual ~DataWriterImpl ()
virtual DDS::InstanceHandle_t get_instance_handle ()
virtual DDS::ReturnCode_t set_qos (const DDS::DataWriterQos &qos)
virtual DDS::ReturnCode_t get_qos (DDS::DataWriterQos &qos)
virtual DDS::ReturnCode_t set_listener (DDS::DataWriterListener_ptr a_listener, DDS::StatusMask mask)
virtual DDS::DataWriterListener_ptr get_listener ()
virtual DDS::Topic_ptr get_topic ()
virtual DDS::ReturnCode_t wait_for_acknowledgments (const DDS::Duration_t &max_wait)
virtual DDS::Publisher_ptr get_publisher ()
virtual DDS::ReturnCode_t get_liveliness_lost_status (DDS::LivelinessLostStatus &status)
virtual DDS::ReturnCode_t get_offered_deadline_missed_status (DDS::OfferedDeadlineMissedStatus &status)
virtual DDS::ReturnCode_t get_offered_incompatible_qos_status (DDS::OfferedIncompatibleQosStatus &status)
virtual DDS::ReturnCode_t get_publication_matched_status (DDS::PublicationMatchedStatus &status)
ACE_Time_Value liveliness_check_interval (DDS::LivelinessQosPolicyKind kind)
bool participant_liveliness_activity_after (const ACE_Time_Value &tv)
virtual DDS::ReturnCode_t assert_liveliness ()
virtual DDS::ReturnCode_t assert_liveliness_by_participant ()
typedef OPENDDS_VECTOR (DDS::InstanceHandle_t) InstanceHandleVec
void get_instance_handles (InstanceHandleVec &instance_handles)
void get_readers (RepoIdSet &readers)
virtual DDS::ReturnCode_t get_matched_subscriptions (DDS::InstanceHandleSeq &subscription_handles)
virtual DDS::ReturnCode_t get_matched_subscription_data (DDS::SubscriptionBuiltinTopicData &subscription_data, DDS::InstanceHandle_t subscription_handle)
virtual DDS::ReturnCode_t enable ()
virtual void add_association (const RepoId &yourId, const ReaderAssociation &reader, bool active)
virtual void transport_assoc_done (int flags, const RepoId &remote_id)
virtual void association_complete (const RepoId &remote_id)
virtual void remove_associations (const ReaderIdSeq &readers, bool callback)
virtual void update_incompatible_qos (const IncompatibleQosStatus &status)
virtual void update_subscription_params (const RepoId &readerId, const DDS::StringSeq &params)
virtual void inconsistent_topic ()
void cleanup ()
void init (TopicImpl *topic_servant, const DDS::DataWriterQos &qos, DDS::DataWriterListener_ptr a_listener, const DDS::StatusMask &mask, WeakRcHandle< OpenDDS::DCPS::DomainParticipantImpl > participant_servant, OpenDDS::DCPS::PublisherImpl *publisher_servant)
void send_all_to_flush_control (ACE_Guard< ACE_Recursive_Thread_Mutex > &guard)
DDS::ReturnCode_t register_instance_i (DDS::InstanceHandle_t &handle, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)
DDS::ReturnCode_t register_instance_from_durable_data (DDS::InstanceHandle_t &handle, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)
DDS::ReturnCode_t unregister_instance_i (DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp)
void unregister_instances (const DDS::Time_t &source_timestamp)
DDS::ReturnCode_t write (Message_Block_Ptr sample, DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp, GUIDSeq *filter_out)
DDS::ReturnCode_t dispose (DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp)
DDS::ReturnCode_t num_samples (DDS::InstanceHandle_t handle, size_t &size)
ACE_UINT64 get_unsent_data (SendStateDataSampleList &list)
SendStateDataSampleList get_resend_data ()
RepoId get_publication_id ()
RepoId get_dp_id ()
void unregister_all ()
void data_delivered (const DataSampleElement *sample)
void control_delivered (const Message_Block_Ptr &sample)
bool should_ack () const
 Does this writer have samples to be acknowledged?
AckToken create_ack_token (DDS::Duration_t max_wait) const
 Create an AckToken for ack operations.
virtual void retrieve_inline_qos_data (TransportSendListener::InlineQosData &qos_data) const
virtual bool check_transport_qos (const TransportInst &inst)
bool coherent_changes_pending ()
 Are coherent changes pending?
void begin_coherent_changes ()
 Starts a coherent change set; should only be called once.
void end_coherent_changes (const GroupCoherentSamples &group_samples)
 Ends a coherent change set; should only be called once.
char const * get_type_name () const
void data_dropped (const DataSampleElement *element, bool dropped_by_transport)
void control_dropped (const Message_Block_Ptr &sample, bool dropped_by_transport)
ACE_INLINE ACE_Recursive_Thread_Mutex & get_lock ()
DDS::DataWriterListener_ptr listener_for (DDS::StatusKind kind)
virtual int handle_timeout (const ACE_Time_Value &tv, const void *arg)
 Handle the assert liveliness timeout.
void send_suspended_data ()
void remove_all_associations ()
virtual void register_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
virtual void unregister_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid)
void notify_publication_disconnected (const ReaderIdSeq &subids)
void notify_publication_reconnected (const ReaderIdSeq &subids)
void notify_publication_lost (const ReaderIdSeq &subids)
DDS::ReturnCode_t create_sample_data_message (Message_Block_Ptr data, DDS::InstanceHandle_t instance_handle, DataSampleHeader &header_data, Message_Block_Ptr &message, const DDS::Time_t &source_timestamp, bool content_filter)
bool persist_data ()
void reschedule_deadline ()
void wait_pending ()
 Wait for pending samples to drain.
DDS::InstanceHandle_t get_next_handle ()
virtual RcHandle< EntityImpl > parent () const
bool filter_out (const DataSampleElement &elt, const OPENDDS_STRING &filterClassName, const FilterEvaluator &evaluator, const DDS::StringSeq &expression_params) const
void wait_control_pending ()
DataBlockLockPool::DataBlockLock * get_db_lock ()
PublicationInstance_rch get_handle_instance (DDS::InstanceHandle_t handle)

Public Attributes

int data_dropped_count_
 Statistics counter.
int data_delivered_count_
MessageTracker controlTracker

Protected Member Functions

DDS::ReturnCode_t wait_for_specific_ack (const AckToken &token)
void prepare_to_delete ()
virtual DDS::ReturnCode_t enable_specific ()=0
typedef OPENDDS_MAP_CMP (RepoId, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap
virtual SendControlStatus send_control (const DataSampleHeader &header, Message_Block_Ptr msg)

Protected Attributes

size_t n_chunks_
 The number of chunks for the cached allocator.
size_t association_chunk_multiplier_
 The multiplier for allocators affected by associations.
CORBA::String_var type_name_
 The type name of associated topic.
DDS::DataWriterQos qos_
 The qos policy list of this datawriter.
WeakRcHandle< DomainParticipantImpl > participant_servant_
ACE_Thread_Mutex reader_info_lock_
RepoIdToReaderInfoMap reader_info_

Private Member Functions

void track_sequence_number (GUIDSeq *filter_out)
void notify_publication_lost (const DDS::InstanceHandleSeq &handles)
DDS::ReturnCode_t dispose_and_unregister (DDS::InstanceHandle_t handle, const DDS::Time_t &timestamp)
ACE_Message_Block * create_control_message (MessageId message_id, DataSampleHeader &header, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)
bool send_liveliness (const ACE_Time_Value &now)
 Send the liveliness message.
void lookup_instance_handles (const ReaderIdSeq &ids, DDS::InstanceHandleSeq &hdls)
 Lookup the instance handles by the subscription repo ids.
const RepoId & get_repo_id () const
DDS::DomainId_t domain_id () const
CORBA::Long get_priority_value (const AssociationData &) const
void association_complete_i (const RepoId &remote_id)
typedef OPENDDS_MAP_CMP (RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap
bool need_sequence_repair ()
bool need_sequence_repair_i () const
DDS::ReturnCode_t send_end_historic_samples (const RepoId &readerId)
DDS::ReturnCode_t send_request_ack ()

Private Attributes

unique_ptr< DataBlockLockPool > db_lock_pool_
CORBA::String_var topic_name_
 The name of associated topic.
RepoId topic_id_
 The associated topic repository id.
TopicDescriptionPtr< TopicImpl > topic_servant_
 The topic servant.
DDS::StatusMask listener_mask_
DDS::DataWriterListener_var listener_
 Used to notify the entity for relevant events.
DDS::DomainId_t domain_id_
 The domain id.
RepoId dp_id_
WeakRcHandle< PublisherImpl > publisher_servant_
 The publisher servant which creates this datawriter.
PublicationId publication_id_
 The repository id of this datawriter/publication.
SequenceNumber sequence_number_
 The sequence number unique in DataWriter scope.
bool coherent_
ACE_UINT32 coherent_samples_
unique_ptr< WriteDataContainer > data_container_
 The sample data container.
ACE_Recursive_Thread_Mutex lock_
RepoIdToHandleMap id_to_handle_map_
RepoIdSet readers_
DDS::LivelinessLostStatus liveliness_lost_status_
 Status conditions.
DDS::OfferedDeadlineMissedStatus offered_deadline_missed_status_
DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_
DDS::PublicationMatchedStatus publication_match_status_
bool liveliness_lost_
unique_ptr< MessageBlockAllocator > mb_allocator_
 The message block allocator.
unique_ptr< DataBlockAllocator > db_allocator_
 The data block allocator.
unique_ptr< DataSampleHeaderAllocator > header_allocator_
 The header data allocator.
ACE_Reactor_Timer_Interface * reactor_
ACE_Time_Value liveliness_check_interval_
 The time interval for sending liveliness message.
ACE_Time_Value last_liveliness_activity_time_
 Timestamp of last write/dispose/assert_liveliness.
CORBA::Long last_deadline_missed_total_count_
RcHandle< OfferedDeadlineWatchdog > watchdog_
bool is_bit_
RepoIdSet pending_readers_
RepoIdSet assoc_complete_readers_
ACE_UINT64 min_suspended_transaction_id_
 The cached available data while suspending and associated transaction ids.
ACE_UINT64 max_suspended_transaction_id_
SendStateDataSampleList available_data_list_
Monitor * monitor_
 Monitor object for this entity.
Monitor * periodic_monitor_
 Periodic Monitor object for this entity.
bool liveliness_asserted_
ACE_Thread_Mutex sync_unreg_rem_assocs_lock_
RcHandle< LivenessTimer > liveness_timer_

Friends

class WriteDataContainer
class PublisherImpl
class ::DDS_TEST

Detailed Description

Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.

See the DDS specification, OMG formal/04-12-02, for a description of the interface this class is implementing.

This class must be inherited by the type-specific datawriter which is specific to the data-type associated with the topic.

Note:
This class is responsible for allocating memory for the header message block (MessageBlock + DataBlock + DataSampleHeader) and the DataSampleElement. The data-type datawriter is responsible for allocating memory for the sample data message block (e.g. MessageBlock + DataBlock + Foo data), but it gives up ownership of that block to the WriteDataContainer.

Definition at line 83 of file DataWriterImpl.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::DataWriterImpl::DataWriterImpl (  ) 

Definition at line 58 of file DataWriterImpl.cpp.

References DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, DDS::HANDLE_NIL, DDS::OfferedDeadlineMissedStatus::last_instance_handle, DDS::OfferedIncompatibleQosStatus::last_policy_id, DDS::PublicationMatchedStatus::last_subscription_handle, liveliness_lost_status_, monitor_, offered_deadline_missed_status_, offered_incompatible_qos_status_, periodic_monitor_, DDS::OfferedIncompatibleQosStatus::policies, publication_match_status_, TheServiceParticipant, DDS::PublicationMatchedStatus::total_count, DDS::OfferedIncompatibleQosStatus::total_count, DDS::OfferedDeadlineMissedStatus::total_count, DDS::LivelinessLostStatus::total_count, DDS::PublicationMatchedStatus::total_count_change, DDS::OfferedIncompatibleQosStatus::total_count_change, DDS::OfferedDeadlineMissedStatus::total_count_change, and DDS::LivelinessLostStatus::total_count_change.

00059   : data_dropped_count_(0),
00060     data_delivered_count_(0),
00061     controlTracker("DataWriterImpl"),
00062     n_chunks_(TheServiceParticipant->n_chunks()),
00063     association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()),
00064     qos_(TheServiceParticipant->initial_DataWriterQos()),
00065     db_lock_pool_(new DataBlockLockPool((unsigned long)TheServiceParticipant->n_chunks())),
00066     topic_id_(GUID_UNKNOWN),
00067     topic_servant_(0),
00068     listener_mask_(DEFAULT_STATUS_MASK),
00069     domain_id_(0),
00070     publication_id_(GUID_UNKNOWN),
00071     sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00072     coherent_(false),
00073     coherent_samples_(0),
00074     liveliness_lost_(false),
00075     reactor_(0),
00076     liveliness_check_interval_(ACE_Time_Value::max_time),
00077     last_liveliness_activity_time_(ACE_Time_Value::zero),
00078     last_deadline_missed_total_count_(0),
00079     watchdog_(),
00080     is_bit_(false),
00081     min_suspended_transaction_id_(0),
00082     max_suspended_transaction_id_(0),
00083     monitor_(0),
00084     periodic_monitor_(0),
00085     liveliness_asserted_(false),
00086     liveness_timer_(make_rch<LivenessTimer>(ref(*this)))
00087 {
00088   liveliness_lost_status_.total_count = 0;
00089   liveliness_lost_status_.total_count_change = 0;
00090 
00091   offered_deadline_missed_status_.total_count = 0;
00092   offered_deadline_missed_status_.total_count_change = 0;
00093   offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL;
00094 
00095   offered_incompatible_qos_status_.total_count = 0;
00096   offered_incompatible_qos_status_.total_count_change = 0;
00097   offered_incompatible_qos_status_.last_policy_id = 0;
00098   offered_incompatible_qos_status_.policies.length(0);
00099 
00100   publication_match_status_.total_count = 0;
00101   publication_match_status_.total_count_change = 0;
00102   publication_match_status_.current_count = 0;
00103   publication_match_status_.current_count_change = 0;
00104   publication_match_status_.last_subscription_handle = DDS::HANDLE_NIL;
00105 
00106   monitor_ =
00107     TheServiceParticipant->monitor_factory_->create_data_writer_monitor(this);
00108   periodic_monitor_ =
00109     TheServiceParticipant->monitor_factory_->create_data_writer_periodic_monitor(this);
00110 }

OpenDDS::DCPS::DataWriterImpl::~DataWriterImpl (  )  [virtual]

Definition at line 114 of file DataWriterImpl.cpp.

References DBG_ENTRY_LVL.

00115 {
00116   DBG_ENTRY_LVL("DataWriterImpl","~DataWriterImpl",6);
00117 }


Member Function Documentation

void OpenDDS::DCPS::DataWriterImpl::add_association ( const RepoId & yourId,
const ReaderAssociation & reader,
bool  active 
) [virtual]

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 190 of file DataWriterImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::TransportClient::associate(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, DDS::DataReaderQos::durability, OpenDDS::DCPS::EntityImpl::entity_deleted_, OpenDDS::DCPS::ReaderAssociation::exprParams, OpenDDS::DCPS::ReaderAssociation::filterClassName, OpenDDS::DCPS::ReaderAssociation::filterExpression, get_publication_id(), OpenDDS::DCPS::GUID_UNKNOWN, is_bit_, LM_DEBUG, LM_ERROR, OPENDDS_STRING, participant_servant_, publication_id_, qos_, reader_info_, reader_info_lock_, OpenDDS::DCPS::ReaderAssociation::readerId, OpenDDS::DCPS::ReaderAssociation::readerQos, OpenDDS::DCPS::ReaderAssociation::readerTransInfo, DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, TheServiceParticipant, DDS::DataWriterQos::transport_priority, ACE_Atomic_Op< ACE_LOCK, TYPE >::value(), and DDS::VOLATILE_DURABILITY_QOS.

00193 {
00194   DBG_ENTRY_LVL("DataWriterImpl", "add_association", 6);
00195 
00196   if (DCPS_debug_level) {
00197     GuidConverter writer_converter(yourId);
00198     GuidConverter reader_converter(reader.readerId);
00199     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association - ")
00200                ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
00201                OPENDDS_STRING(writer_converter).c_str(),
00202                OPENDDS_STRING(reader_converter).c_str()));
00203   }
00204 
00205   if (entity_deleted_.value()) {
00206     if (DCPS_debug_level)
00207       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association")
00208                  ACE_TEXT(" This is a deleted datawriter, ignoring add.\n")));
00209 
00210     return;
00211   }
00212 
00213   if (GUID_UNKNOWN == publication_id_) {
00214     publication_id_ = yourId;
00215   }
00216 
00217   {
00218     ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00219     reader_info_.insert(std::make_pair(reader.readerId,
00220                                        ReaderInfo(reader.filterClassName,
00221                                                   TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
00222                                                   reader.exprParams, participant_servant_,
00223                                                   reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS)));
00224   }
00225 
00226   if (DCPS_debug_level > 4) {
00227     GuidConverter converter(get_publication_id());
00228     ACE_DEBUG((LM_DEBUG,
00229                ACE_TEXT("(%P|%t) DataWriterImpl::add_association(): ")
00230                ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
00231                OPENDDS_STRING(converter).c_str(),
00232                qos_.transport_priority.value));
00233   }
00234 
00235   AssociationData data;
00236   data.remote_id_ = reader.readerId;
00237   data.remote_data_ = reader.readerTransInfo;
00238   data.remote_reliable_ =
00239     (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
00240   data.remote_durable_ =
00241     (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
00242 
00243   if (!associate(data, active)) {
00244     //FUTURE: inform inforepo and try again as passive peer
00245     if (DCPS_debug_level) {
00246       ACE_ERROR((LM_ERROR,
00247                  ACE_TEXT("(%P|%t) DataWriterImpl::add_association: ")
00248                  ACE_TEXT("ERROR: transport layer failed to associate.\n")));
00249     }
00250   }
00251 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::assert_liveliness (  )  [virtual]

Implements DDS::DataWriter.

Definition at line 1136 of file DataWriterImpl.cpp.

References DDS::AUTOMATIC_LIVELINESS_QOS, ACE_OS::gettimeofday(), DDS::DataWriterQos::liveliness, OpenDDS::DCPS::WeakRcHandle< T >::lock(), DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS, participant_servant_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, and send_liveliness().

01137 {
01138   switch (this->qos_.liveliness.kind) {
01139   case DDS::AUTOMATIC_LIVELINESS_QOS:
01140     // Do nothing.
01141     break;
01142   case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
01143     {
01144       RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
01145       if (participant)
01146         return participant->assert_liveliness();
01147       return DDS::RETCODE_OK;
01148     }
01149   case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
01150     if (this->send_liveliness(ACE_OS::gettimeofday()) == false) {
01151       return DDS::RETCODE_ERROR;
01152     }
01153     break;
01154   }
01155 
01156   return DDS::RETCODE_OK;
01157 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::assert_liveliness_by_participant (  )  [virtual]

Definition at line 1160 of file DataWriterImpl.cpp.

References DDS::DataWriterQos::liveliness, liveliness_asserted_, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, qos_, and DDS::RETCODE_OK.

01161 {
01162   // This operation is called by participant.
01163 
01164   if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) {
01165     // Set a flag indicating that we should send a liveliness message on the timer if necessary.
01166     liveliness_asserted_ = true;
01167   }
01168 
01169   return DDS::RETCODE_OK;
01170 }

void OpenDDS::DCPS::DataWriterImpl::association_complete ( const RepoId & remote_id  )  [virtual]

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 374 of file DataWriterImpl.cpp.

References ACE_TEXT(), assoc_complete_readers_, association_complete_i(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, is_bit_, LM_DEBUG, lock_, OPENDDS_STRING, pending_readers_, publication_id_, and OpenDDS::DCPS::remove().

00375 {
00376   DBG_ENTRY_LVL("DataWriterImpl", "association_complete", 6);
00377 
00378   if (DCPS_debug_level >= 1) {
00379     GuidConverter writer_converter(this->publication_id_);
00380     GuidConverter reader_converter(remote_id);
00381     ACE_DEBUG((LM_DEBUG,
00382                ACE_TEXT("(%P|%t) DataWriterImpl::association_complete - ")
00383                ACE_TEXT("bit %d local %C remote %C\n"),
00384                is_bit_,
00385                OPENDDS_STRING(writer_converter).c_str(),
00386                OPENDDS_STRING(reader_converter).c_str()));
00387   }
00388 
00389   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00390 
00391   if (OpenDDS::DCPS::remove(pending_readers_, remote_id) == -1) {
00392     if (DCPS_debug_level) {
00393       GuidConverter writer_converter(this->publication_id_);
00394       GuidConverter reader_converter(remote_id);
00395       ACE_DEBUG((LM_DEBUG,
00396                  ACE_TEXT("(%P|%t) DataWriterImpl::association_complete - ")
00397                  ACE_TEXT("bit %d local %C did not find pending reader: %C")
00398                  ACE_TEXT("defer association_complete_i until add_association resumes\n"),
00399                  is_bit_,
00400                  OPENDDS_STRING(writer_converter).c_str(),
00401                  OPENDDS_STRING(reader_converter).c_str()));
00402     }
00403     // Not found in pending_readers_, defer calling association_complete_i()
00404     // until add_association() resumes and sees this ID in assoc_complete_readers_.
00405     assoc_complete_readers_.insert(remote_id);
00406 
00407   } else {
00408     association_complete_i(remote_id);
00409   }
00410 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataWriterImpl::association_complete_i ( const RepoId & remote_id  )  [private]

Definition at line 413 of file DataWriterImpl.cpp.

References ACE_TEXT(), available_data_list_, OpenDDS::DCPS::SendStateDataSampleList::begin(), OpenDDS::DCPS::bind(), controlTracker, create_control_message(), DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::end(), OpenDDS::DCPS::END_HISTORIC_SAMPLES, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), get_db_lock(), get_lock(), get_resend_data(), ACE_OS::gettimeofday(), header, id_to_handle_map_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::insert(), is_bit_, CORBA::is_nil(), DDS::PublicationMatchedStatus::last_subscription_handle, DDS::DataWriterQos::lifespan, listener_for(), LM_DEBUG, LM_ERROR, LM_INFO, LM_WARNING, OpenDDS::DCPS::WeakRcHandle< T >::lock(), lock_, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::MessageTracker::message_dropped(), OpenDDS::DCPS::MessageTracker::message_sent(), monitor_, OpenDDS::DCPS::move(), OpenDDS::DCPS::EntityImpl::notify_status_condition(), OPENDDS_STRING, participant_servant_, publication_id_, publication_match_status_, DDS::PUBLICATION_MATCHED_STATUS, publisher_servant_, qos_, reader_info_, reader_info_lock_, readers_, OpenDDS::DCPS::Monitor::report(), OpenDDS::DCPS::SEND_CONTROL_ERROR, OpenDDS::DCPS::TransportClient::send_w_control(), OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), size, OpenDDS::DCPS::time_value_to_time(), timestamp(), DDS::PublicationMatchedStatus::total_count, and DDS::PublicationMatchedStatus::total_count_change.

Referenced by association_complete(), and transport_assoc_done().

00414 {
00415   DBG_ENTRY_LVL("DataWriterImpl", "association_complete_i", 6);
00416 
00417   if (DCPS_debug_level >= 1) {
00418     GuidConverter writer_converter(this->publication_id_);
00419     GuidConverter reader_converter(remote_id);
00420     ACE_DEBUG((LM_DEBUG,
00421                ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i - ")
00422                ACE_TEXT("bit %d local %C remote %C\n"),
00423                is_bit_,
00424                OPENDDS_STRING(writer_converter).c_str(),
00425                OPENDDS_STRING(reader_converter).c_str()));
00426   }
00427 
00428   bool reader_durable = false;
00429 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00430   OPENDDS_STRING filterClassName;
00431   RcHandle<FilterEvaluator> eval;
00432   DDS::StringSeq expression_params;
00433 #endif
00434   {
00435     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00436 
00437     if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) {
00438       GuidConverter converter(remote_id);
00439       ACE_ERROR((LM_ERROR,
00440                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::association_complete_i: ")
00441                  ACE_TEXT("insert %C from pending failed.\n"),
00442                  OPENDDS_STRING(converter).c_str()));
00443     }
00444   }
00445   {
00446     ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00447     RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
00448 
00449     if (it != reader_info_.end()) {
00450       reader_durable = it->second.durable_;
00451 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00452       filterClassName = it->second.filter_class_name_;
00453       eval = it->second.eval_;
00454       expression_params = it->second.expression_params_;
00455 #endif
00456     }
00457   }
00458 
00459   if (this->monitor_) {
00460     this->monitor_->report();
00461   }
00462 
00463   if (!is_bit_) {
00464 
00465     RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
00466 
00467     if (!participant)
00468       return;
00469 
00470     DDS::InstanceHandle_t handle =
00471       participant->id_to_handle(remote_id);
00472 
00473     {
00474       // protect publication_match_status_ and status changed flags.
00475       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00476 
00477       if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
00478         GuidConverter converter(remote_id);
00479         ACE_DEBUG((LM_WARNING,
00480                    ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::association_complete_i: ")
00481                    ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
00482                    OPENDDS_STRING(converter).c_str(),
00483                    handle));
00484         return;
00485 
00486       } else if (DCPS_debug_level > 4) {
00487         GuidConverter converter(remote_id);
00488         ACE_DEBUG((LM_DEBUG,
00489                    ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i: ")
00490                    ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
00491                    OPENDDS_STRING(converter).c_str(),
00492                    handle));
00493       }
00494 
00495       ++publication_match_status_.total_count;
00496       ++publication_match_status_.total_count_change;
00497       ++publication_match_status_.current_count;
00498       ++publication_match_status_.current_count_change;
00499       publication_match_status_.last_subscription_handle = handle;
00500       set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, true);
00501     }
00502 
00503     DDS::DataWriterListener_var listener =
00504       listener_for(DDS::PUBLICATION_MATCHED_STATUS);
00505 
00506     if (!CORBA::is_nil(listener.in())) {
00507 
00508       listener->on_publication_matched(this, publication_match_status_);
00509 
00510       // TBD - why does the spec say to change this but not
00511       // change the ChangeFlagStatus after a listener call?
00512       publication_match_status_.total_count_change = 0;
00513       publication_match_status_.current_count_change = 0;
00514     }
00515 
00516     notify_status_condition();
00517   }
00518 
00519   // Support DURABILITY QoS
00520   if (reader_durable) {
00521     // Tell the WriteDataContainer to resend all sending/sent
00522     // samples.
00523     this->data_container_->reenqueue_all(remote_id, this->qos_.lifespan
00524 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00525                                          , filterClassName, eval.in(), expression_params
00526 #endif
00527                                         );
00528 
00529     // Acquire the data writer container lock to avoid deadlock. The
00530     // thread calling association_complete() has to acquire lock in the
00531     // same order as the write()/register() operation.
00532 
00533     // Since the thread calling association_complete() is the ORB
00534     // thread, it may have some performance penalty. If the
00535     // performance is an issue, we may need a new thread to handle the
00536     // data_available() calls.
00537     ACE_GUARD(ACE_Recursive_Thread_Mutex,
00538               guard,
00539               this->get_lock());
00540 
00541     SendStateDataSampleList list = this->get_resend_data();
00542     {
00543       ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00544       // Update the reader's expected sequence
00545       SequenceNumber& seq =
00546         reader_info_.find(remote_id)->second.expected_sequence_;
00547 
00548       for (SendStateDataSampleList::iterator list_el = list.begin();
00549            list_el != list.end(); ++list_el) {
00550         list_el->get_header().historic_sample_ = true;
00551 
00552         if (list_el->get_header().sequence_ > seq) {
00553           seq = list_el->get_header().sequence_;
00554         }
00555       }
00556     }
00557 
00558     RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
00559     if (!publisher || publisher->is_suspended()) {
00560       this->available_data_list_.enqueue_tail(list);
00561 
00562     } else {
00563       if (DCPS_debug_level >= 4) {
00564         ACE_DEBUG((LM_INFO, "(%P|%t) Sending historic samples\n"));
00565       }
00566 
00567       size_t size = 0, padding = 0;
00568       gen_find_size(remote_id, size, padding);
00569       Message_Block_Ptr data(
00570         new ACE_Message_Block(size, ACE_Message_Block::MB_DATA, 0, 0, 0,
00571                               get_db_lock()));
00572       Serializer ser(data.get());
00573       ser << remote_id;
00574 
00575       const DDS::Time_t timestamp = time_value_to_time(ACE_OS::gettimeofday());
00576       DataSampleHeader header;
00577       Message_Block_Ptr end_historic_samples(
00578         create_control_message(END_HISTORIC_SAMPLES, header, move(data), timestamp));
00579 
00580       this->controlTracker.message_sent();
00581       guard.release();
00582       SendControlStatus ret = send_w_control(list, header, move(end_historic_samples), remote_id);
00583       if (ret == SEND_CONTROL_ERROR) {
00584         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00585                              ACE_TEXT("DataWriterImpl::association_complete_i: ")
00586                              ACE_TEXT("send_w_control failed.\n")));
00587         this->controlTracker.message_dropped();
00588       }
00589     }
00590   }
00591 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataWriterImpl::begin_coherent_changes (  ) 

Starts a coherent change set; should only be called once.

Definition at line 2235 of file DataWriterImpl.cpp.

References coherent_, and get_lock().

02236 {
02237   ACE_GUARD(ACE_Recursive_Thread_Mutex,
02238             guard,
02239             get_lock());
02240 
02241   this->coherent_ = true;
02242 }

Here is the call graph for this function:

bool OpenDDS::DCPS::DataWriterImpl::check_transport_qos ( const TransportInst inst  )  [virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 2214 of file DataWriterImpl.cpp.

02215 {
02216   // DataWriter does not impose any constraints on which transports
02217   // may be used based on QoS.
02218   return true;
02219 }

void OpenDDS::DCPS::DataWriterImpl::cleanup ( void   ) 

Cleans up the DataWriter.

Definition at line 121 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::NO_STATUS_MASK, set_listener(), and topic_servant_.

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().

00122 {
00123   // As first step set our listener to nill which will prevent us from calling
00124   // back onto the listener at the moment the related DDS entity has been
00125   // deleted
00126   set_listener(0, NO_STATUS_MASK);
00127   topic_servant_ = 0;
00128 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool OpenDDS::DCPS::DataWriterImpl::coherent_changes_pending (  ) 

Are coherent changes pending?

Definition at line 2224 of file DataWriterImpl.cpp.

References coherent_, and get_lock().

02225 {
02226   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
02227                    guard,
02228                    get_lock(),
02229                    false);
02230 
02231   return this->coherent_;
02232 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataWriterImpl::control_delivered ( const Message_Block_Ptr sample  )  [virtual]

Called by the transport to notify that the control message has been delivered.

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 2173 of file DataWriterImpl.cpp.

References controlTracker, DBG_ENTRY_LVL, and OpenDDS::DCPS::MessageTracker::message_delivered().

02174 {
02175   DBG_ENTRY_LVL("DataWriterImpl","control_delivered",6);
02176   controlTracker.message_delivered();
02177 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataWriterImpl::control_dropped ( const Message_Block_Ptr sample,
bool  dropped_by_transport 
) [virtual]

Called by the transport to notify that the control message has been dropped.

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 2317 of file DataWriterImpl.cpp.

References controlTracker, DBG_ENTRY_LVL, and OpenDDS::DCPS::MessageTracker::message_dropped().

02319 {
02320   DBG_ENTRY_LVL("DataWriterImpl","control_dropped",6);
02321   controlTracker.message_dropped();
02322 }

Here is the call graph for this function:

DataWriterImpl::AckToken OpenDDS::DCPS::DataWriterImpl::create_ack_token ( DDS::Duration_t  max_wait  )  const

Create an AckToken for ack operations.

Definition at line 976 of file DataWriterImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SequenceNumber::getValue(), LM_DEBUG, and sequence_number_.

Referenced by wait_for_acknowledgments().

00977 {
00978   if (DCPS_debug_level > 0) {
00979     ACE_DEBUG((LM_DEBUG,
00980                ACE_TEXT("(%P|%t) DataWriterImpl::create_ack_token() - ")
00981                ACE_TEXT("for sequence %q \n"),
00982                this->sequence_number_.getValue()));
00983   }
00984   return AckToken(max_wait, this->sequence_number_);
00985 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_Message_Block * OpenDDS::DCPS::DataWriterImpl::create_control_message ( MessageId  message_id,
DataSampleHeader header,
Message_Block_Ptr  data,
const DDS::Time_t source_timestamp 
) [private]

This method creates a header message block and chains it with the registered sample. The header contains the information needed, e.g. the message id and the length of the whole message. The fast allocator is not used for the header.

Definition at line 1971 of file DataWriterImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::coherent_change_, db_allocator_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), get_db_lock(), OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, LM_DEBUG, OpenDDS::DCPS::WeakRcHandle< T >::lock(), OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), ACE_Time_Value::max_time, mb_allocator_, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, DDS::Time_t::nanosec, need_sequence_repair(), OPENDDS_STRING, publication_id_, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::publisher_id_, publisher_servant_, reader_info_, reader_info_lock_, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::REQUEST_ACK, DDS::Time_t::sec, OpenDDS::DCPS::DataSampleHeader::sequence_, sequence_number_, OpenDDS::DCPS::DataSampleHeader::sequence_repair_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::RTPS::SEQUENCENUMBER_UNKNOWN, OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, OpenDDS::DCPS::TransportClient::swap_bytes(), OpenDDS::DCPS::to_string(), OpenDDS::DCPS::UNREGISTER_INSTANCE, and ACE_Time_Value::zero.

Referenced by association_complete_i(), dispose(), dispose_and_unregister(), end_coherent_changes(), register_instance_i(), send_liveliness(), send_request_ack(), and unregister_instance_i().

01975 {
01976   header_data.message_id_ = message_id;
01977   header_data.byte_order_ =
01978     this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER;
01979   header_data.coherent_change_ = 0;
01980 
01981   if (data) {
01982     header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
01983   }
01984 
01985   header_data.sequence_ = SequenceNumber::SEQUENCENUMBER_UNKNOWN();
01986   header_data.sequence_repair_ = false; // set below
01987   header_data.source_timestamp_sec_ = source_timestamp.sec;
01988   header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
01989   header_data.publication_id_ = publication_id_;
01990 
01991   RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
01992   if (!publisher) {
01993     return 0;
01994   }
01995 
01996   header_data.publisher_id_ = publisher->publisher_id_;
01997 
01998   if (message_id == INSTANCE_REGISTRATION
01999       || message_id == DISPOSE_INSTANCE
02000       || message_id == UNREGISTER_INSTANCE
02001       || message_id == DISPOSE_UNREGISTER_INSTANCE
02002       || message_id == REQUEST_ACK) {
02003 
02004     header_data.sequence_repair_ = need_sequence_repair();
02005 
02006     // Use the sequence number here for the sake of RTPS (where these
02007     // control messages map onto the Data Submessage).
02008     if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
02009       this->sequence_number_ = SequenceNumber();
02010 
02011     } else {
02012       ++this->sequence_number_;
02013     }
02014 
02015     header_data.sequence_ = this->sequence_number_;
02016     header_data.key_fields_only_ = true;
02017   }
02018 
02019   ACE_Message_Block* message = 0;
02020   ACE_NEW_MALLOC_RETURN(message,
02021                         static_cast<ACE_Message_Block*>(
02022                           mb_allocator_->malloc(sizeof(ACE_Message_Block))),
02023                         ACE_Message_Block(
02024                           DataSampleHeader::max_marshaled_size(),
02025                           ACE_Message_Block::MB_DATA,
02026                           header_data.message_length_ ? data.release() : 0, //cont
02027                           0, //data
02028                           0, //allocator_strategy
02029                           get_db_lock(), //locking_strategy
02030                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
02031                           ACE_Time_Value::zero,
02032                           ACE_Time_Value::max_time,
02033                           db_allocator_.get(),
02034                           mb_allocator_.get()),
02035                         0);
02036 
02037   *message << header_data;
02038 
02039   // If we incremented sequence number for this control message
02040   if (header_data.sequence_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
02041     ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, 0);
02042     // Update the expected sequence number for all readers
02043     RepoIdToReaderInfoMap::iterator reader;
02044 
02045     for (reader = reader_info_.begin(); reader != reader_info_.end(); ++reader) {
02046       reader->second.expected_sequence_ = sequence_number_;
02047     }
02048   }
02049   if (DCPS_debug_level >= 4) {
02050     const GuidConverter converter(publication_id_);
02051     ACE_DEBUG((LM_DEBUG,
02052                ACE_TEXT("(%P|%t) DataWriterImpl::create_control_message: ")
02053                ACE_TEXT("from publication %C sending control sample: %C .\n"),
02054                OPENDDS_STRING(converter).c_str(),
02055                to_string(header_data).c_str()));
02056   }
02057   return message;
02058 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::create_sample_data_message ( Message_Block_Ptr  data,
DDS::InstanceHandle_t  instance_handle,
DataSampleHeader header_data,
Message_Block_Ptr message,
const DDS::Time_t source_timestamp,
bool  content_filter 
)

This method creates a header message block and chains it with the sample data. The header contains the information needed, e.g. the message id and the length of the whole message. The fast allocator is used to allocate the message block, data block and header.

Definition at line 2061 of file DataWriterImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::TransportClient::cdr_encapsulation(), OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, coherent_, OpenDDS::DCPS::DataSampleHeader::coherent_change_, OpenDDS::DCPS::DataSampleHeader::content_filter_, data_container_, db_allocator_, OpenDDS::DCPS::DCPS_debug_level, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), get_db_lock(), OpenDDS::DCPS::DataSampleHeader::group_coherent_, DDS::GROUP_PRESENTATION_QOS, header_allocator_, DDS::DataWriterQos::lifespan, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_nanosec_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_sec_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), ACE_Time_Value::max_time, mb_allocator_, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, DDS::Time_t::nanosec, need_sequence_repair(), OPENDDS_STRING, publication_id_, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::publisher_id_, publisher_servant_, qos_, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::SAMPLE_DATA, DDS::Time_t::sec, OpenDDS::DCPS::DataSampleHeader::sequence_, sequence_number_, OpenDDS::DCPS::DataSampleHeader::sequence_repair_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, OpenDDS::DCPS::TransportClient::swap_bytes(), OpenDDS::DCPS::to_string(), and ACE_Time_Value::zero.

Referenced by write().

02067 {
02068   PublicationInstance_rch instance =
02069     data_container_->get_handle_instance(instance_handle);
02070 
02071   if (0 == instance) {
02072     ACE_ERROR_RETURN((LM_ERROR,
02073                       ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message ")
02074                       ACE_TEXT("failed to find instance for handle %d\n"),
02075                       instance_handle),
02076                      DDS::RETCODE_ERROR);
02077   }
02078 
02079   header_data.message_id_ = SAMPLE_DATA;
02080   header_data.byte_order_ =
02081     this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER;
02082   header_data.coherent_change_ = this->coherent_;
02083 
02084   RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
02085 
02086   if (!publisher)
02087     return DDS::RETCODE_ERROR;
02088 
02089 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
02090   header_data.group_coherent_ =
02091     publisher->qos_.presentation.access_scope
02092     == DDS::GROUP_PRESENTATION_QOS;
02093 #endif
02094   header_data.content_filter_ = content_filter;
02095   header_data.cdr_encapsulation_ = this->cdr_encapsulation();
02096   header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
02097   header_data.sequence_repair_ = need_sequence_repair();
02098 
02099   if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
02100     this->sequence_number_ = SequenceNumber();
02101 
02102   } else {
02103     ++this->sequence_number_;
02104   }
02105 
02106   header_data.sequence_ = this->sequence_number_;
02107   header_data.source_timestamp_sec_ = source_timestamp.sec;
02108   header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
02109 
02110   if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
02111       || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
02112     header_data.lifespan_duration_ = true;
02113     header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec;
02114     header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec;
02115   }
02116 
02117   header_data.publication_id_ = publication_id_;
02118   header_data.publisher_id_ = publisher->publisher_id_;
02119   size_t max_marshaled_size = header_data.max_marshaled_size();
02120 
02121   ACE_Message_Block* tmp_message;
02122   ACE_NEW_MALLOC_RETURN(tmp_message,
02123                         static_cast<ACE_Message_Block*>(
02124                           mb_allocator_->malloc(sizeof(ACE_Message_Block))),
02125                         ACE_Message_Block(max_marshaled_size,
02126                                           ACE_Message_Block::MB_DATA,
02127                                           data.release(), //cont
02128                                           0, //data
02129                                           header_allocator_.get(), //alloc_strategy
02130                                           get_db_lock(), //locking_strategy
02131                                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
02132                                           ACE_Time_Value::zero,
02133                                           ACE_Time_Value::max_time,
02134                                           db_allocator_.get(),
02135                                           mb_allocator_.get()),
02136                         DDS::RETCODE_ERROR);
02137   message.reset(tmp_message);
02138   *message << header_data;
02139   if (DCPS_debug_level >= 4) {
02140     const GuidConverter converter(publication_id_);
02141     ACE_DEBUG((LM_DEBUG,
02142                ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message: ")
02143                ACE_TEXT("from publication %C sending data sample: %C .\n"),
02144                OPENDDS_STRING(converter).c_str(),
02145                to_string(header_data).c_str()));
02146   }
02147   return DDS::RETCODE_OK;
02148 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataWriterImpl::data_delivered ( const DataSampleElement sample  )  [virtual]

Called by the transport to notify that the sample has been delivered; the call is delegated to WriteDataContainer to adjust the internal data sample threads.

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 2151 of file DataWriterImpl.cpp.

References ACE_TEXT(), data_container_, data_delivered_count_, DBG_ENTRY_LVL, OpenDDS::DCPS::DataSampleElement::get_pub_id(), LM_ERROR, OPENDDS_STRING, and publication_id_.

02152 {
02153   DBG_ENTRY_LVL("DataWriterImpl","data_delivered",6);
02154 
02155   if (!(sample->get_pub_id() == this->publication_id_)) {
02156     GuidConverter sample_converter(sample->get_pub_id());
02157     GuidConverter writer_converter(publication_id_);
02158     ACE_ERROR((LM_ERROR,
02159                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::data_delivered: ")
02160                ACE_TEXT(" The publication id %C from delivered element ")
02161                ACE_TEXT("does not match the datawriter's id %C\n"),
02162                OPENDDS_STRING(sample_converter).c_str(),
02163                OPENDDS_STRING(writer_converter).c_str()));
02164     return;
02165   }
02166   //provided for statistics tracking in tests
02167   ++data_delivered_count_;
02168 
02169   this->data_container_->data_delivered(sample);
02170 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataWriterImpl::data_dropped ( const DataSampleElement element,
bool  dropped_by_transport 
) [virtual]

This method is called by the transport to notify that the instance sample has been dropped; it delegates to WriteDataContainer to update the internal list.

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 2305 of file DataWriterImpl.cpp.

References data_container_, data_dropped_count_, and DBG_ENTRY_LVL.

02307 {
02308   DBG_ENTRY_LVL("DataWriterImpl","data_dropped",6);
02309 
02310   //provided for statistics tracking in tests
02311   ++data_dropped_count_;
02312 
02313   this->data_container_->data_dropped(element, dropped_by_transport);
02314 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::dispose ( DDS::InstanceHandle_t  handle,
const DDS::Time_t source_timestamp 
)

Delegate to the WriteDataContainer to dispose all data samples for a given instance and tell the transport to broadcast the disposed instance.

Definition at line 1880 of file DataWriterImpl.cpp.

References ACE_TEXT(), create_control_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), LM_ERROR, OpenDDS::DCPS::move(), DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, send_all_to_flush_control(), and OpenDDS::DCPS::DataSampleElement::set_sample().

01882 {
01883   DBG_ENTRY_LVL("DataWriterImpl","dispose",6);
01884 
01885   if (enabled_ == false) {
01886     ACE_ERROR_RETURN((LM_ERROR,
01887                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::dispose: ")
01888                       ACE_TEXT(" Entity is not enabled. \n")),
01889                      DDS::RETCODE_NOT_ENABLED);
01890   }
01891 
01892   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01893 
01894   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01895 
01896   Message_Block_Ptr registered_sample_data;
01897   ret = this->data_container_->dispose(handle, registered_sample_data);
01898 
01899   if (ret != DDS::RETCODE_OK) {
01900     ACE_ERROR_RETURN((LM_ERROR,
01901                       ACE_TEXT("(%P|%t) ERROR: ")
01902                       ACE_TEXT("DataWriterImpl::dispose: ")
01903                       ACE_TEXT("dispose failed.\n")),
01904                      ret);
01905   }
01906 
01907   DataSampleElement* element = 0;
01908   ret = this->data_container_->obtain_buffer_for_control(element);
01909 
01910   if (ret != DDS::RETCODE_OK) {
01911     ACE_ERROR_RETURN((LM_ERROR,
01912                       ACE_TEXT("(%P|%t) ERROR: ")
01913                       ACE_TEXT("DataWriterImpl::dispose: ")
01914                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01915                       ret),
01916                      ret);
01917   }
01918 
01919   Message_Block_Ptr sample(create_control_message(DISPOSE_INSTANCE,
01920                                                   element->get_header(),
01921                                                   move(registered_sample_data),
01922                                                   source_timestamp));
01923   element->set_sample(move(sample));
01924   ret = this->data_container_->enqueue_control(element);
01925 
01926   if (ret != DDS::RETCODE_OK) {
01927     ACE_ERROR_RETURN((LM_ERROR,
01928                       ACE_TEXT("(%P|%t) ERROR: ")
01929                       ACE_TEXT("DataWriterImpl::dispose: ")
01930                       ACE_TEXT("enqueue_control failed.\n")),
01931                      ret);
01932   }
01933 
01934   send_all_to_flush_control(guard);
01935 
01936   return DDS::RETCODE_OK;
01937 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::dispose_and_unregister ( DDS::InstanceHandle_t  handle,
const DDS::Time_t timestamp 
) [private]

Definition at line 1651 of file DataWriterImpl.cpp.

References ACE_TEXT(), create_control_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), LM_ERROR, OpenDDS::DCPS::move(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, send_all_to_flush_control(), and OpenDDS::DCPS::DataSampleElement::set_sample().

Referenced by unregister_instance_i().

01653 {
01654   DBG_ENTRY_LVL("DataWriterImpl", "dispose_and_unregister", 6);
01655 
01656   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01657   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01658 
01659   Message_Block_Ptr data_sample;
01660   ret = this->data_container_->dispose(handle, data_sample);
01661 
01662   if (ret != DDS::RETCODE_OK) {
01663     ACE_ERROR_RETURN((LM_ERROR,
01664                       ACE_TEXT("(%P|%t) ERROR: ")
01665                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01666                       ACE_TEXT("dispose on container failed. \n")),
01667                      ret);
01668   }
01669 
01670   ret = this->data_container_->unregister(handle, data_sample, false);
01671 
01672   if (ret != DDS::RETCODE_OK) {
01673     ACE_ERROR_RETURN((LM_ERROR,
01674                       ACE_TEXT("(%P|%t) ERROR: ")
01675                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01676                       ACE_TEXT("unregister with container failed. \n")),
01677                      ret);
01678   }
01679 
01680   DataSampleElement* element = 0;
01681   ret = this->data_container_->obtain_buffer_for_control(element);
01682 
01683   if (ret != DDS::RETCODE_OK) {
01684     ACE_ERROR_RETURN((LM_ERROR,
01685                       ACE_TEXT("(%P|%t) ERROR: ")
01686                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01687                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01688                       ret),
01689                      ret);
01690   }
01691 
01692   Message_Block_Ptr sample(create_control_message(DISPOSE_UNREGISTER_INSTANCE,
01693                                                   element->get_header(),
01694                                                   move(data_sample),
01695                                                   source_timestamp));
01696   element->set_sample(move(sample));
01697   ret = this->data_container_->enqueue_control(element);
01698 
01699   if (ret != DDS::RETCODE_OK) {
01700     ACE_ERROR_RETURN((LM_ERROR,
01701                       ACE_TEXT("(%P|%t) ERROR: ")
01702                       ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
01703                       ACE_TEXT("enqueue_control failed.\n")),
01704                      ret);
01705   }
01706 
01707   send_all_to_flush_control(guard);
01708   return DDS::RETCODE_OK;
01709 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::DomainId_t OpenDDS::DCPS::DataWriterImpl::domain_id (  )  const [inline, private, virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 555 of file DataWriterImpl.h.

00555                                   {
00556     return this->domain_id_;
00557   }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::enable (  )  [virtual]

Implements DDS::Entity.

Definition at line 1259 of file DataWriterImpl.cpp.

References ACE_TEXT(), association_chunk_multiplier_, OpenDDS::DCPS::TransportClient::connection_info(), data_container_, db_allocator_, OpenDDS::DCPS::DCPS_debug_level, DDS::DataWriterQos::deadline, domain_id_, dp_id_, DDS::DataWriterQos::durability, DDS::DataWriterQos::durability_service, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::duration_to_time_value(), OpenDDS::DCPS::TransportClient::enable_transport(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), get_type_name(), OpenDDS::DCPS::GUID_UNKNOWN, header_allocator_, DDS::DataWriterQos::history, if(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::EntityImpl::is_enabled(), DDS::KEEP_ALL_HISTORY_QOS, last_deadline_missed_total_count_, DDS::LENGTH_UNLIMITED, DDS::DataWriterQos::lifespan, DDS::DataWriterQos::liveliness, liveliness_check_interval_, liveness_timer_, LM_DEBUG, LM_ERROR, LM_WARNING, OpenDDS::DCPS::WeakRcHandle< T >::lock(), lock_, mb_allocator_, monitor_, n_chunks_, DDS::Duration_t::nanosec, offered_deadline_missed_status_, participant_servant_, publication_id_, publisher_servant_, qos_, reactor_, OpenDDS::DCPS::ref(), DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::Monitor::report(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::DataWriterQos::resource_limits, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, ACE_Reactor_Timer_Interface::schedule_timer(), DDS::Duration_t::sec, OpenDDS::DCPS::EntityImpl::set_enabled(), TheServiceParticipant, topic_name_, topic_servant_, DDS::VOLATILE_DURABILITY_QOS, watchdog_, WriteDataContainer, and ACE_Time_Value::zero.

Referenced by OpenDDS::DCPS::PublisherImpl::create_datawriter().

01260 {
01261   //According spec:
01262   // - Calling enable on an already enabled Entity returns OK and has no
01263   // effect.
01264   // - Calling enable on an Entity whose factory is not enabled will fail
01265   // and return PRECONDITION_NOT_MET.
01266 
01267   if (this->is_enabled()) {
01268     return DDS::RETCODE_OK;
01269   }
01270 
01271   RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
01272   if (!publisher || !publisher->is_enabled()) {
01273     return DDS::RETCODE_PRECONDITION_NOT_MET;
01274   }
01275 
01276   RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
01277   if (participant) {
01278     dp_id_ = participant->get_id();
01279   }
01280 
01281   // Note: do configuration based on QoS in enable() because
01282   //       before enable is called the QoS can be changed -- even
01283   //       for Changeable=NO
01284 
01285   // Configure WriteDataContainer constructor parameters from qos.
01286 
01287   const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
01288 
01289   CORBA::Long const max_samples_per_instance =
01290     (qos_.resource_limits.max_samples_per_instance == DDS::LENGTH_UNLIMITED)
01291     ? 0x7fffffff : qos_.resource_limits.max_samples_per_instance;
01292 
01293   CORBA::Long max_instances = 0, max_total_samples = 0;
01294 
01295   if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) {
01296     n_chunks_ = qos_.resource_limits.max_samples;
01297 
01298     if (qos_.resource_limits.max_instances == DDS::LENGTH_UNLIMITED ||
01299         (qos_.resource_limits.max_samples < qos_.resource_limits.max_instances)
01300         || (qos_.resource_limits.max_samples <
01301             (qos_.resource_limits.max_instances * max_samples_per_instance))) {
01302       max_total_samples = reliable ? qos_.resource_limits.max_samples : 0;
01303     }
01304   }
01305 
01306   if (reliable && qos_.resource_limits.max_instances != DDS::LENGTH_UNLIMITED)
01307     max_instances = qos_.resource_limits.max_instances;
01308 
01309   const CORBA::Long history_depth =
01310     (qos_.history.kind == DDS::KEEP_ALL_HISTORY_QOS ||
01311      qos_.history.depth == DDS::LENGTH_UNLIMITED) ? 0x7fffffff : qos_.history.depth;
01312 
01313   const CORBA::Long max_durable_per_instance =
01314     qos_.durability.kind == DDS::VOLATILE_DURABILITY_QOS ? 0 : history_depth;
01315 
01316   // enable the type specific part of this DataWriter
01317   this->enable_specific();
01318 
01319 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01320   // Get data durability cache if DataWriter QoS requires durable
01321   // samples.  Publisher servant retains ownership of the cache.
01322   DataDurabilityCache* const durability_cache =
01323     TheServiceParticipant->get_data_durability_cache(qos_.durability);
01324 #endif
01325 
01326   //Note: the QoS used to set n_chunks_ is Changable=No so
01327   // it is OK that we cannot change the size of our allocators.
01328   data_container_ .reset(new WriteDataContainer(this,
01329                                            max_samples_per_instance,
01330                                            history_depth,
01331                                            max_durable_per_instance,
01332                                            qos_.reliability.max_blocking_time,
01333                                            n_chunks_,
01334                                            domain_id_,
01335                                            topic_name_,
01336                                            get_type_name(),
01337 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01338                                            durability_cache,
01339                                            qos_.durability_service,
01340 #endif
01341                                            max_instances,
01342                                            max_total_samples));
01343 
01344   // +1 because we might allocate one before releasing another
01345   // TBD - see if this +1 can be removed.
01346   mb_allocator_.reset(new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_));
01347   db_allocator_.reset(new DataBlockAllocator(n_chunks_+1));
01348   header_allocator_.reset(new DataSampleHeaderAllocator(n_chunks_+1));
01349 
01350   if (DCPS_debug_level >= 2) {
01351     ACE_DEBUG((LM_DEBUG,
01352                "(%P|%t) DataWriterImpl::enable-mb"
01353                " Cached_Allocator_With_Overflow %x with %d chunks\n",
01354                mb_allocator_.get(),
01355                n_chunks_));
01356 
01357     ACE_DEBUG((LM_DEBUG,
01358                "(%P|%t) DataWriterImpl::enable-db"
01359                " Cached_Allocator_With_Overflow %x with %d chunks\n",
01360                db_allocator_.get(),
01361                n_chunks_));
01362 
01363     ACE_DEBUG((LM_DEBUG,
01364                "(%P|%t) DataWriterImpl::enable-header"
01365                " Cached_Allocator_With_Overflow %x with %d chunks\n",
01366                header_allocator_.get(),
01367                n_chunks_));
01368   }
01369 
01370   if (qos_.liveliness.lease_duration.sec != DDS::DURATION_INFINITE_SEC &&
01371       qos_.liveliness.lease_duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
01372     liveliness_check_interval_ = duration_to_time_value(qos_.liveliness.lease_duration);
01373     liveliness_check_interval_ *= TheServiceParticipant->liveliness_factor()/100.0;
01374     // Must be at least 1 micro second.
01375     if (liveliness_check_interval_ == ACE_Time_Value::zero) {
01376       liveliness_check_interval_ = ACE_Time_Value (0, 1);
01377     }
01378 
01379     if (reactor_->schedule_timer(liveness_timer_.in(),
01380                                  0,
01381                                  liveliness_check_interval_,
01382                                  liveliness_check_interval_) == -1) {
01383       ACE_ERROR((LM_ERROR,
01384                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: %p.\n"),
01385                  ACE_TEXT("schedule_timer")));
01386 
01387     }
01388   }
01389 
01390   if (!participant)
01391     return DDS::RETCODE_ERROR;
01392 
01393   participant->add_adjust_liveliness_timers(this);
01394 
01395   // Setup the offered deadline watchdog if the configured deadline
01396   // period is not the default (infinite).
01397   DDS::Duration_t const deadline_period = this->qos_.deadline.period;
01398 
01399   if (deadline_period.sec != DDS::DURATION_INFINITE_SEC
01400       || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC) {
01401     this->watchdog_ = make_rch<OfferedDeadlineWatchdog>(
01402                            ref(this->lock_),
01403                            this->qos_.deadline,
01404                            ref(*this),
01405                            ref(this->offered_deadline_missed_status_),
01406                            ref(this->last_deadline_missed_total_count_));
01407   }
01408 
01409   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
01410   disco->pre_writer(this);
01411 
01412   this->set_enabled();
01413 
01414   try {
01415     this->enable_transport(reliable,
01416                            this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
01417 
01418   } catch (const Transport::Exception&) {
01419     ACE_ERROR((LM_ERROR,
01420                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable, ")
01421                ACE_TEXT("Transport Exception.\n")));
01422     data_container_->shutdown_ = true;
01423     return DDS::RETCODE_ERROR;
01424   }
01425 
01426   const TransportLocatorSeq& trans_conf_info = connection_info();
01427 
01428   DDS::PublisherQos pub_qos;
01429 
01430   publisher->get_qos(pub_qos);
01431 
01432   this->publication_id_ =
01433     disco->add_publication(this->domain_id_,
01434                            this->dp_id_,
01435                            this->topic_servant_->get_id(),
01436                            this,
01437                            this->qos_,
01438                            trans_conf_info,
01439                            pub_qos);
01440 
01441 
01442   if (!publisher || this->publication_id_ == GUID_UNKNOWN) {
01443     ACE_DEBUG((LM_WARNING,
01444                ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::enable, ")
01445                ACE_TEXT("add_publication returned invalid id. \n")));
01446     data_container_->shutdown_ = true;
01447     return DDS::RETCODE_ERROR;
01448   }
01449 
01450   this->data_container_->publication_id_ = this->publication_id_;
01451 
01452   const DDS::ReturnCode_t writer_enabled_result =
01453     publisher->writer_enabled(topic_name_.in(), this);
01454 
01455   if (this->monitor_) {
01456     this->monitor_->report();
01457   }
01458 
01459 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01460 
01461   // Move cached data from the durability cache to the unsent data
01462   // queue.
01463   if (durability_cache != 0) {
01464 
01465     if (!durability_cache->get_data(this->domain_id_,
01466                                     this->topic_name_,
01467                                     get_type_name(),
01468                                     this,
01469                                     this->mb_allocator_.get(),
01470                                     this->db_allocator_.get(),
01471                                     this->qos_.lifespan)) {
01472       ACE_ERROR((LM_ERROR,
01473                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: ")
01474                  ACE_TEXT("unable to retrieve durable data\n")));
01475     }
01476   }
01477 
01478 #endif
01479 
01480   return writer_enabled_result;
01481 }

Here is the call graph for this function:

Here is the caller graph for this function:

virtual DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::enable_specific (  )  [protected, pure virtual]
void OpenDDS::DCPS::DataWriterImpl::end_coherent_changes ( const GroupCoherentSamples &  group_samples  ) 

Ends a coherent change set; should only be called once.

Definition at line 2245 of file DataWriterImpl.cpp.

References ACE_TEXT(), coherent_, coherent_samples_, OpenDDS::DCPS::CoherentChangeControl::coherent_samples_, create_control_message(), OpenDDS::DCPS::END_COHERENT_CHANGES, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), get_db_lock(), get_lock(), ACE_OS::gettimeofday(), OpenDDS::DCPS::CoherentChangeControl::group_coherent_, OpenDDS::DCPS::CoherentChangeControl::group_coherent_samples_, DDS::GROUP_PRESENTATION_QOS, header, OpenDDS::DCPS::WriterCoherentSample::last_sample_, LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), OpenDDS::DCPS::CoherentChangeControl::max_marshaled_size(), ACE_Message_Block::MB_DATA, OpenDDS::DCPS::move(), OpenDDS::DCPS::WriterCoherentSample::num_samples_, OpenDDS::DCPS::CoherentChangeControl::publisher_id_, publisher_servant_, send_control(), OpenDDS::DCPS::SEND_CONTROL_ERROR, sequence_number_, OpenDDS::DCPS::TransportClient::swap_bytes(), and OpenDDS::DCPS::time_value_to_time().

02246 {
        // Builds an END_COHERENT_CHANGES control message from the writer's
        // coherent-set bookkeeping, resets that bookkeeping, and sends the
        // message. A send failure is only logged; the function returns void.
02247   // Precondition: PublisherImpl::pi_lock_ should be held by the caller.
02248   ACE_GUARD(ACE_Recursive_Thread_Mutex,
02249             guard,
02250             get_lock());
02251 
02252   CoherentChangeControl end_msg;
        // Copy the writer's coherent_samples_ count and current sequence_number_
        // into the message.
02253   end_msg.coherent_samples_.num_samples_ = this->coherent_samples_;
02254   end_msg.coherent_samples_.last_sample_ = this->sequence_number_;
02255 
02256   RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
02257 
        // The group-coherent fields are only populated when the publisher handle
        // could be locked and its presentation access scope is GROUP.
02258   if (publisher) {
02259     end_msg.group_coherent_
02260       = publisher->qos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS;
02261   }
02262 
02263   if (publisher && end_msg.group_coherent_) {
02264     end_msg.publisher_id_ = publisher->publisher_id_;
02265     end_msg.group_coherent_samples_ = group_samples;
02266   }
02267 
02268   size_t max_marshaled_size = end_msg.max_marshaled_size();
02269 
        // Allocate a block of max_marshaled_size bytes and serialize end_msg into it.
02270   Message_Block_Ptr data( new ACE_Message_Block(max_marshaled_size,
02271                                   ACE_Message_Block::MB_DATA,
02272                                   0, //cont
02273                                   0, //data
02274                                   0, //alloc_strategy
02275                                   get_db_lock()));
02276 
02277   Serializer serializer(
02278     data.get(),
02279     this->swap_bytes());
02280 
02281   serializer << end_msg;
02282 
02283   DDS::Time_t source_timestamp =
02284     time_value_to_time(ACE_OS::gettimeofday());
02285 
        // data is handed over with move(); header is passed in here and then given
        // to send_control() below (presumably filled in by create_control_message()
        // - confirm).
02286   DataSampleHeader header;
02287   Message_Block_Ptr control(
02288     create_control_message(END_COHERENT_CHANGES, header, move(data), source_timestamp));
02289 
02290 
        // The coherent-set state is cleared before the message is sent.
02291   this->coherent_ = false;
02292   this->coherent_samples_ = 0;
02293 
02294   guard.release(); // the get_lock() guard is given up before send_control() is called
02295   if (this->send_control(header, move(control)) == SEND_CONTROL_ERROR) {
02296     ACE_ERROR((LM_ERROR,
02297                ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::end_coherent_changes:")
02298                ACE_TEXT(" unable to send END_COHERENT_CHANGES control message!\n")));
02299   }
02300 }

Here is the call graph for this function:

bool OpenDDS::DCPS::DataWriterImpl::filter_out ( const DataSampleElement elt,
const OPENDDS_STRING &  filterClassName,
const FilterEvaluator evaluator,
const DDS::StringSeq expression_params 
) const

Definition at line 2187 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, ACE_Message_Block::cont(), OpenDDS::DCPS::FilterEvaluator::eval(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::TypeSupportImpl::getMetaStructForType(), LM_ERROR, and topic_servant_.

Referenced by OpenDDS::DCPS::WriteDataContainer::copy_and_prepend().

02191 {
        // Returns true when the sample in elt should be dropped, i.e. when the
        // filter evaluator rejects it. Returns false ("do not filter") when the
        // topic's type support cannot be cast to TypeSupportImpl or when
        // filterClassName is neither "DDSSQL" nor "OPENDDSSQL".
02192   TypeSupportImpl* const typesupport =
02193     dynamic_cast<TypeSupportImpl*>(topic_servant_->get_type_support());
02194 
02195   if (!typesupport) {
02196     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR DataWriterImpl::filter_out - Could not cast type support, not filtering\n"));
02197     return false;
02198   }
02199 
02200   if (filterClassName == "DDSSQL" ||
02201       filterClassName == "OPENDDSSQL") {
          // The result is negated: eval() == true means the sample passes the filter.
          // The second argument is true when the header's byte order differs from
          // ACE_CDR_BYTE_ORDER (presumably a "swap bytes" flag - confirm against
          // FilterEvaluator::eval).
02202     return !evaluator.eval(elt.get_sample()->cont(),
02203                            elt.get_header().byte_order_ != ACE_CDR_BYTE_ORDER,
02204                            elt.get_header().cdr_encapsulation_, typesupport->getMetaStructForType(),
02205                            expression_params);
02206   }
02207   else {
02208     return false;
02209   }
02210 }

Here is the call graph for this function:

Here is the caller graph for this function:

DataBlockLockPool::DataBlockLock* OpenDDS::DCPS::DataWriterImpl::get_db_lock (  )  [inline]

Definition at line 456 of file DataWriterImpl.h.

Referenced by association_complete_i(), create_control_message(), create_sample_data_message(), end_coherent_changes(), and OpenDDS::DCPS::DataDurabilityCache::get_data().

00456                                                 {
00457     return db_lock_pool_->get_lock(); // forwards to the pool; db_lock_pool_ is dereferenced unchecked
00458   }

Here is the caller graph for this function:

RepoId OpenDDS::DCPS::DataWriterImpl::get_dp_id (  ) 

Accessor of the repository id of the domain participant.

Definition at line 1959 of file DataWriterImpl.cpp.

References dp_id_.

Referenced by OpenDDS::DCPS::DWMonitorImpl::report().

01960 {
01961   return dp_id_; // returned by value (the declared return type is RepoId)
01962 }

Here is the caller graph for this function:

PublicationInstance_rch OpenDDS::DCPS::DataWriterImpl::get_handle_instance ( DDS::InstanceHandle_t  handle  ) 

Attempt to locate an existing instance for the given handle.

Definition at line 2449 of file DataWriterImpl.cpp.

References data_container_.

02450 {
02451 
        // Delegate to the data container when one exists.
02452   if (0 != data_container_) {
02453     return data_container_->get_handle_instance(handle);
02454   }
02455 
        // No container: return a default-constructed handle (presumably nil - confirm).
02456   return PublicationInstance_rch();
02457 }

DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl::get_instance_handle (  )  [virtual]

Implements OpenDDS::DCPS::EntityImpl.

Definition at line 170 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::WeakRcHandle< T >::lock(), participant_servant_, and publication_id_.

00171 {
        // Maps this writer's publication_id_ to an instance handle through the
        // owning participant; returns 0 when the participant handle cannot be locked.
00172   using namespace OpenDDS::DCPS;
00173   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
00174   if (participant)
00175     return participant->id_to_handle(publication_id_);
00176   return 0;
00177 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataWriterImpl::get_instance_handles ( InstanceHandleVec &  instance_handles  ) 

Definition at line 2623 of file DataWriterImpl.cpp.

References data_container_.

Referenced by OpenDDS::DCPS::DWMonitorImpl::report().

02624 {
        // Forwards to the data container, which fills instance_handles.
        // NOTE(review): data_container_ is dereferenced without the null check
        // that get_handle_instance() performs - confirm callers only reach this
        // once the container exists.
02625   this->data_container_->get_instance_handles(instance_handles);
02626 }

Here is the caller graph for this function:

DDS::DataWriterListener_ptr OpenDDS::DCPS::DataWriterImpl::get_listener (  )  [virtual]

Implements DDS::DataWriter.

Definition at line 955 of file DataWriterImpl.cpp.

References CORBA::LocalObject::_duplicate(), and listener_.

00956 {
        // Returns the result of _duplicate() on the stored listener rather than the
        // stored pointer itself (presumably the caller owns the returned reference,
        // per the usual CORBA convention - confirm).
00957   return DDS::DataWriterListener::_duplicate(listener_.in());
00958 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_liveliness_lost_status ( DDS::LivelinessLostStatus &  status  )  [virtual]
ACE_INLINE ACE_Recursive_Thread_Mutex& OpenDDS::DCPS::DataWriterImpl::get_lock (  )  [inline]

Accessor of the WriteDataContainer's lock.

Definition at line 364 of file DataWriterImpl.h.

References ACE_Recursive_Thread_Mutex::lock_.

Referenced by association_complete_i(), begin_coherent_changes(), coherent_changes_pending(), dispose(), dispose_and_unregister(), end_coherent_changes(), register_instance_from_durable_data(), send_request_ack(), unregister_instance_i(), and write().

00364                                          {
00365     return data_container_->lock_; // reference to the container's mutex; data_container_ is dereferenced unchecked
00366   }

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_matched_subscription_data ( DDS::SubscriptionBuiltinTopicData subscription_data,
DDS::InstanceHandle_t  subscription_handle 
) [virtual]

Definition at line 1226 of file DataWriterImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::RcHandle< T >::in(), LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), participant_servant_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.

01229 {
        // Copies the built-in-topic data for subscription_handle into
        // subscription_data. Yields RETCODE_NOT_ENABLED when the writer is not
        // enabled, RETCODE_ERROR when the participant handle cannot be locked, and
        // otherwise whatever instance_handle_to_bit_data() returns.
01230   if (enabled_ == false) {
01231     ACE_ERROR_RETURN((LM_ERROR,
01232                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::")
01233                       ACE_TEXT("get_matched_subscription_data: ")
01234                       ACE_TEXT("Entity is not enabled. \n")),
01235                      DDS::RETCODE_NOT_ENABLED);
01236   }
01237   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
01238 
01239   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR; // stays ERROR unless the lookup below runs
01240   DDS::SubscriptionBuiltinTopicDataSeq data;
01241 
01242   if (participant) {
01243     ret = instance_handle_to_bit_data<DDS::SubscriptionBuiltinTopicDataDataReader_var>(
01244             participant.in(),
01245             BUILT_IN_SUBSCRIPTION_TOPIC,
01246             subscription_handle,
01247             data);
01248   }
01249 
01250   if (ret == DDS::RETCODE_OK) {
          // assumes data holds at least one element whenever RETCODE_OK is returned - TODO confirm
01251     subscription_data = data[0];
01252   }
01253 
01254   return ret;
01255 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_matched_subscriptions ( DDS::InstanceHandleSeq &  subscription_handles  )  [virtual]

Definition at line 1193 of file DataWriterImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::EntityImpl::enabled_, id_to_handle_map_, LM_ERROR, lock_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.

01195 {
        // Fills subscription_handles with the handle stored for every entry of
        // id_to_handle_map_. Yields RETCODE_NOT_ENABLED when the writer is not
        // enabled and RETCODE_ERROR when lock_ cannot be acquired.
01196   if (enabled_ == false) {
01197     ACE_ERROR_RETURN((LM_ERROR,
01198                       ACE_TEXT("(%P|%t) ERROR: ")
01199                       ACE_TEXT("DataWriterImpl::get_matched_subscriptions: ")
01200                       ACE_TEXT(" Entity is not enabled. \n")),
01201                      DDS::RETCODE_NOT_ENABLED);
01202   }
01203 
01204   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01205                    guard,
01206                    this->lock_,
01207                    DDS::RETCODE_ERROR);
01208 
01209   // Copy out the handles for the current set of subscriptions.
01210   int index = 0; // NOTE(review): signed int used as index into a sequence whose length is CORBA::ULong
        // The output sequence is resized to the map size, so any previous contents
        // are replaced by the loop below.
01211   subscription_handles.length(
01212     static_cast<CORBA::ULong>(this->id_to_handle_map_.size()));
01213 
01214   for (RepoIdToHandleMap::iterator
01215        current = this->id_to_handle_map_.begin();
01216        current != this->id_to_handle_map_.end();
01217        ++current, ++index) {
01218     subscription_handles[index] = current->second;
01219   }
01220 
01221   return DDS::RETCODE_OK;
01222 }

Here is the call graph for this function:

DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl::get_next_handle (  ) 

Get an instance handle for a new instance.

Definition at line 180 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::WeakRcHandle< T >::lock(), and participant_servant_.

Referenced by OpenDDS::DCPS::WriteDataContainer::register_instance().

00181 {
        // Asks the participant for the handle of GUID_UNKNOWN (presumably
        // id_to_handle() hands out a fresh handle for an unknown GUID - confirm).
        // Returns 0 when the participant handle cannot be locked.
00182   using namespace OpenDDS::DCPS;
00183   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
00184   if (participant)
00185     return participant->id_to_handle(GUID_UNKNOWN);
00186   return 0;
00187 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_offered_deadline_missed_status ( DDS::OfferedDeadlineMissedStatus &  status  )  [virtual]
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_offered_incompatible_qos_status ( DDS::OfferedIncompatibleQosStatus &  status  )  [virtual]
CORBA::Long OpenDDS::DCPS::DataWriterImpl::get_priority_value ( const AssociationData  )  const [inline, private, virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 559 of file DataWriterImpl.h.

00559                                                              {
00560     return this->qos_.transport_priority.value; // the AssociationData argument is unnamed and not consulted
00561   }

RepoId OpenDDS::DCPS::DataWriterImpl::get_publication_id (  ) 

Accessor of the repository id of this datawriter/publication.

Definition at line 1953 of file DataWriterImpl.cpp.

References publication_id_.

Referenced by add_association(), OpenDDS::DCPS::PublisherImpl::delete_datawriter(), OpenDDS::Federator::ManagerImpl::initialize(), OpenDDS::DCPS::DWPeriodicMonitorImpl::report(), and OpenDDS::DCPS::DWMonitorImpl::report().

01954 {
01955   return publication_id_; // returned by value (the declared return type is RepoId)
01956 }

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_publication_matched_status ( DDS::PublicationMatchedStatus &  status  )  [virtual]
DDS::Publisher_ptr OpenDDS::DCPS::DataWriterImpl::get_publisher (  )  [virtual]

Implements DDS::DataWriter.

Definition at line 1061 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::WeakRcHandle< T >::lock(), and publisher_servant_.

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter(), OpenDDS::DCPS::StaticDiscovery::pre_writer(), and OpenDDS::DCPS::DWMonitorImpl::report().

01062 {
        // Locks the weak publisher handle and returns the raw pointer obtained
        // from _retn() on the locked handle (presumably ownership passes to the
        // caller and the result is nil if the publisher is gone - confirm).
01063   return publisher_servant_.lock()._retn();
01064 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_qos ( DDS::DataWriterQos &  qos  )  [virtual]

Definition at line 938 of file DataWriterImpl.cpp.

References qos_, and DDS::RETCODE_OK.

Referenced by OpenDDS::DCPS::StaticDiscovery::pre_writer().

00939 {
00940   qos = qos_; // copy the writer's current QoS into the caller's structure
00941   return DDS::RETCODE_OK; // this is the only return path
00942 }

Here is the caller graph for this function:

void OpenDDS::DCPS::DataWriterImpl::get_readers ( RepoIdSet &  readers  ) 

Definition at line 2629 of file DataWriterImpl.cpp.

References lock_, and readers_.

Referenced by OpenDDS::DCPS::DWMonitorImpl::report().

02630 {
        // Copies readers_ into the caller's set while holding lock_.
        // NOTE(review): per the ACE_GUARD macro, a failed lock acquisition returns
        // early and leaves readers untouched - confirm callers tolerate that.
02631   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
02632   readers = this->readers_;
02633 }

Here is the caller graph for this function:

const RepoId& OpenDDS::DCPS::DataWriterImpl::get_repo_id (  )  const [inline, private, virtual]

Implements OpenDDS::DCPS::TransportClient.

Definition at line 551 of file DataWriterImpl.h.

00551                                     {
00552     return this->publication_id_; // returned by const reference, per the declared return type
00553   }

SendStateDataSampleList OpenDDS::DCPS::DataWriterImpl::get_resend_data (  )  [inline]

Definition at line 283 of file DataWriterImpl.h.

Referenced by association_complete_i().

00283                                             {
00284     return data_container_->get_resend_data(); // forwards to the container; data_container_ is dereferenced unchecked
00285   }

Here is the caller graph for this function:

DDS::Topic_ptr OpenDDS::DCPS::DataWriterImpl::get_topic (  )  [virtual]

Implements DDS::DataWriter.

Definition at line 961 of file DataWriterImpl.cpp.

References CORBA::LocalObject::_duplicate(), OpenDDS::DCPS::TopicDescriptionPtr< Topic >::get(), and topic_servant_.

Referenced by OpenDDS::DCPS::DWMonitorImpl::report().

00962 {
        // Returns the result of _duplicate() on the topic held by topic_servant_
        // (presumably the caller owns the returned reference - confirm).
00963   return DDS::Topic::_duplicate(topic_servant_.get());
00964 }

Here is the call graph for this function:

Here is the caller graph for this function:

char const * OpenDDS::DCPS::DataWriterImpl::get_type_name (  )  const

Get associated topic type name.

Definition at line 1965 of file DataWriterImpl.cpp.

References type_name_.

Referenced by enable().

01966 {
        // Returns the pointer exposed by type_name_.in(); no copy is made here
        // (presumably the string stays owned by type_name_ - confirm).
01967   return type_name_.in();
01968 }

Here is the caller graph for this function:

ACE_UINT64 OpenDDS::DCPS::DataWriterImpl::get_unsent_data ( SendStateDataSampleList list  )  [inline]

Retrieve the unsent data from the WriteDataContainer.

Definition at line 279 of file DataWriterImpl.h.

Referenced by send_all_to_flush_control(), and write().

00279                                                              {
00280     return data_container_->get_unsent_data(list); // forwards to the container; data_container_ is dereferenced unchecked
00281   }

Here is the caller graph for this function:

int OpenDDS::DCPS::DataWriterImpl::handle_timeout ( const ACE_Time_Value tv,
const void *  arg 
) [virtual]

Handle the assert liveliness timeout.

Definition at line 2343 of file DataWriterImpl.cpp.

References ACE_TEXT(), DDS::AUTOMATIC_LIVELINESS_QOS, ACE_Reactor_Timer_Interface::cancel_timer(), OpenDDS::DCPS::duration_to_time_value(), OpenDDS::DCPS::RcHandle< T >::in(), CORBA::is_nil(), last_liveliness_activity_time_, listener_for(), DDS::DataWriterQos::liveliness, liveliness_asserted_, liveliness_check_interval_, liveliness_lost_, DDS::LIVELINESS_LOST_STATUS, liveliness_lost_status_, liveness_timer_, LM_ERROR, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS, qos_, reactor_, ACE_Reactor_Timer_Interface::schedule_timer(), send_liveliness(), DDS::LivelinessLostStatus::total_count, and DDS::LivelinessLostStatus::total_count_change.

02345 {
        // Timer callback for liveliness. When at least one check interval has
        // passed since last_liveliness_activity_time_ it acts according to the
        // liveliness kind; otherwise it re-arms the timer for the remaining time.
        // Every path returns 0, and the arg parameter is not used.
02346   bool liveliness_lost = false;
02347 
02348   ACE_Time_Value elapsed = tv - last_liveliness_activity_time_;
02349 
02350   // Do we need to send a liveliness message?
02351   if (elapsed >= liveliness_check_interval_) {
02352     switch (this->qos_.liveliness.kind) {
02353     case DDS::AUTOMATIC_LIVELINESS_QOS:
            // Always send; a failed send counts as lost liveliness.
02354       if (this->send_liveliness(tv) == false) {
02355         liveliness_lost = true;
02356       }
02357       break;
02358 
02359     case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
            // Send only if liveliness was asserted since the flag was last cleared.
02360       if (liveliness_asserted_) {
02361         if (this->send_liveliness(tv) == false) {
02362           liveliness_lost = true;
02363         }
02364       }
02365       break;
02366 
02367     case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
02368       // Do nothing.
02369       break;
02370     }
02371   }
02372   else {
02373     // Reschedule.
          // Cancel the timer and schedule it again with a first delay of the time
          // remaining in this interval and a repeat of liveliness_check_interval_.
          // Failures of either call are logged and otherwise ignored.
02374     if (reactor_->cancel_timer(liveness_timer_.in()) == -1) {
02375       ACE_ERROR((LM_ERROR,
02376         ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
02377         ACE_TEXT("cancel_timer")));
02378     }
02379     if (reactor_->schedule_timer(liveness_timer_.in(), 0, liveliness_check_interval_ - elapsed,
02380       liveliness_check_interval_) == -1)
02381     {
02382       ACE_ERROR((LM_ERROR,
02383         ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
02384         ACE_TEXT("schedule_timer")));
02385     }
02386     return 0;
02387   }
02388 
02389   liveliness_asserted_ = false;
        // NOTE(review): elapsed is recomputed here, presumably because
        // send_liveliness() can update last_liveliness_activity_time_ - confirm.
02390   elapsed = tv - last_liveliness_activity_time_;
02391 
02392   // Have we lost liveliness?
02393   if (elapsed >= duration_to_time_value(qos_.liveliness.lease_duration)) {
02394     liveliness_lost = true;
02395   }
02396 
        // Count and report only the transition from "not lost" to "lost".
02397   if (!this->liveliness_lost_ && liveliness_lost) {
02398     ++ this->liveliness_lost_status_.total_count;
02399     ++ this->liveliness_lost_status_.total_count_change;
02400 
02401     DDS::DataWriterListener_var listener =
02402       listener_for(DDS::LIVELINESS_LOST_STATUS);
02403 
02404     if (!CORBA::is_nil(listener.in())) {
02405       listener->on_liveliness_lost(this, this->liveliness_lost_status_);
            // total_count_change is reset only when a listener was actually notified.
02406       this->liveliness_lost_status_.total_count_change = 0;
02407     }
02408   }
02409 
02410   this->liveliness_lost_ = liveliness_lost;
02411   return 0;
02412 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataWriterImpl::inconsistent_topic (  )  [virtual]

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 858 of file DataWriterImpl.cpp.

References topic_servant_.

00859 {
00860   topic_servant_->inconsistent_topic(); // forwards the notification to the topic; nothing else is done here
00861 }

void OpenDDS::DCPS::DataWriterImpl::init ( TopicImpl topic_servant,
const DDS::DataWriterQos qos,
DDS::DataWriterListener_ptr  a_listener,
const DDS::StatusMask mask,
WeakRcHandle< OpenDDS::DCPS::DomainParticipantImpl participant_servant,
OpenDDS::DCPS::PublisherImpl publisher_servant 
)

Initialize the data members.

Definition at line 131 of file DataWriterImpl.cpp.

References CORBA::LocalObject::_duplicate(), DBG_ENTRY_LVL, domain_id_, is_bit_, listener_, listener_mask_, OpenDDS::DCPS::WeakRcHandle< T >::lock(), participant_servant_, publisher_servant_, qos_, reactor_, TheServiceParticipant, topic_id_, topic_name_, topic_servant_, OpenDDS::DCPS::topicIsBIT(), and type_name_.

Referenced by OpenDDS::DCPS::PublisherImpl::create_datawriter().

00138 {
        // Stores the topic servant and values read from it (name, id, type name),
        // the QoS, the listener and its mask, the participant and publisher
        // handles, the participant's domain id, and the service participant's
        // timer reactor.
00139   DBG_ENTRY_LVL("DataWriterImpl","init",6);
00140   topic_servant_ = topic_servant;
00141   topic_name_    = topic_servant_->get_name();
00142   topic_id_      = topic_servant_->get_id();
00143   type_name_     = topic_servant_->get_type_name();
00144 
00145 #if !defined (DDS_HAS_MINIMUM_BIT)
00146   is_bit_ = topicIsBIT(topic_name_.in(), type_name_.in());
00147 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00148 
00149   qos_ = qos;
00150 
00151   //Note: OK to _duplicate(nil).
00152   listener_ = DDS::DataWriterListener::_duplicate(a_listener);
00153   listener_mask_ = mask;
00154 
00155   // Only store the participant pointer, since it is our "grand"
00156   // parent, we will exist as long as it does.
00157   participant_servant_ = participant_servant;
00158 
00159   RcHandle<DomainParticipantImpl> participant = participant_servant.lock();
        // NOTE(review): participant is dereferenced without checking that lock()
        // succeeded, whereas get_instance_handle() and get_next_handle() test the
        // handle first - confirm callers guarantee a live participant here.
00160   domain_id_ = participant->get_domain_id();
00161 
00162   // Only store the publisher pointer, since it is our parent, we will
00163   // exist as long as it does.
        // NOTE(review): publisher_servant is dereferenced with unary * and no
        // null check - presumably callers never pass null; confirm.
00164   publisher_servant_ = *publisher_servant;
00165 
00166   this->reactor_ = TheServiceParticipant->timer();
00167 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::DataWriterListener_ptr OpenDDS::DCPS::DataWriterImpl::listener_for ( DDS::StatusKind  kind  ) 

This is used to retrieve the listener for a certain status change.

If this datawriter has a registered listener and the status kind is in the listener mask then the listener is returned. Otherwise, the query for the listener is propagated up to the factory/publisher.

Definition at line 2325 of file DataWriterImpl.cpp.

References CORBA::LocalObject::_duplicate(), CORBA::is_nil(), listener_, listener_mask_, OpenDDS::DCPS::WeakRcHandle< T >::lock(), and publisher_servant_.

Referenced by association_complete_i(), OpenDDS::DCPS::OfferedDeadlineWatchdog::execute(), handle_timeout(), and update_incompatible_qos().

02326 {
02327   // Per 2.1.4.3.1 Listener Access to Plain Communication Status:
02328   // use this entity's factory (the publisher) if no listener is set or
02329   // the listener mask does not enable this kind.
02330   RcHandle<PublisherImpl> publisher = publisher_servant_.lock();
        // NOTE(review): when the publisher handle cannot be locked this returns 0
        // even if this writer has its own listener enabled for kind - confirm
        // that is intended.
02331   if (!publisher)
02332     return 0;
02333 
02334   if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
02335     return publisher->listener_for(kind);
02336 
02337   } else {
          // The writer's own listener is returned via _duplicate().
02338     return DDS::DataWriterListener::_duplicate(listener_.in());
02339   }
02340 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_Time_Value OpenDDS::DCPS::DataWriterImpl::liveliness_check_interval ( DDS::LivelinessQosPolicyKind  kind  ) 

Definition at line 1173 of file DataWriterImpl.cpp.

References DDS::DataWriterQos::liveliness, liveliness_check_interval_, ACE_Time_Value::max_time, and qos_.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::LivelinessTimer::add_adjust().

01174 {
        // Returns this writer's check interval only when its liveliness kind equals
        // the requested kind; otherwise returns ACE_Time_Value::max_time.
01175   if (this->qos_.liveliness.kind == kind) {
01176     return liveliness_check_interval_;
01177   } else {
01178     return ACE_Time_Value::max_time;
01179   }
01180 }

Here is the caller graph for this function:

void OpenDDS::DCPS::DataWriterImpl::lookup_instance_handles ( const ReaderIdSeq ids,
DDS::InstanceHandleSeq hdls 
) [private]

Lookup the instance handles by the subscription repo ids.

Definition at line 2558 of file DataWriterImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, OpenDDS::DCPS::WeakRcHandle< T >::lock(), OPENDDS_STRING, and participant_servant_.

Referenced by notify_publication_disconnected(), notify_publication_lost(), and notify_publication_reconnected().

02560 {
        // Resizes hdls to ids.length() and stores participant->id_to_handle(ids[i])
        // at each index i. If the participant handle cannot be locked the function
        // returns early and hdls is left untouched.
02561   CORBA::ULong const num_rds = ids.length();
02562   RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
02563 
02564   if (!participant)
02565     return;
02566 
        // Above debug level 9, log the comma-separated list of reader ids.
02567   if (DCPS_debug_level > 9) {
02568     OPENDDS_STRING separator;
02569     OPENDDS_STRING buffer;
02570 
02571     for (CORBA::ULong i = 0; i < num_rds; ++i) {
02572       buffer += separator + OPENDDS_STRING(GuidConverter(ids[i]));
02573       separator = ", "; // empty for the first id, ", " for every later one
02574     }
02575 
02576     ACE_DEBUG((LM_DEBUG,
02577                ACE_TEXT("(%P|%t) DataWriterImpl::lookup_instance_handles: ")
02578                ACE_TEXT("searching for handles for reader Ids: %C.\n"),
02579                buffer.c_str()));
02580   }
02581 
02582   hdls.length(num_rds);
02583 
02584   for (CORBA::ULong i = 0; i < num_rds; ++i) {
02585     hdls[i] = participant->id_to_handle(ids[i]);
02586   }
02587 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool OpenDDS::DCPS::DataWriterImpl::need_sequence_repair (  )  [private]

Definition at line 2655 of file DataWriterImpl.cpp.

References need_sequence_repair_i(), and reader_info_lock_.

Referenced by create_control_message(), and create_sample_data_message().

02656 {
        // Wrapper that takes reader_info_lock_ and then calls
        // need_sequence_repair_i(); false is the value given to ACE_GUARD_RETURN
        // for the case where the lock cannot be acquired.
02657   ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, false);
02658   return need_sequence_repair_i();
02659 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool OpenDDS::DCPS::DataWriterImpl::need_sequence_repair_i (  )  const [private]

Definition at line 2662 of file DataWriterImpl.cpp.

References reader_info_, and sequence_number_.

Referenced by need_sequence_repair().

02663 {
        // Returns true as soon as any entry of reader_info_ has an
        // expected_sequence_ different from the writer's sequence_number_; returns
        // false when all entries match or the map is empty. Takes no lock itself.
02664   for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
02665        end = reader_info_.end(); it != end; ++it) {
02666     if (it->second.expected_sequence_ != sequence_number_) {
02667       return true;
02668     }
02669   }
02670 
02671   return false;
02672 }

Here is the caller graph for this function:

void OpenDDS::DCPS::DataWriterImpl::notify_publication_disconnected ( const ReaderIdSeq subids  )  [virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 2460 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::LocalObject< DDS::DataWriter >::_narrow(), DBG_ENTRY_LVL, is_bit_, CORBA::is_nil(), listener_, lookup_instance_handles(), status, and OpenDDS::DCPS::PublicationLostStatus::subscription_handles.

02461 {
        // Calls on_publication_disconnected() on the stored listener, with the
        // handles looked up for subids. Nothing happens when is_bit_ is set or
        // when the listener does not narrow to the extended listener type.
02462   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_disconnected",6);
02463 
02464   if (!is_bit_) {
02465     // Narrow to OpenDDS::DCPS::DataWriterListener. If a plain
02466     // DDS::DataWriterListener is given to this DataWriter then narrow() fails.
02467     DataWriterListener_var the_listener =
02468       DataWriterListener::_narrow(this->listener_.in());
02469 
02470     if (!CORBA::is_nil(the_listener.in())) {
02471       PublicationDisconnectedStatus status;
02472       // Since this callback may come after remove_association which
02473       // removes the reader from id_to_handle map, we can ignore this
02474       // error.
02475       this->lookup_instance_handles(subids,
02476                                     status.subscription_handles);
02477       the_listener->on_publication_disconnected(this, status);
02478     }
02479   }
02480 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataWriterImpl::notify_publication_lost ( const DDS::InstanceHandleSeq handles  )  [private]

Definition at line 2530 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::LocalObject< DDS::DataWriter >::_narrow(), DBG_ENTRY_LVL, is_bit_, CORBA::is_nil(), listener_, status, and OpenDDS::DCPS::PublicationLostStatus::subscription_handles.

02531 {
02532   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
02533 
02534   if (!is_bit_) {
02535     // Narrow to DDS::DCPS::DataWriterListener. If a
02536     // DDS::DataWriterListener is given to this DataWriter then
02537     // narrow() fails.
02538     DataWriterListener_var the_listener =
02539       DataWriterListener::_narrow(this->listener_.in());
02540 
02541     if (!CORBA::is_nil(the_listener.in())) {
02542       PublicationLostStatus status;
02543 
02544       CORBA::ULong len = handles.length();
02545       status.subscription_handles.length(len);
02546 
02547       for (CORBA::ULong i = 0; i < len; ++ i) {
02548         status.subscription_handles[i] = handles[i];
02549       }
02550 
02551       the_listener->on_publication_lost(this, status);
02552     }
02553   }
02554 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataWriterImpl::notify_publication_lost ( const ReaderIdSeq subids  )  [virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 2506 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::LocalObject< DDS::DataWriter >::_narrow(), DBG_ENTRY_LVL, is_bit_, CORBA::is_nil(), listener_, lookup_instance_handles(), status, and OpenDDS::DCPS::PublicationLostStatus::subscription_handles.

02507 {
02508   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
02509 
02510   if (!is_bit_) {
02511     // Narrow to DDS::DCPS::DataWriterListener. If a
02512     // DDS::DataWriterListener is given to this DataWriter then
02513     // narrow() fails.
02514     DataWriterListener_var the_listener =
02515       DataWriterListener::_narrow(this->listener_.in());
02516 
02517     if (!CORBA::is_nil(the_listener.in())) {
02518       PublicationLostStatus status;
02519 
02520       // Since this callback may come after remove_association which removes
02521       // the reader from id_to_handle map, we can ignore this error.
02522       this->lookup_instance_handles(subids,
02523                                     status.subscription_handles);
02524       the_listener->on_publication_lost(this, status);
02525     }
02526   }
02527 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataWriterImpl::notify_publication_reconnected ( const ReaderIdSeq subids  )  [virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 2483 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::LocalObject< DDS::DataWriter >::_narrow(), DBG_ENTRY_LVL, is_bit_, CORBA::is_nil(), listener_, lookup_instance_handles(), status, and OpenDDS::DCPS::PublicationLostStatus::subscription_handles.

02484 {
02485   DBG_ENTRY_LVL("DataWriterImpl","notify_publication_reconnected",6);
02486 
02487   if (!is_bit_) {
02488     // Narrow to DDS::DCPS::DataWriterListener. If a
02489     // DDS::DataWriterListener is given to this DataWriter then
02490     // narrow() fails.
02491     DataWriterListener_var the_listener =
02492       DataWriterListener::_narrow(this->listener_.in());
02493 
02494     if (!CORBA::is_nil(the_listener.in())) {
02495       PublicationDisconnectedStatus status;
02496 
02497       // If it's reconnected then the reader should be in id_to_handle
02498       this->lookup_instance_handles(subids, status.subscription_handles);
02499 
02500       the_listener->on_publication_reconnected(this, status);
02501     }
02502   }
02503 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::num_samples ( DDS::InstanceHandle_t  handle,
size_t size 
)

Return the number of samples for a given instance.

Definition at line 1940 of file DataWriterImpl.cpp.

References data_container_.

01942 {
01943   return data_container_->num_samples(handle, size);
01944 }

typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_MAP_CMP ( RepoId  ,
DDS::InstanceHandle_t  ,
GUID_tKeyLessThan   
) [private]

typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_MAP_CMP ( RepoId  ,
ReaderInfo  ,
GUID_tKeyLessThan   
) [protected]

typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_MAP_CMP ( RepoId  ,
SequenceNumber  ,
GUID_tKeyLessThan   
)

typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_VECTOR ( DDS::InstanceHandle_t   ) 

RcHandle< EntityImpl > OpenDDS::DCPS::DataWriterImpl::parent ( void   )  const [virtual]

Reimplemented from OpenDDS::DCPS::EntityImpl.

Definition at line 2180 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::WeakRcHandle< T >::lock(), and publisher_servant_.

02181 {
02182   return this->publisher_servant_.lock();
02183 }

Here is the call graph for this function:

bool OpenDDS::DCPS::DataWriterImpl::participant_liveliness_activity_after ( const ACE_Time_Value &  tv  ) 

Definition at line 1183 of file DataWriterImpl.cpp.

References last_liveliness_activity_time_, DDS::DataWriterQos::liveliness, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, and qos_.

01184 {
01185   if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) {
01186     return last_liveliness_activity_time_ > tv;
01187   } else {
01188     return false;
01189   }
01190 }

bool OpenDDS::DCPS::DataWriterImpl::persist_data (  ) 

Make sent data available beyond the lifetime of this DataWriter.

Definition at line 2591 of file DataWriterImpl.cpp.

References data_container_.

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().

02592 {
02593   return this->data_container_->persist_data();
02594 }

Here is the caller graph for this function:

void OpenDDS::DCPS::DataWriterImpl::prepare_to_delete (  )  [protected]

Definition at line 2442 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::EntityImpl::set_deleted(), and OpenDDS::DCPS::TransportClient::stop_associating().

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().

02443 {
02444   this->set_deleted(true);
02445   this->stop_associating();
02446 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataWriterImpl::register_for_reader ( const RepoId participant,
const RepoId writerid,
const RepoId readerid,
const TransportLocatorSeq locators,
DiscoveryListener listener 
) [virtual]

Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 776 of file DataWriterImpl.cpp.

00781 {
00782   TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
00783 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::register_instance_from_durable_data ( DDS::InstanceHandle_t handle,
Message_Block_Ptr  data,
const DDS::Time_t source_timestamp 
)

Delegate to the WriteDataContainer to register and tell the transport to broadcast the registered instance.

Definition at line 1563 of file DataWriterImpl.cpp.

References ACE_TEXT(), DBG_ENTRY_LVL, get_lock(), LM_ERROR, OpenDDS::DCPS::move(), register_instance_i(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, and send_all_to_flush_control().

Referenced by OpenDDS::DCPS::DataDurabilityCache::get_data().

01566 {
01567   DBG_ENTRY_LVL("DataWriterImpl","register_instance_from_durable_data",6);
01568 
01569   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
01570                    guard,
01571                    get_lock(),
01572                    DDS::RETCODE_ERROR);
01573 
01574   DDS::ReturnCode_t ret = register_instance_i(handle, move(data), source_timestamp);
01575   if (ret != DDS::RETCODE_OK) {
01576     ACE_ERROR_RETURN((LM_ERROR,
01577                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_from_durable_data: ")
01578                       ACE_TEXT("register instance with container failed.\n")),
01579                       ret);
01580   }
01581 
01582   send_all_to_flush_control(guard);
01583 
01584   return ret;
01585 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::register_instance_i ( DDS::InstanceHandle_t handle,
Message_Block_Ptr  data,
const DDS::Time_t source_timestamp 
)

Delegate to the WriteDataContainer to register the instance. The caller must tell the transport to broadcast the registered instance upon returning.

Definition at line 1501 of file DataWriterImpl.cpp.

References ACE_TEXT(), create_control_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::INSTANCE_REGISTRATION, LM_ERROR, monitor_, OpenDDS::DCPS::move(), OpenDDS::DCPS::Monitor::report(), DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, and OpenDDS::DCPS::DataSampleElement::set_sample().

Referenced by register_instance_from_durable_data().

01504 {
01505   DBG_ENTRY_LVL("DataWriterImpl","register_instance_i",6);
01506 
01507   if (enabled_ == false) {
01508     ACE_ERROR_RETURN((LM_ERROR,
01509                       ACE_TEXT("(%P|%t) ERROR: ")
01510                       ACE_TEXT("DataWriterImpl::register_instance_i: ")
01511                       ACE_TEXT(" Entity is not enabled. \n")),
01512                      DDS::RETCODE_NOT_ENABLED);
01513   }
01514 
01515   DDS::ReturnCode_t ret =
01516     this->data_container_->register_instance(handle, data);
01517 
01518   if (ret != DDS::RETCODE_OK) {
01519     ACE_ERROR_RETURN((LM_ERROR,
01520                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_i: ")
01521                       ACE_TEXT("register instance with container failed.\n")),
01522                      ret);
01523   }
01524 
01525   if (this->monitor_) {
01526     this->monitor_->report();
01527   }
01528 
01529   DataSampleElement* element = 0;
01530   ret = this->data_container_->obtain_buffer_for_control(element);
01531 
01532   if (ret != DDS::RETCODE_OK) {
01533     ACE_ERROR_RETURN((LM_ERROR,
01534                       ACE_TEXT("(%P|%t) ERROR: ")
01535                       ACE_TEXT("DataWriterImpl::register_instance_i: ")
01536                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01537                       ret),
01538                      ret);
01539   }
01540 
01541   // Add header with the registration sample data.
01542   Message_Block_Ptr sample(create_control_message(INSTANCE_REGISTRATION,
01543                                              element->get_header(),
01544                                              move(data),
01545                                              source_timestamp));
01546 
01547   element->set_sample(move(sample));
01548 
01549   ret = this->data_container_->enqueue_control(element);
01550 
01551   if (ret != DDS::RETCODE_OK) {
01552     ACE_ERROR_RETURN((LM_ERROR,
01553                       ACE_TEXT("(%P|%t) ERROR: ")
01554                       ACE_TEXT("DataWriterImpl::register_instance_i: ")
01555                       ACE_TEXT("enqueue_control failed.\n")),
01556                      ret);
01557   }
01558 
01559   return ret;
01560 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataWriterImpl::remove_all_associations (  ) 

Definition at line 724 of file DataWriterImpl.cpp.

References ACE_TEXT(), DBG_ENTRY_LVL, LM_WARNING, lock_, pending_readers_, readers_, remove_associations(), size, and OpenDDS::DCPS::TransportClient::stop_associating().

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().

00725 {
00726   DBG_ENTRY_LVL("DataWriterImpl", "remove_all_associations", 6);
00727   // stop pending associations
00728   this->stop_associating();
00729 
00730   OpenDDS::DCPS::ReaderIdSeq readers;
00731   CORBA::ULong size;
00732   CORBA::ULong num_pending_readers;
00733   {
00734     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00735 
00736     num_pending_readers = static_cast<CORBA::ULong>(pending_readers_.size());
00737     size = static_cast<CORBA::ULong>(readers_.size()) + num_pending_readers;
00738     readers.length(size);
00739 
00740     RepoIdSet::iterator itEnd = readers_.end();
00741     int i = 0;
00742 
00743     for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
00744       readers[i ++] = *it;
00745     }
00746 
00747     itEnd = pending_readers_.end();
00748 
00749     for (RepoIdSet::iterator it = pending_readers_.begin(); it != itEnd; ++it) {
00750       readers[i ++] = *it;
00751     }
00752 
00753     if (num_pending_readers > 0) {
00754       ACE_DEBUG((LM_WARNING,
00755                  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00756                  ACE_TEXT("%d subscribers were pending and never fully associated.\n"),
00757                  num_pending_readers));
00758     }
00759   }
00760 
00761   try {
00762     if (0 < size) {
00763       CORBA::Boolean dont_notify_lost = false;
00764 
00765       this->remove_associations(readers, dont_notify_lost);
00766     }
00767 
00768   } catch (const CORBA::Exception&) {
00769       ACE_DEBUG((LM_WARNING,
00770                  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
00771                  ACE_TEXT("caught exception from remove_associations.\n")));
00772   }
00773 }

Here is the call graph for this function:

Here is the caller graph for this function:

virtual void OpenDDS::DCPS::DataWriterImpl::remove_associations ( const ReaderIdSeq readers,
bool  callback 
) [virtual]

Implements OpenDDS::DCPS::TransportSendListener.

Referenced by remove_all_associations().

Here is the caller graph for this function:

void OpenDDS::DCPS::DataWriterImpl::reschedule_deadline (  ) 

Definition at line 2598 of file DataWriterImpl.cpp.

References data_container_, OpenDDS::DCPS::RcHandle< T >::in(), and watchdog_.

02599 {
02600   if (this->watchdog_.in()) {
02601     this->data_container_->reschedule_deadline();
02602   }
02603 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataWriterImpl::retrieve_inline_qos_data ( TransportSendListener::InlineQosData qos_data  )  const [virtual]

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 2636 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::TransportSendListener::InlineQosData::dw_qos, OpenDDS::DCPS::WeakRcHandle< T >::lock(), OpenDDS::DCPS::TransportSendListener::InlineQosData::pub_qos, publisher_servant_, qos_, OpenDDS::DCPS::TransportSendListener::InlineQosData::topic_name, and topic_name_.

02637 {
02638   RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
02639   if (publisher) {
02640     publisher->get_qos(qos_data.pub_qos);
02641   }
02642   qos_data.dw_qos = this->qos_;
02643   qos_data.topic_name = this->topic_name_.in();
02644 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataWriterImpl::send_all_to_flush_control ( ACE_Guard< ACE_Recursive_Thread_Mutex > &  guard  ) 

Definition at line 1484 of file DataWriterImpl.cpp.

References controlTracker, DBG_ENTRY_LVL, get_unsent_data(), OpenDDS::DCPS::MessageTracker::message_sent(), ACE_Guard< ACE_LOCK >::release(), and OpenDDS::DCPS::TransportClient::send().

Referenced by dispose(), dispose_and_unregister(), register_instance_from_durable_data(), send_request_ack(), and unregister_instance_i().

01485 {
01486   DBG_ENTRY_LVL("DataWriterImpl","send_all_to_flush_control",6);
01487 
01488   SendStateDataSampleList list;
01489 
01490   ACE_UINT64 transaction_id = this->get_unsent_data(list);
01491 
01492   controlTracker.message_sent();
01493 
01494   //need to release guard to call down to transport
01495   guard.release();
01496 
01497   this->send(list, transaction_id);
01498 }

Here is the call graph for this function:

Here is the caller graph for this function:

SendControlStatus OpenDDS::DCPS::DataWriterImpl::send_control ( const DataSampleHeader header,
Message_Block_Ptr  msg 
) [protected, virtual]

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 2675 of file DataWriterImpl.cpp.

References controlTracker, OpenDDS::DCPS::MessageTracker::message_dropped(), OpenDDS::DCPS::MessageTracker::message_sent(), OpenDDS::DCPS::move(), OpenDDS::DCPS::SEND_CONTROL_OK, and status.

Referenced by end_coherent_changes(), and send_liveliness().

02677 {
02678   controlTracker.message_sent();
02679 
02680   SendControlStatus status = TransportClient::send_control(header, move(msg));
02681 
02682   if (status != SEND_CONTROL_OK) {
02683     controlTracker.message_dropped();
02684   }
02685 
02686   return status;
02687 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::send_end_historic_samples ( const RepoId readerId  )  [private]

bool OpenDDS::DCPS::DataWriterImpl::send_liveliness ( const ACE_Time_Value now  )  [private]

Send the liveliness message.

Definition at line 2415 of file DataWriterImpl.cpp.

References ACE_TEXT(), create_control_message(), OpenDDS::DCPS::DATAWRITER_LIVELINESS, domain_id_, header, last_liveliness_activity_time_, DDS::DataWriterQos::liveliness, LM_ERROR, DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS, OpenDDS::DCPS::move(), qos_, send_control(), OpenDDS::DCPS::SEND_CONTROL_ERROR, TheServiceParticipant, and OpenDDS::DCPS::time_value_to_time().

Referenced by assert_liveliness(), and handle_timeout().

02416 {
02417   if (this->qos_.liveliness.kind == DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS ||
02418       !TheServiceParticipant->get_discovery(domain_id_)->supports_liveliness()) {
02419     DDS::Time_t t = time_value_to_time(now);
02420     DataSampleHeader header;
02421     Message_Block_Ptr empty;
02422     Message_Block_Ptr liveliness_msg(
02423       this->create_control_message(DATAWRITER_LIVELINESS, header, move(empty), t));
02424 
02425     if (this->send_control(header, move(liveliness_msg)) == SEND_CONTROL_ERROR) {
02426       ACE_ERROR_RETURN((LM_ERROR,
02427                         ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::send_liveliness: ")
02428                         ACE_TEXT(" send_control failed. \n")),
02429                        false);
02430 
02431     } else {
02432       last_liveliness_activity_time_ = now;
02433       return true;
02434     }
02435   } else {
02436     last_liveliness_activity_time_ = now;
02437     return true;
02438   }
02439 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::send_request_ack (  )  [private]

Definition at line 990 of file DataWriterImpl.cpp.

References ACE_TEXT(), create_control_message(), data_container_, OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), ACE_OS::gettimeofday(), LM_ERROR, OpenDDS::DCPS::move(), OpenDDS::DCPS::REQUEST_ACK, DDS::RETCODE_ERROR, DDS::RETCODE_OK, send_all_to_flush_control(), OpenDDS::DCPS::DataSampleElement::set_sample(), and OpenDDS::DCPS::time_value_to_time().

Referenced by wait_for_acknowledgments().

00991 {
00992   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00993                    guard,
00994                    get_lock(),
00995                    DDS::RETCODE_ERROR);
00996 
00997 
00998   DataSampleElement* element = 0;
00999   DDS::ReturnCode_t ret = this->data_container_->obtain_buffer_for_control(element);
01000 
01001   if (ret != DDS::RETCODE_OK) {
01002     ACE_ERROR_RETURN((LM_ERROR,
01003                       ACE_TEXT("(%P|%t) ERROR: ")
01004                       ACE_TEXT("DataWriterImpl::send_request_ack: ")
01005                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01006                       ret),
01007                      ret);
01008   }
01009 
01010   Message_Block_Ptr blk;
01011   // Add header with the registration sample data.
01012   Message_Block_Ptr sample(create_control_message(REQUEST_ACK,
01013                                              element->get_header(),
01014                                              move(blk),
01015                                              time_value_to_time( ACE_OS::gettimeofday() )));
01016   element->set_sample(move(sample));
01017 
01018   ret = this->data_container_->enqueue_control(element);
01019 
01020   if (ret != DDS::RETCODE_OK) {
01021     ACE_ERROR_RETURN((LM_ERROR,
01022                       ACE_TEXT("(%P|%t) ERROR: ")
01023                       ACE_TEXT("DataWriterImpl::send_request_ack: ")
01024                       ACE_TEXT("enqueue_control failed.\n")),
01025                      ret);
01026   }
01027 
01028 
01029   send_all_to_flush_control(guard);
01030 
01031   return DDS::RETCODE_OK;
01032 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataWriterImpl::send_suspended_data (  ) 

Called by the PublisherImpl to indicate that the Publisher is now resumed and any data collected while it was suspended should now be sent.

Definition at line 1861 of file DataWriterImpl.cpp.

References available_data_list_, max_suspended_transaction_id_, min_suspended_transaction_id_, OpenDDS::DCPS::SendStateDataSampleList::reset(), and OpenDDS::DCPS::TransportClient::send().

01862 {
01863   //this serves to get TransportClient's max_transaction_id_seen_
01864   //to the correct value for this list of transactions
01865   if (max_suspended_transaction_id_ != 0) {
01866     this->send(this->available_data_list_, max_suspended_transaction_id_);
01867     max_suspended_transaction_id_ = 0;
01868   }
01869 
01870   //this serves to actually have the send proceed in
01871   //sending the samples to the datalinks by passing it
01872   //the min_suspended_transaction_id_ which should be the
01873   //TransportClient's expected_transaction_id_
01874   this->send(this->available_data_list_, min_suspended_transaction_id_);
01875   min_suspended_transaction_id_ = 0;
01876   this->available_data_list_.reset();
01877 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::set_listener ( DDS::DataWriterListener_ptr  a_listener,
DDS::StatusMask  mask 
) [virtual]

Definition at line 945 of file DataWriterImpl.cpp.

References CORBA::LocalObject::_duplicate(), listener_, listener_mask_, and DDS::RETCODE_OK.

Referenced by cleanup().

00947 {
00948   listener_mask_ = mask;
00949   //note: OK to duplicate  a nil object ref
00950   listener_ = DDS::DataWriterListener::_duplicate(a_listener);
00951   return DDS::RETCODE_OK;
00952 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::set_qos ( const DDS::DataWriterQos &  qos  )  [virtual]

Definition at line 864 of file DataWriterImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), DDS::DataWriterQos::deadline, domain_id_, dp_id_, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::duration_to_time_value(), OpenDDS::DCPS::EntityImpl::enabled_, last_deadline_missed_total_count_, LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), lock_, offered_deadline_missed_status_, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK, publication_id_, publisher_servant_, qos_, OpenDDS::DCPS::ref(), OpenDDS::DCPS::RcHandle< T >::reset(), DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, status, TheServiceParticipant, OpenDDS::DCPS::Qos_Helper::valid(), and watchdog_.

00865 {
00866 
00867   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00868   OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00869   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00870   OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00871   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00872 
00873   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00874     if (qos_ == qos)
00875       return DDS::RETCODE_OK;
00876 
00877     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00878       return DDS::RETCODE_IMMUTABLE_POLICY;
00879 
00880     } else {
00881       Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00882       DDS::PublisherQos publisherQos;
00883       RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
00884 
00885       bool status = false;
00886       if (publisher) {
00887         publisher->get_qos(publisherQos);
00888         status
00889           = disco->update_publication_qos(domain_id_,
00890                                           dp_id_,
00891                                           this->publication_id_,
00892                                           qos,
00893                                           publisherQos);
00894       }
00895       if (!status) {
00896         ACE_ERROR_RETURN((LM_ERROR,
00897                           ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
00898                           ACE_TEXT("qos not updated. \n")),
00899                          DDS::RETCODE_ERROR);
00900       }
00901     }
00902 
00903     if (!(qos_ == qos)) {
00904       // Reset the deadline timer if the period has changed.
00905       if (qos_.deadline.period.sec != qos.deadline.period.sec
00906           || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
00907         if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00908             && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00909           this->watchdog_= make_rch<OfferedDeadlineWatchdog>(
00910                                ref(this->lock_),
00911                                qos.deadline,
00912                                ref(*this),
00913                                ref(this->offered_deadline_missed_status_),
00914                                ref(this->last_deadline_missed_total_count_));
00915 
00916         } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC
00917                    && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
00918           this->watchdog_->cancel_all();
00919           this->watchdog_.reset();
00920 
00921         } else {
00922           this->watchdog_->reset_interval(
00923             duration_to_time_value(qos.deadline.period));
00924         }
00925       }
00926 
00927       qos_ = qos;
00928     }
00929 
00930     return DDS::RETCODE_OK;
00931 
00932   } else {
00933     return DDS::RETCODE_INCONSISTENT_POLICY;
00934   }
00935 }

Here is the call graph for this function:

bool OpenDDS::DCPS::DataWriterImpl::should_ack (  )  const

Does this writer have samples to be acknowledged?

Definition at line 967 of file DataWriterImpl.cpp.

References readers_.

00968 {
00969   // N.B. It may be worthwhile to investigate a more efficient
00970   // heuristic for determining if a writer should send SAMPLE_ACK
00971   // control samples. Perhaps based on a sequence number delta?
00972   return this->readers_.size() != 0;
00973 }

void OpenDDS::DCPS::DataWriterImpl::track_sequence_number ( GUIDSeq *  filter_out  )  [private]

Definition at line 1828 of file DataWriterImpl.cpp.

References reader_info_, reader_info_lock_, and sequence_number_.

Referenced by write().

01829 {
01830   ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
01831 
01832 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01833   // Track individual expected sequence numbers in ReaderInfo
01834   RepoIdSet excluded;
01835 
01836   if (filter_out && !reader_info_.empty()) {
01837     const GUID_t* buf = filter_out->get_buffer();
01838     excluded.insert(buf, buf + filter_out->length());
01839   }
01840 
01841   for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01842        end = reader_info_.end(); iter != end; ++iter) {
01843     // If not excluding this reader, update expected sequence
01844     if (excluded.count(iter->first) == 0) {
01845       iter->second.expected_sequence_ = sequence_number_;
01846     }
01847   }
01848 
01849 #else
01850   ACE_UNUSED_ARG(filter_out);
01851   for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
01852        end = reader_info_.end(); iter != end; ++iter) {
01853     iter->second.expected_sequence_ = sequence_number_;
01854   }
01855 
01856 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
01857 
01858 }

Here is the caller graph for this function:

void OpenDDS::DCPS::DataWriterImpl::transport_assoc_done ( int  flags,
const RepoId remote_id 
) [virtual]

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 254 of file DataWriterImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::TransportClient::ASSOC_ACTIVE, assoc_complete_readers_, OpenDDS::DCPS::TransportClient::ASSOC_OK, association_complete_i(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, domain_id_, dp_id_, OpenDDS::DCPS::insert(), LM_DEBUG, LM_ERROR, LM_INFO, lock_, OPENDDS_STRING, pending_readers_, publication_id_, and TheServiceParticipant.

00255 {
00256   DBG_ENTRY_LVL("DataWriterImpl", "transport_assoc_done", 6);
00257 
00258   if (!(flags & ASSOC_OK)) {
00259     if (DCPS_debug_level) {
00260       const GuidConverter conv(remote_id);
00261       ACE_ERROR((LM_ERROR,
00262                  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00263                  ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
00264                  OPENDDS_STRING(conv).c_str()));
00265     }
00266 
00267     return;
00268   }
00269   if (DCPS_debug_level) {
00270     const GuidConverter writer_conv(publication_id_);
00271     const GuidConverter conv(remote_id);
00272     ACE_DEBUG((LM_INFO,
00273                ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00274                ACE_TEXT(" writer %C succeeded in associating with reader %C\n"),
00275                OPENDDS_STRING(writer_conv).c_str(),
00276                OPENDDS_STRING(conv).c_str()));
00277   }
00278   if (flags & ASSOC_ACTIVE) {
00279 
00280     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00281 
00282     // Have we already received an association_complete() callback?
00283     if (assoc_complete_readers_.count(remote_id)) {
00284       if (DCPS_debug_level) {
00285         const GuidConverter writer_conv(publication_id_);
00286         const GuidConverter converter(remote_id);
00287         ACE_DEBUG((LM_DEBUG,
00288                    ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00289                    ACE_TEXT("writer %C found assoc_complete_reader %C, continue with association_complete_i\n"),
00290                    OPENDDS_STRING(writer_conv).c_str(),
00291                    OPENDDS_STRING(converter).c_str()));
00292       }
00293       assoc_complete_readers_.erase(remote_id);
00294       association_complete_i(remote_id);
00295 
00296       // Add to pending_readers_ -> pending means we are waiting
00297       // for the association_complete() callback.
00298 
00299     } else if (OpenDDS::DCPS::insert(pending_readers_, remote_id) == -1) {
00300       const GuidConverter converter(remote_id);
00301       ACE_ERROR((LM_ERROR,
00302                  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::transport_assoc_done: ")
00303                  ACE_TEXT("failed to mark %C as pending.\n"),
00304                  OPENDDS_STRING(converter).c_str()));
00305 
00306     } else {
00307       if (DCPS_debug_level) {
00308         const GuidConverter converter(remote_id);
00309         ACE_DEBUG((LM_DEBUG,
00310                    ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00311                    ACE_TEXT("marked %C as pending.\n"),
00312                    OPENDDS_STRING(converter).c_str()));
00313       }
00314     }
00315 
00316   } else {
00317     // In the current implementation, DataWriter is always active, so this
00318     // code will not be applicable.
00319     if (DCPS_debug_level) {
00320       const GuidConverter conv(publication_id_);
00321       ACE_ERROR((LM_ERROR,
00322                  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
00323                  ACE_TEXT("ERROR: DataWriter (%C) should always be active in current implementation\n"),
00324                  OPENDDS_STRING(conv).c_str()));
00325     }
00326     Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
00327     disco->association_complete(domain_id_, dp_id_,
00328                                 publication_id_, remote_id);
00329   }
00330 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataWriterImpl::unregister_all (  ) 

Delegate to WriteDataContainer to unregister all instances.

Definition at line 1947 of file DataWriterImpl.cpp.

References data_container_.

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().

01948 {
01949   data_container_->unregister_all();
01950 }

Here is the caller graph for this function:

void OpenDDS::DCPS::DataWriterImpl::unregister_for_reader ( const RepoId participant,
const RepoId writerid,
const RepoId readerid 
) [virtual]

Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 786 of file DataWriterImpl.cpp.

00789 {
00790   TransportClient::unregister_for_reader(participant, writerid, readerid);
00791 }

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::unregister_instance_i ( DDS::InstanceHandle_t  handle,
const DDS::Time_t source_timestamp 
)

Delegate to the WriteDataContainer to unregister and tell the transport to broadcast the unregistered instance.

Definition at line 1588 of file DataWriterImpl.cpp.

References ACE_TEXT(), create_control_message(), data_container_, DBG_ENTRY_LVL, dispose_and_unregister(), OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), LM_ERROR, OpenDDS::DCPS::move(), qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, send_all_to_flush_control(), OpenDDS::DCPS::DataSampleElement::set_sample(), OpenDDS::DCPS::UNREGISTER_INSTANCE, and DDS::DataWriterQos::writer_data_lifecycle.

Referenced by OpenDDS::DCPS::DataWriterImpl_T< MessageType >::unregister_instance_w_timestamp(), and unregister_instances().

01590 {
01591   DBG_ENTRY_LVL("DataWriterImpl","unregister_instance_i",6);
01592 
01593   if (enabled_ == false) {
01594     ACE_ERROR_RETURN((LM_ERROR,
01595                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::unregister_instance_i: ")
01596                       ACE_TEXT(" Entity is not enabled.\n")),
01597                      DDS::RETCODE_NOT_ENABLED);
01598   }
01599 
01600   // According to spec 1.2, autodispose_unregistered_instances true causes
01601   // dispose on the instance prior to calling unregister operation.
01602   if (this->qos_.writer_data_lifecycle.autodispose_unregistered_instances) {
01603     return this->dispose_and_unregister(handle, source_timestamp);
01604   }
01605 
01606   DDS::ReturnCode_t ret = DDS::RETCODE_ERROR;
01607   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret);
01608   Message_Block_Ptr unregistered_sample_data;
01609   ret = this->data_container_->unregister(handle, unregistered_sample_data);
01610 
01611   if (ret != DDS::RETCODE_OK) {
01612     ACE_ERROR_RETURN((LM_ERROR,
01613                       ACE_TEXT("(%P|%t) ERROR: ")
01614                       ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01615                       ACE_TEXT(" unregister with container failed. \n")),
01616                      ret);
01617   }
01618 
01619   DataSampleElement* element = 0;
01620   ret = this->data_container_->obtain_buffer_for_control(element);
01621 
01622   if (ret != DDS::RETCODE_OK) {
01623     ACE_ERROR_RETURN((LM_ERROR,
01624                       ACE_TEXT("(%P|%t) ERROR: ")
01625                       ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01626                       ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
01627                       ret),
01628                      ret);
01629   }
01630 
01631   Message_Block_Ptr sample(create_control_message(UNREGISTER_INSTANCE,
01632                                                   element->get_header(),
01633                                                   move(unregistered_sample_data),
01634                                                   source_timestamp));
01635   element->set_sample(move(sample));
01636   ret = this->data_container_->enqueue_control(element);
01637 
01638   if (ret != DDS::RETCODE_OK) {
01639     ACE_ERROR_RETURN((LM_ERROR,
01640                       ACE_TEXT("(%P|%t) ERROR: ")
01641                       ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
01642                       ACE_TEXT("enqueue_control failed.\n")),
01643                      ret);
01644   }
01645 
01646   send_all_to_flush_control(guard);
01647   return DDS::RETCODE_OK;
01648 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataWriterImpl::unregister_instances ( const DDS::Time_t source_timestamp  ) 

Unregister all registered instances and tell the transport to broadcast the unregistered instances.

Definition at line 1712 of file DataWriterImpl.cpp.

References data_container_, sync_unreg_rem_assocs_lock_, and unregister_instance_i().

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().

01713 {
01714   {
01715     ACE_GUARD(ACE_Thread_Mutex, guard, sync_unreg_rem_assocs_lock_);
01716 
01717     PublicationInstanceMapType::iterator it =
01718       this->data_container_->instances_.begin();
01719 
01720     while (it != this->data_container_->instances_.end()) {
01721       if (!it->second->unregistered_) {
01722         const DDS::InstanceHandle_t handle = it->first;
01723         ++it; // avoid mangling the iterator
01724         this->unregister_instance_i(handle, source_timestamp);
01725       } else {
01726         ++it;
01727       }
01728     }
01729   }
01730 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataWriterImpl::update_incompatible_qos ( const IncompatibleQosStatus status  )  [virtual]

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 794 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, CORBA::is_nil(), OpenDDS::DCPS::IncompatibleQosStatus::last_policy_id, DDS::OfferedIncompatibleQosStatus::last_policy_id, listener_for(), lock_, OpenDDS::DCPS::EntityImpl::notify_status_condition(), DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, offered_incompatible_qos_status_, OpenDDS::DCPS::IncompatibleQosStatus::policies, DDS::OfferedIncompatibleQosStatus::policies, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::IncompatibleQosStatus::total_count, DDS::OfferedIncompatibleQosStatus::total_count, and DDS::OfferedIncompatibleQosStatus::total_count_change.

00795 {
00796   DDS::DataWriterListener_var listener =
00797     listener_for(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS);
00798 
00799   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00800 
00801 #if 0
00802 
00803   if (this->offered_incompatible_qos_status_.total_count == status.total_count) {
00804     // This test should make the method idempotent.
00805     return;
00806   }
00807 
00808 #endif
00809 
00810   set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, true);
00811 
00812   // copy status and increment change
00813   offered_incompatible_qos_status_.total_count = status.total_count;
00814   offered_incompatible_qos_status_.total_count_change +=
00815     status.count_since_last_send;
00816   offered_incompatible_qos_status_.last_policy_id = status.last_policy_id;
00817   offered_incompatible_qos_status_.policies = status.policies;
00818 
00819   if (!CORBA::is_nil(listener.in())) {
00820     listener->on_offered_incompatible_qos(this, offered_incompatible_qos_status_);
00821 
00822     // TBD - Why does the spec say to change this but not change the
00823     //       ChangeFlagStatus after a listener call?
00824     offered_incompatible_qos_status_.total_count_change = 0;
00825   }
00826 
00827   notify_status_condition();
00828 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataWriterImpl::update_subscription_params ( const RepoId readerId,
const DDS::StringSeq params 
) [virtual]

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 831 of file DataWriterImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_WARNING, lock_, OPENDDS_STRING, publication_id_, reader_info_, reader_info_lock_, and TheServiceParticipant.

00833 {
00834 #ifdef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00835   ACE_UNUSED_ARG(readerId);
00836   ACE_UNUSED_ARG(params);
00837 #else
00838   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00839   ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
00840   RepoIdToReaderInfoMap::iterator iter = reader_info_.find(readerId);
00841 
00842   if (iter != reader_info_.end()) {
00843     iter->second.expression_params_ = params;
00844 
00845   } else if (DCPS_debug_level > 4 &&
00846              TheServiceParticipant->publisher_content_filter()) {
00847     GuidConverter pubConv(this->publication_id_), subConv(readerId);
00848     ACE_DEBUG((LM_WARNING,
00849                ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::update_subscription_params()")
00850                ACE_TEXT(" - writer: %C has no info about reader: %C\n"),
00851                OPENDDS_STRING(pubConv).c_str(), OPENDDS_STRING(subConv).c_str()));
00852   }
00853 
00854 #endif
00855 }

Here is the call graph for this function:

void OpenDDS::DCPS::DataWriterImpl::wait_control_pending (  ) 

Wait until pending control elements have either been delivered or dropped.

Definition at line 2606 of file DataWriterImpl.cpp.

References controlTracker, OpenDDS::DCPS::TransportRegistry::instance(), OPENDDS_STRING, and OpenDDS::DCPS::MessageTracker::wait_messages_pending().

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().

02607 {
02608   if (!TransportRegistry::instance()->released()) {
02609     OPENDDS_STRING caller_string("DataWriterImpl::wait_control_pending");
02610     controlTracker.wait_messages_pending(caller_string);
02611   }
02612 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::wait_for_acknowledgments ( const DDS::Duration_t max_wait  )  [virtual]

Definition at line 1035 of file DataWriterImpl.cpp.

References ACE_TEXT(), create_ack_token(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SequenceNumber::getValue(), LM_DEBUG, qos_, DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_OK, send_request_ack(), OpenDDS::DCPS::DataWriterImpl::AckToken::sequence_, and wait_for_specific_ack().

01036 {
01037   if (this->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS)
01038     return DDS::RETCODE_OK;
01039 
01040   DDS::ReturnCode_t ret = send_request_ack();
01041 
01042   if (ret != DDS::RETCODE_OK)
01043     return ret;
01044 
01045   DataWriterImpl::AckToken token = create_ack_token(max_wait);
01046   if (DCPS_debug_level) {
01047     ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::wait_for_acknowledgments")
01048                           ACE_TEXT(" waiting for acknowledgment of sequence %q at %T\n"),
01049                           token.sequence_.getValue()));
01050   }
01051   return wait_for_specific_ack(token);
01052 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::wait_for_specific_ack ( const AckToken token  )  [protected]

Definition at line 1055 of file DataWriterImpl.cpp.

References data_container_, OpenDDS::DCPS::DataWriterImpl::AckToken::deadline(), and OpenDDS::DCPS::DataWriterImpl::AckToken::sequence_.

Referenced by wait_for_acknowledgments().

01056 {
01057   return this->data_container_->wait_ack_of_seq(token.deadline(), token.sequence_);
01058 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::DataWriterImpl::wait_pending (  ) 

Wait for pending samples to drain.

Definition at line 2615 of file DataWriterImpl.cpp.

References data_container_, and OpenDDS::DCPS::TransportRegistry::instance().

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().

02616 {
02617   if (!TransportRegistry::instance()->released()) {
02618     data_container_->wait_pending();
02619   }
02620 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::write ( Message_Block_Ptr  sample,
DDS::InstanceHandle_t  handle,
const DDS::Time_t source_timestamp,
GUIDSeq filter_out 
)

Delegate to the WriteDataContainer to queue the instance sample and finally tell the transport to send the sample.

Parameters:
filter_out can either be null (if the writer can't or won't evaluate the filters), or a list of associated reader RepoIds that should NOT get the data sample due to content filtering.

Definition at line 1733 of file DataWriterImpl.cpp.

References ACE_TEXT(), available_data_list_, coherent_, coherent_samples_, create_sample_data_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), get_unsent_data(), ACE_OS::gettimeofday(), last_liveliness_activity_time_, LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), max_suspended_transaction_id_, min_suspended_transaction_id_, OpenDDS::DCPS::move(), publisher_servant_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, OpenDDS::DCPS::TransportClient::send(), OpenDDS::DCPS::DataSampleElement::set_filter_out(), OpenDDS::DCPS::DataSampleElement::set_sample(), and track_sequence_number().

Referenced by OpenDDS::DCPS::DataDurabilityCache::get_data().

01737 {
01738   DBG_ENTRY_LVL("DataWriterImpl","write",6);
01739 
01740   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
01741                     guard,
01742                     get_lock (),
01743                     DDS::RETCODE_ERROR);
01744 
01745   // take ownership of sequence allocated in FooDWImpl::write_w_timestamp()
01746   GUIDSeq_var filter_out_var(filter_out);
01747 
01748   if (enabled_ == false) {
01749     ACE_ERROR_RETURN((LM_ERROR,
01750                       ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::write: ")
01751                       ACE_TEXT(" Entity is not enabled. \n")),
01752                      DDS::RETCODE_NOT_ENABLED);
01753   }
01754 
01755   DataSampleElement* element = 0;
01756   DDS::ReturnCode_t ret = this->data_container_->obtain_buffer(element, handle);
01757 
01758   if (ret == DDS::RETCODE_TIMEOUT) {
01759     return ret; // silent for timeout
01760 
01761   } else if (ret != DDS::RETCODE_OK) {
01762     ACE_ERROR_RETURN((LM_ERROR,
01763                       ACE_TEXT("(%P|%t) ERROR: ")
01764                       ACE_TEXT("DataWriterImpl::write: ")
01765                       ACE_TEXT("obtain_buffer returned %d.\n"),
01766                       ret),
01767                      ret);
01768   }
01769 
01770   Message_Block_Ptr temp;
01771   ret = create_sample_data_message(move(data),
01772                                    handle,
01773                                    element->get_header(),
01774                                    temp,
01775                                    source_timestamp,
01776                                    (filter_out != 0));
01777   element->set_sample(move(temp));
01778 
01779   if (ret != DDS::RETCODE_OK) {
01780     return ret;
01781   }
01782 
01783   element->set_filter_out(filter_out_var._retn()); // ownership passed to element
01784 
01785   ret = this->data_container_->enqueue(element, handle);
01786 
01787   if (ret != DDS::RETCODE_OK) {
01788     ACE_ERROR_RETURN((LM_ERROR,
01789                       ACE_TEXT("(%P|%t) ERROR: ")
01790                       ACE_TEXT("DataWriterImpl::write: ")
01791                       ACE_TEXT("enqueue failed.\n")),
01792                      ret);
01793   }
01794   this->last_liveliness_activity_time_ = ACE_OS::gettimeofday();
01795 
01796   track_sequence_number(filter_out);
01797 
01798   if (this->coherent_) {
01799     ++this->coherent_samples_;
01800   }
01801   SendStateDataSampleList list;
01802 
01803   ACE_UINT64 transaction_id = this->get_unsent_data(list);
01804 
01805   RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
01806   if (!publisher || publisher->is_suspended()) {
01807     if (min_suspended_transaction_id_ == 0) {
01808       //provides transaction id for lower bound of suspended transactions
01809       //or transaction id for single suspended write transaction
01810       min_suspended_transaction_id_ = transaction_id;
01811     } else {
01812       //when multiple write transactions have suspended, provides the upper bound
01813       //for suspended transactions.
01814       max_suspended_transaction_id_ = transaction_id;
01815     }
01816     this->available_data_list_.enqueue_tail(list);
01817 
01818   } else {
01819     guard.release();
01820 
01821     this->send(list, transaction_id);
01822   }
01823 
01824   return DDS::RETCODE_OK;
01825 }

Here is the call graph for this function:

Here is the caller graph for this function:


Friends And Related Function Documentation

friend class ::DDS_TEST [friend]

Reimplemented from OpenDDS::DCPS::TransportClient.

Reimplemented in OpenDDS::DCPS::DataWriterImpl_T< MessageType >.

Definition at line 569 of file DataWriterImpl.h.

friend class PublisherImpl [friend]

Definition at line 91 of file DataWriterImpl.h.

friend class WriteDataContainer [friend]

Definition at line 90 of file DataWriterImpl.h.

Referenced by enable().


Member Data Documentation

Definition at line 661 of file DataWriterImpl.h.

Referenced by association_complete(), and transport_assoc_done().

The multiplier for allocators affected by associations.

Definition at line 480 of file DataWriterImpl.h.

Referenced by enable().

Definition at line 666 of file DataWriterImpl.h.

Referenced by association_complete_i(), send_suspended_data(), and write().

Flag indicating that the DataWriter currently belongs to a coherent change set.

Definition at line 598 of file DataWriterImpl.h.

Referenced by begin_coherent_changes(), coherent_changes_pending(), create_sample_data_message(), end_coherent_changes(), and write().

The number of samples belonging to the current coherent change set.

Definition at line 601 of file DataWriterImpl.h.

Referenced by end_coherent_changes(), and write().

Definition at line 405 of file DataWriterImpl.h.

Referenced by data_delivered().

Statistics counter.

Definition at line 404 of file DataWriterImpl.h.

Referenced by data_dropped().

The data block allocator.

Reimplemented in OpenDDS::DCPS::DataWriterImpl_T< MessageType >.

Definition at line 639 of file DataWriterImpl.h.

Referenced by create_control_message(), create_sample_data_message(), and enable().

Definition at line 573 of file DataWriterImpl.h.

The domain id.

Definition at line 588 of file DataWriterImpl.h.

Referenced by enable(), init(), send_liveliness(), set_qos(), and transport_assoc_done().

Definition at line 589 of file DataWriterImpl.h.

Referenced by enable(), get_dp_id(), set_qos(), and transport_assoc_done().

The header data allocator.

Definition at line 641 of file DataWriterImpl.h.

Referenced by create_sample_data_message(), and enable().

Definition at line 610 of file DataWriterImpl.h.

Referenced by association_complete_i(), and get_matched_subscriptions().

Flag indicating that this datawriter is a builtin topic datawriter.

Definition at line 659 of file DataWriterImpl.h.

Referenced by add_association(), association_complete(), association_complete_i(), init(), notify_publication_disconnected(), notify_publication_lost(), and notify_publication_reconnected().

Total number of offered deadlines missed during last offered deadline status check.

Definition at line 652 of file DataWriterImpl.h.

Referenced by enable(), get_offered_deadline_missed_status(), and set_qos().

Timestamp of last write/dispose/assert_liveliness.

Definition at line 649 of file DataWriterImpl.h.

Referenced by handle_timeout(), participant_liveliness_activity_after(), send_liveliness(), and write().

DDS::DataWriterListener_var OpenDDS::DCPS::DataWriterImpl::listener_ [private]

Used to notify the entity for relevant events.

Definition at line 586 of file DataWriterImpl.h.

Referenced by get_listener(), init(), listener_for(), notify_publication_disconnected(), notify_publication_lost(), notify_publication_reconnected(), and set_listener().

The StatusKind bit mask indicating which status condition changes the listener of this entity is notified of.

Definition at line 584 of file DataWriterImpl.h.

Referenced by init(), listener_for(), and set_listener().

Definition at line 683 of file DataWriterImpl.h.

Referenced by assert_liveliness_by_participant(), and handle_timeout().

The time interval for sending liveliness message.

Definition at line 647 of file DataWriterImpl.h.

Referenced by enable(), handle_timeout(), and liveliness_check_interval().

True if the writer failed to actively signal its liveliness within its offered liveliness period.

Definition at line 622 of file DataWriterImpl.h.

Referenced by handle_timeout().

Status conditions.

Definition at line 615 of file DataWriterImpl.h.

Referenced by DataWriterImpl(), get_liveliness_lost_status(), and handle_timeout().

Definition at line 688 of file DataWriterImpl.h.

Referenced by enable(), and handle_timeout().

Definition at line 665 of file DataWriterImpl.h.

Referenced by send_suspended_data(), and write().

The message block allocator.

Todo:
The publication_lost_status_ and publication_reconnecting_status_ are left here for future use when we add get_publication_lost_status() and get_publication_reconnecting_status() methods.

Reimplemented in OpenDDS::DCPS::DataWriterImpl_T< MessageType >.

Definition at line 637 of file DataWriterImpl.h.

Referenced by create_control_message(), create_sample_data_message(), and enable().

The cached available data while suspending and associated transaction ids.

Definition at line 664 of file DataWriterImpl.h.

Referenced by send_suspended_data(), and write().

Monitor object for this entity.

Definition at line 669 of file DataWriterImpl.h.

Referenced by association_complete_i(), DataWriterImpl(), enable(), and register_instance_i().

The number of chunks for the cached allocator.

Definition at line 477 of file DataWriterImpl.h.

Referenced by enable().

The participant servant which creates the publisher that creates this datawriter.

Definition at line 491 of file DataWriterImpl.h.

Referenced by add_association(), assert_liveliness(), association_complete_i(), enable(), get_instance_handle(), get_matched_subscription_data(), get_next_handle(), init(), and lookup_instance_handles().

Periodic Monitor object for this entity.

Definition at line 672 of file DataWriterImpl.h.

Referenced by DataWriterImpl().

The orb's reactor to be used to register the liveliness timer.

Definition at line 645 of file DataWriterImpl.h.

Referenced by enable(), handle_timeout(), and init().

RepoIdToReaderInfoMap OpenDDS::DCPS::DataWriterImpl::reader_info_ [protected]

The sequence number unique in DataWriter scope.

Definition at line 595 of file DataWriterImpl.h.

Referenced by create_ack_token(), create_control_message(), create_sample_data_message(), end_coherent_changes(), need_sequence_repair_i(), and track_sequence_number().

Definition at line 687 of file DataWriterImpl.h.

Referenced by unregister_instances().

The associated topic repository id.

Definition at line 578 of file DataWriterImpl.h.

Referenced by init().

The name of associated topic.

Definition at line 576 of file DataWriterImpl.h.

Referenced by enable(), init(), and retrieve_inline_qos_data().

The topic servant.

Definition at line 580 of file DataWriterImpl.h.

Referenced by cleanup(), enable(), filter_out(), get_topic(), inconsistent_topic(), and init().

The type name of associated topic.

Definition at line 484 of file DataWriterImpl.h.

Referenced by get_type_name(), and init().


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1