OpenDDS::DCPS::PublisherImpl Class Reference

Implements the OpenDDS::DCPS::Publisher interfaces.

#include <PublisherImpl.h>

Public Member Functions

 PublisherImpl (DDS::InstanceHandle_t handle, RepoId id, const DDS::PublisherQos &qos, DDS::PublisherListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant)
virtual ~PublisherImpl ()
virtual DDS::InstanceHandle_t get_instance_handle ()
bool contains_writer (DDS::InstanceHandle_t a_handle)
virtual DDS::DataWriter_ptr create_datawriter (DDS::Topic_ptr a_topic, const DDS::DataWriterQos &qos, DDS::DataWriterListener_ptr a_listener, DDS::StatusMask mask)
virtual DDS::ReturnCode_t delete_datawriter (DDS::DataWriter_ptr a_datawriter)
virtual DDS::DataWriter_ptr lookup_datawriter (const char *topic_name)
virtual DDS::ReturnCode_t delete_contained_entities ()
virtual DDS::ReturnCode_t set_qos (const DDS::PublisherQos &qos)
virtual DDS::ReturnCode_t get_qos (DDS::PublisherQos &qos)
virtual DDS::ReturnCode_t set_listener (DDS::PublisherListener_ptr a_listener, DDS::StatusMask mask)
virtual DDS::PublisherListener_ptr get_listener ()
virtual DDS::ReturnCode_t suspend_publications ()
virtual DDS::ReturnCode_t resume_publications ()
virtual DDS::ReturnCode_t begin_coherent_changes ()
virtual DDS::ReturnCode_t end_coherent_changes ()
virtual DDS::ReturnCode_t wait_for_acknowledgments (const DDS::Duration_t &max_wait)
virtual DDS::DomainParticipant_ptr get_participant ()
virtual DDS::ReturnCode_t set_default_datawriter_qos (const DDS::DataWriterQos &qos)
virtual DDS::ReturnCode_t get_default_datawriter_qos (DDS::DataWriterQos &qos)
virtual DDS::ReturnCode_t copy_from_topic_qos (DDS::DataWriterQos &a_datawriter_qos, const DDS::TopicQos &a_topic_qos)
virtual DDS::ReturnCode_t enable ()
ACE_Recursive_Thread_Mutex & get_pi_lock ()
bool is_clean () const
DDS::ReturnCode_t writer_enabled (const char *topic_name, DataWriterImpl *impl)
DDS::PublisherListener_ptr listener_for (::DDS::StatusKind kind)
DDS::ReturnCode_t assert_liveliness_by_participant ()
ACE_Time_Value liveliness_check_interval (DDS::LivelinessQosPolicyKind kind)
bool participant_liveliness_activity_after (const ACE_Time_Value &tv)
typedef OPENDDS_VECTOR (PublicationId) PublicationIdVec
void get_publication_ids (PublicationIdVec &pubs)
bool is_suspended () const
virtual EntityImpl * parent () const

Static Public Member Functions

static bool validate_datawriter_qos (const DDS::DataWriterQos &qos, const DDS::DataWriterQos &default_qos, DDS::Topic_ptr a_topic, DDS::DataWriterQos &dw_qos)

Private Types

typedef ACE_Recursive_Thread_Mutex lock_type
typedef ACE_Reverse_Lock< lock_type > reverse_lock_type

Private Member Functions

typedef OPENDDS_MULTIMAP (OPENDDS_STRING, DataWriterImpl *) DataWriterMap
typedef OPENDDS_MAP_CMP (PublicationId, DataWriterImpl *, GUID_tKeyLessThan) PublicationMap
typedef OPENDDS_MAP_CMP (RepoId, DDS::DataWriterQos, GUID_tKeyLessThan) DwIdToQosMap

Private Attributes

DDS::InstanceHandle_t handle_
DDS::PublisherQos qos_
 Publisher QoS policy list.
DDS::DataWriterQos default_datawriter_qos_
 Default datawriter Qos policy list.
DDS::StatusMask listener_mask_
DDS::PublisherListener_var listener_
 Used to notify the entity for relevant events.
DataWriterMap datawriter_map_
 This map is used to support datawriter lookup by topic name.
PublicationMap publication_map_
std::size_t change_depth_
 The number of times begin_coherent_changes has been called without a matching end_coherent_changes.
DDS::DomainId_t domain_id_
 Domain in which we are contained.
DomainParticipantImpl * participant_
 The DomainParticipant servant that owns this Publisher.
CORBA::Short suspend_depth_count_
 The suspend depth count.
SequenceNumber sequence_number_
ACE_Time_Value aggregation_period_start_
 Start of current aggregation period. - NOT USED IN FIRST IMPL.
lock_type pi_lock_
 The recursive lock to protect datawriter map and suspend count.
reverse_lock_type reverse_pi_lock_
Monitor * monitor_
 Monitor object for this entity.
RepoId publisher_id_

Friends

class DataWriterImpl

Detailed Description

Implements the OpenDDS::DCPS::Publisher interfaces.

This class acts as a factory and container for datawriters.

See the DDS specification, OMG formal/04-12-02, for a description of the interface this class is implementing.

Definition at line 37 of file PublisherImpl.h.


Member Typedef Documentation

typedef ACE_Recursive_Thread_Mutex OpenDDS::DCPS::PublisherImpl::lock_type [private]

Definition at line 193 of file PublisherImpl.h.

typedef ACE_Reverse_Lock<lock_type> OpenDDS::DCPS::PublisherImpl::reverse_lock_type [private]

Definition at line 194 of file PublisherImpl.h.
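The reverse lock type is what lets delete_datawriter() release pi_lock_ temporarily while it calls into the transport layer. As a rough illustration of the idea only, and not the ACE implementation, a reverse guard unlocks on construction and re-locks on destruction. The names ReverseGuard and CountingLock below are hypothetical stand-ins written in standard C++.

```cpp
#include <cassert>

// Hypothetical lock that only counts how often it is held, so the
// effect of the guard can be observed without real threads.
struct CountingLock {
  int held = 0;
  void lock() { ++held; }
  void unlock() {
    assert(held > 0);  // unlocking a lock that is not held is a bug
    --held;
  }
};

// Sketch of a reverse guard: releases the lock for the lifetime of
// the guard and re-acquires it when the guard leaves scope.
template <typename Lock>
class ReverseGuard {
public:
  explicit ReverseGuard(Lock& l) : lock_(l) { lock_.unlock(); }
  ~ReverseGuard() { lock_.lock(); }
  ReverseGuard(const ReverseGuard&) = delete;
  ReverseGuard& operator=(const ReverseGuard&) = delete;

private:
  Lock& lock_;
};
```

Inside a scope that already holds the lock, declaring a ReverseGuard opens a window in which other threads may take it; the lock is held again once the guard is destroyed.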


Constructor & Destructor Documentation

OpenDDS::DCPS::PublisherImpl::PublisherImpl ( DDS::InstanceHandle_t  handle,
RepoId  id,
const DDS::PublisherQos &  qos,
DDS::PublisherListener_ptr  a_listener,
const DDS::StatusMask &  mask,
DomainParticipantImpl *  participant 
)

Definition at line 29 of file PublisherImpl.cpp.

References monitor_, and TheServiceParticipant.

00035 : handle_(handle),
00036   qos_(qos),
00037   default_datawriter_qos_(TheServiceParticipant->initial_DataWriterQos()),
00038   listener_mask_(mask),
00039   listener_(DDS::PublisherListener::_duplicate(a_listener)),
00040 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00041   change_depth_(0),
00042 #endif
00043   domain_id_(participant->get_domain_id()),
00044   participant_(participant),
00045   suspend_depth_count_(0),
00046   sequence_number_(),
00047   aggregation_period_start_(ACE_Time_Value::zero),
00048   reverse_pi_lock_(pi_lock_),
00049   monitor_(0),
00050   publisher_id_(id)
00051 {
00052   monitor_ = TheServiceParticipant->monitor_factory_->create_publisher_monitor(this);
00053 }

OpenDDS::DCPS::PublisherImpl::~PublisherImpl (  )  [virtual]

Definition at line 56 of file PublisherImpl.cpp.

References is_clean().

00057 {
00058   //The datawriters should be deleted already before calling delete
00059   //publisher.
00060   if (!is_clean()) {
00061     ACE_ERROR((LM_ERROR,
00062         ACE_TEXT("(%P|%t) ERROR: ")
00063         ACE_TEXT("PublisherImpl::~PublisherImpl, ")
00064         ACE_TEXT("some datawriters still exist.\n")));
00065   }
00066 }


Member Function Documentation

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::assert_liveliness_by_participant (  ) 

Definition at line 818 of file PublisherImpl.cpp.

References datawriter_map_, and DDS::RETCODE_OK.

00819 {
00820   DDS::ReturnCode_t ret = DDS::RETCODE_OK;
00821 
00822   for (DataWriterMap::iterator it(datawriter_map_.begin());
00823       it != datawriter_map_.end(); ++it) {
00824     DDS::ReturnCode_t dw_ret = it->second->assert_liveliness_by_participant();
00825 
00826     if (dw_ret != DDS::RETCODE_OK) {
00827       ret = dw_ret;
00828     }
00829   }
00830 
00831   return ret;
00832 }

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::begin_coherent_changes (  )  [virtual]

Implements DDS::Publisher.

Definition at line 519 of file PublisherImpl.cpp.

References change_depth_, OpenDDS::DCPS::EntityImpl::enabled_, DDS::INSTANCE_PRESENTATION_QOS, DDS::PublisherQos::presentation, publication_map_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.

00520 {
00521   if (enabled_ == false) {
00522     ACE_ERROR_RETURN((LM_ERROR,
00523         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:")
00524         ACE_TEXT(" Publisher is not enabled!\n")),
00525         DDS::RETCODE_NOT_ENABLED);
00526   }
00527 
00528   if (!qos_.presentation.coherent_access) {
00529     ACE_ERROR_RETURN((LM_ERROR,
00530         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:")
00531         ACE_TEXT(" QoS policy does not support coherent access!\n")),
00532         DDS::RETCODE_ERROR);
00533   }
00534 
00535   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00536       guard,
00537       this->pi_lock_,
00538       DDS::RETCODE_ERROR);
00539 
00540   ++this->change_depth_;
00541 
00542   if (qos_.presentation.access_scope == DDS::INSTANCE_PRESENTATION_QOS) {
00543     // INSTANCE access scope essentially behaves
00544     // as a no-op. (see: 7.1.3.6)
00545     return DDS::RETCODE_OK;
00546   }
00547 
00548   // We should only notify publications on the first
00549   // and last change to the current change set:
00550   if (this->change_depth_ == 1) {
00551     for (PublicationMap::iterator it = this->publication_map_.begin();
00552         it != this->publication_map_.end(); ++it) {
00553       it->second->begin_coherent_changes();
00554     }
00555   }
00556 
00557   return DDS::RETCODE_OK;
00558 }

bool OpenDDS::DCPS::PublisherImpl::contains_writer ( DDS::InstanceHandle_t  a_handle  ) 

Definition at line 75 of file PublisherImpl.cpp.

References datawriter_map_, and DDS::RETCODE_ERROR.

00076 {
00077   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00078       guard,
00079       this->pi_lock_,
00080       DDS::RETCODE_ERROR);
00081 
00082   for (DataWriterMap::iterator it(datawriter_map_.begin());
00083       it != datawriter_map_.end(); ++it) {
00084     if (a_handle == it->second->get_instance_handle()) {
00085       return true;
00086     }
00087   }
00088 
00089   return false;
00090 }

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::copy_from_topic_qos ( DDS::DataWriterQos &  a_datawriter_qos,
const DDS::TopicQos &  a_topic_qos 
) [virtual]

Definition at line 725 of file PublisherImpl.cpp.

References OpenDDS::DCPS::Qos_Helper::copy_from_topic_qos(), DDS::RETCODE_INCONSISTENT_POLICY, and DDS::RETCODE_OK.

00727 {
00728   if (Qos_Helper::copy_from_topic_qos(a_datawriter_qos, a_topic_qos)) {
00729     return DDS::RETCODE_OK;
00730   } else {
00731     return DDS::RETCODE_INCONSISTENT_POLICY;
00732   }
00733 }

DDS::DataWriter_ptr OpenDDS::DCPS::PublisherImpl::create_datawriter ( DDS::Topic_ptr  a_topic,
const DDS::DataWriterQos &  qos,
DDS::DataWriterListener_ptr  a_listener,
DDS::StatusMask  mask 
) [virtual]

Definition at line 93 of file PublisherImpl.cpp.

References default_datawriter_qos_, OpenDDS::DCPS::DataWriterImpl::enable(), DDS::PublisherQos::entity_factory, OpenDDS::DCPS::TopicDescriptionImpl::get_name(), OpenDDS::DCPS::TopicDescriptionImpl::get_type_support(), OpenDDS::DCPS::DataWriterImpl::init(), participant_, qos_, DDS::RETCODE_OK, and validate_datawriter_qos().

00098 {
00099   DDS::DataWriterQos dw_qos;
00100 
00101   if (!validate_datawriter_qos(qos, default_datawriter_qos_, a_topic, dw_qos)) {
00102     return DDS::DataWriter::_nil();
00103   }
00104 
00105   TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic);
00106 
00107   OpenDDS::DCPS::TypeSupport_ptr typesupport =
00108       topic_servant->get_type_support();
00109 
00110   if (typesupport == 0) {
00111     CORBA::String_var name = topic_servant->get_name();
00112     ACE_ERROR((LM_ERROR,
00113         ACE_TEXT("(%P|%t) ERROR: ")
00114         ACE_TEXT("PublisherImpl::create_datawriter, ")
00115         ACE_TEXT("typesupport(topic_name=%C) is nil.\n"),
00116         name.in()));
00117     return DDS::DataWriter::_nil();
00118   }
00119 
00120   DDS::DataWriter_var dw_obj = typesupport->create_datawriter();
00121 
00122   DataWriterImpl* dw_servant =
00123       dynamic_cast <DataWriterImpl*>(dw_obj.in());
00124 
00125   dw_servant->init(a_topic,
00126       topic_servant,
00127       dw_qos,
00128       a_listener,
00129       mask,
00130       participant_,
00131       this,
00132       dw_obj.in());
00133 
00134   if (this->enabled_ == true
00135       && qos_.entity_factory.autoenable_created_entities == 1) {
00136 
00137     DDS::ReturnCode_t ret = dw_servant->enable();
00138 
00139     if (ret != DDS::RETCODE_OK) {
00140       ACE_ERROR((LM_ERROR,
00141           ACE_TEXT("(%P|%t) ERROR: ")
00142           ACE_TEXT("PublisherImpl::create_datawriter, ")
00143           ACE_TEXT("enable failed.\n")));
00144       return DDS::DataWriter::_nil();
00145     }
00146   }
00147 
00148   return DDS::DataWriter::_duplicate(dw_obj.in());
00149 }

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::delete_contained_entities (  )  [virtual]

Implements DDS::Publisher.

Definition at line 312 of file PublisherImpl.cpp.

References datawriter_map_, delete_datawriter(), OpenDDS::DCPS::DataWriterImpl::get_publication_id(), OpenDDS::DCPS::GUID_UNKNOWN, OPENDDS_STRING, DDS::RETCODE_ERROR, DDS::RETCODE_OK, and OpenDDS::DCPS::EntityImpl::set_deleted().

00313 {
00314   // mark that the entity is being deleted
00315   set_deleted(true);
00316 
00317   while (true) {
00318     PublicationId pub_id = GUID_UNKNOWN;
00319     DataWriterImpl* a_datawriter;
00320 
00321     {
00322       ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00323           guard,
00324           this->pi_lock_,
00325           DDS::RETCODE_ERROR);
00326 
00327       if (datawriter_map_.empty()) {
00328         break;
00329       } else {
00330         a_datawriter = datawriter_map_.begin()->second;
00331         pub_id = a_datawriter->get_publication_id();
00332       }
00333     }
00334 
00335     DDS::ReturnCode_t ret = delete_datawriter(a_datawriter);
00336 
00337     if (ret != DDS::RETCODE_OK) {
00338       GuidConverter converter(pub_id);
00339       ACE_ERROR_RETURN((LM_ERROR,
00340           ACE_TEXT("(%P|%t) ERROR: ")
00341           ACE_TEXT("PublisherImpl::")
00342           ACE_TEXT("delete_contained_entities: ")
00343           ACE_TEXT("failed to delete ")
00344           ACE_TEXT("datawriter %C.\n"),
00345           OPENDDS_STRING(converter).c_str()),ret);
00346     }
00347   }
00348 
00349   // the publisher can now start creating new publications
00350   set_deleted(false);
00351 
00352   return DDS::RETCODE_OK;
00353 }

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::delete_datawriter ( DDS::DataWriter_ptr  a_datawriter  )  [virtual]

Definition at line 152 of file PublisherImpl.cpp.

References OpenDDS::DCPS::DataWriterImpl::cleanup(), datawriter_map_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DataWriterImpl::get_publication_id(), OpenDDS::DCPS::DataWriterImpl::get_publisher(), OpenDDS::DCPS::DataWriterImpl::get_topic_name(), OpenDDS::DCPS::GUID_UNKNOWN, OPENDDS_STRING, participant_, OpenDDS::DCPS::DataWriterImpl::persist_data(), OpenDDS::DCPS::DataWriterImpl::prepare_to_delete(), publication_map_, OpenDDS::DCPS::DomainParticipantImpl::remove_adjust_liveliness_timers(), OpenDDS::DCPS::DataWriterImpl::remove_all_associations(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, TheServiceParticipant, OpenDDS::DCPS::time_value_to_time(), OpenDDS::DCPS::DataWriterImpl::unregister_all(), OpenDDS::DCPS::DataWriterImpl::unregister_instances(), OpenDDS::DCPS::DataWriterImpl::wait_control_pending(), and OpenDDS::DCPS::DataWriterImpl::wait_pending().

Referenced by delete_contained_entities().

00153 {
00154   DataWriterImpl* dw_servant = dynamic_cast<DataWriterImpl*>(a_datawriter);
00155   if (dw_servant) {
00156     // marks entity as deleted and stops future associating
00157     dw_servant->prepare_to_delete();
00158   }
00159 
00160   if (!dw_servant) {
00161     ACE_ERROR((LM_ERROR,
00162               "(%P|%t) PublisherImpl::delete_datawriter - dynamic cast to DataWriterImpl failed\n"
00163     ));
00164     return DDS::RETCODE_ERROR;
00165   }
00166 
00167   {
00168     DDS::Publisher_var dw_publisher(dw_servant->get_publisher());
00169 
00170     if (dw_publisher.in() != this) {
00171       RepoId id = dw_servant->get_publication_id();
00172       GuidConverter converter(id);
00173       ACE_ERROR((LM_ERROR,
00174           ACE_TEXT("(%P|%t) PublisherImpl::delete_datawriter: ")
00175           ACE_TEXT("the data writer %C doesn't ")
00176           ACE_TEXT("belong to this subscriber \n"),
00177           OPENDDS_STRING(converter).c_str()));
00178       return DDS::RETCODE_PRECONDITION_NOT_MET;
00179     }
00180   }
00181 
00182 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00183   // Trigger data to be persisted, i.e. made durable, if so
00184   // configured. This needs be called before unregister_instances
00185   // because unregister_instances may cause instance dispose.
00186   if (!dw_servant->persist_data() && DCPS_debug_level >= 2) {
00187     ACE_ERROR((LM_ERROR,
00188         ACE_TEXT("(%P|%t) ERROR: ")
00189         ACE_TEXT("PublisherImpl::delete_datawriter, ")
00190         ACE_TEXT("failed to make data durable.\n")));
00191   }
00192 #endif
00193 
00194   // Unregister all registered instances prior to deletion.
00195   DDS::Time_t source_timestamp = time_value_to_time(ACE_OS::gettimeofday());
00196   dw_servant->unregister_instances(source_timestamp);
00197 
00198   // Wait for any control messages to be transported during
00199   // unregistering of instances.
00200   dw_servant->wait_pending();
00201   dw_servant->wait_control_pending();
00202 
00203   CORBA::String_var topic_name = dw_servant->get_topic_name();
00204   RepoId publication_id  = GUID_UNKNOWN;
00205   {
00206     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00207         guard,
00208         this->pi_lock_,
00209         DDS::RETCODE_ERROR);
00210 
00211     publication_id = dw_servant->get_publication_id();
00212 
00213     PublicationMap::iterator it = publication_map_.find(publication_id);
00214 
00215     if (it == publication_map_.end()) {
00216       GuidConverter converter(publication_id);
00217       ACE_ERROR_RETURN((LM_ERROR,
00218           ACE_TEXT("(%P|%t) ERROR: ")
00219           ACE_TEXT("PublisherImpl::delete_datawriter, ")
00220           ACE_TEXT("datawriter %C not found.\n"),
00221           OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR);
00222     }
00223 
00224     // We can not erase the datawriter from datawriter map by the topic name
00225     // because the map might have multiple datawriters with the same topic
00226     // name.
00227     // Find the iterator to the datawriter in the datawriter map and erase
00228     // by the iterator.
00229     DataWriterMap::iterator writ;
00230     DataWriterMap::iterator the_writ = datawriter_map_.end();
00231 
00232     for (writ = datawriter_map_.begin();
00233         writ != datawriter_map_.end();
00234         ++writ) {
00235       if (writ->second == it->second) {
00236         the_writ = writ;
00237         break;
00238       }
00239     }
00240 
00241     if (the_writ != datawriter_map_.end()) {
00242       datawriter_map_.erase(the_writ);
00243     }
00244 
00245     publication_map_.erase(it);
00246 
00247     // Release pi_lock_ before making call to transport layer to avoid
00248     // some deadlock situations that threads acquire locks(PublisherImpl
00249     // pi_lock_, TransportClient reservation_lock and TransportImpl
00250     // lock_) in reverse order.
00251     ACE_GUARD_RETURN(reverse_lock_type, reverse_monitor, this->reverse_pi_lock_,
00252         DDS::RETCODE_ERROR);
00253     // Wait for pending samples to drain prior to removing associations
00254     // and unregistering the publication.
00255     dw_servant->wait_pending();
00256     // Call remove association before unregistering the datawriter
00257     // with the transport, otherwise some callbacks resulted from
00258     // remove_association may lost.
00259     dw_servant->remove_all_associations();
00260     dw_servant->cleanup();
00261   }
00262   // not just unregister but remove any pending writes/sends.
00263   dw_servant->unregister_all();
00264   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00265   if (!disco->remove_publication(
00266       this->domain_id_,
00267       this->participant_->get_id(),
00268       publication_id)) {
00269     ACE_ERROR_RETURN((LM_ERROR,
00270         ACE_TEXT("(%P|%t) ERROR: ")
00271         ACE_TEXT("PublisherImpl::delete_datawriter, ")
00272         ACE_TEXT("publication not removed from discovery.\n")),
00273         DDS::RETCODE_ERROR);
00274   }
00275   // Decrease ref count after the servant is removed from the maps.
00276   dw_servant->_remove_ref();
00277 
00278   participant_->remove_adjust_liveliness_timers();
00279 
00280   return DDS::RETCODE_OK;
00281 }
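The comment in the listing above explains why the writer is erased from the datawriter map by iterator rather than by topic name: several writers may share one topic name, and erasing by key would remove all of them. A minimal sketch of that point with a plain std::multimap follows; Writer, WriterMap and erase_writer are hypothetical names, not OpenDDS types.

```cpp
#include <cassert>
#include <map>
#include <string>

struct Writer { int id; };  // hypothetical stand-in for DataWriterImpl
using WriterMap = std::multimap<std::string, Writer*>;

// Erase exactly one entry whose mapped pointer equals target.
// Returns true if an entry was removed, false if none matched.
bool erase_writer(WriterMap& map, const Writer* target) {
  assert(target != nullptr);
  for (WriterMap::iterator it = map.begin(); it != map.end(); ++it) {
    if (it->second == target) {
      map.erase(it);  // removes this entry only, not its key siblings
      return true;
    }
  }
  return false;
}
```

Calling map.erase("topic") instead would drop every writer registered under that topic name, which is the situation the original comment warns about.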

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::enable (  )  [virtual]

Implements DDS::Entity.

Definition at line 736 of file PublisherImpl.cpp.

References monitor_, OpenDDS::DCPS::Monitor::report(), DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and OpenDDS::DCPS::EntityImpl::set_enabled().

Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_publisher().

00737 {
00738   //According spec:
00739   // - Calling enable on an already enabled Entity returns OK and has no
00740   // effect.
00741   // - Calling enable on an Entity whose factory is not enabled will fail
00742   // and return PRECONDITION_NOT_MET.
00743 
00744   if (this->is_enabled()) {
00745     return DDS::RETCODE_OK;
00746   }
00747 
00748   if (this->participant_->is_enabled() == false) {
00749     return DDS::RETCODE_PRECONDITION_NOT_MET;
00750   }
00751 
00752   if (this->monitor_) {
00753     this->monitor_->report();
00754   }
00755 
00756   this->set_enabled();
00757   return DDS::RETCODE_OK;
00758 }
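The two rules quoted from the spec in the listing above can be modelled in a few lines. This is a simplified sketch, assuming a single-threaded setting; Factory, Entity and Ret are hypothetical names, and the reports counter merely stands in for the monitor report.

```cpp
#include <cassert>

enum class Ret { Ok, PreconditionNotMet };

struct Factory { bool enabled = false; };  // stand-in for the participant

struct Entity {
  explicit Entity(const Factory* f) : factory(f) { assert(f != nullptr); }

  Ret enable() {
    if (enabled) {
      return Ret::Ok;                  // already enabled: no effect
    }
    if (!factory->enabled) {
      return Ret::PreconditionNotMet;  // factory must be enabled first
    }
    ++reports;                         // stands in for monitor_->report()
    enabled = true;
    return Ret::Ok;
  }

  const Factory* factory;
  bool enabled = false;
  int reports = 0;
};
```

In this model a second enable() call returns Ok without reporting again, matching the early return in the listing.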

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::end_coherent_changes (  )  [virtual]

Implements DDS::Publisher.

Definition at line 561 of file PublisherImpl.cpp.

References change_depth_, OpenDDS::DCPS::EntityImpl::enabled_, DDS::INSTANCE_PRESENTATION_QOS, DDS::PublisherQos::presentation, publication_map_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.

00562 {
00563   if (enabled_ == false) {
00564     ACE_ERROR_RETURN((LM_ERROR,
00565         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
00566         ACE_TEXT(" Publisher is not enabled!\n")),
00567         DDS::RETCODE_NOT_ENABLED);
00568   }
00569 
00570   if (!qos_.presentation.coherent_access) {
00571     ACE_ERROR_RETURN((LM_ERROR,
00572         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
00573         ACE_TEXT(" QoS policy does not support coherent access!\n")),
00574         DDS::RETCODE_ERROR);
00575   }
00576 
00577   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00578       guard,
00579       this->pi_lock_,
00580       DDS::RETCODE_ERROR);
00581 
00582   if (this->change_depth_ == 0) {
00583     ACE_ERROR_RETURN((LM_ERROR,
00584         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
00585         ACE_TEXT(" No matching call to begin_coherent_changes!\n")),
00586         DDS::RETCODE_PRECONDITION_NOT_MET);
00587   }
00588 
00589   --this->change_depth_;
00590 
00591   if (qos_.presentation.access_scope == DDS::INSTANCE_PRESENTATION_QOS) {
00592     // INSTANCE access scope essentially behaves
00593     // as a no-op. (see: 7.1.3.6)
00594     return DDS::RETCODE_OK;
00595   }
00596 
00597   // We should only notify publications on the first
00598   // and last change to the current change set:
00599   if (this->change_depth_ == 0) {
00600     GroupCoherentSamples group_samples;
00601     for (PublicationMap::iterator it = this->publication_map_.begin();
00602         it != this->publication_map_.end(); ++it) {
00603 
00604       if (it->second->coherent_samples_ == 0) {
00605         continue;
00606       }
00607 
00608       std::pair<GroupCoherentSamples::iterator, bool> pair =
00609           group_samples.insert(GroupCoherentSamples::value_type(
00610               it->second->get_publication_id(),
00611               WriterCoherentSample(it->second->coherent_samples_,
00612                   it->second->sequence_number_)));
00613 
00614       if (pair.second == false) {
00615         ACE_ERROR_RETURN((LM_ERROR,
00616             ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes: ")
00617             ACE_TEXT("failed to insert to GroupCoherentSamples.\n")),
00618             DDS::RETCODE_ERROR);
00619       }
00620     }
00621 
00622     for (PublicationMap::iterator it = this->publication_map_.begin();
00623         it != this->publication_map_.end(); ++it) {
00624       if (it->second->coherent_samples_ == 0) {
00625         continue;
00626       }
00627 
00628       it->second->end_coherent_changes(group_samples);
00629     }
00630   }
00631 
00632   return DDS::RETCODE_OK;
00633 }
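Taken together, begin_coherent_changes() and end_coherent_changes() treat change_depth_ as a nesting counter: writers are notified only when the depth goes from 0 to 1 and from 1 back to 0. The sketch below models just that counting logic. CoherentDepthModel and its notification counters are hypothetical, and the enabled, QoS, and access-scope checks from the listings are deliberately left out.

```cpp
#include <cassert>
#include <cstddef>

enum class Ret { Ok, PreconditionNotMet };

struct CoherentDepthModel {
  std::size_t depth = 0;        // plays the role of change_depth_
  int begin_notifications = 0;  // times writers would see a set open
  int end_notifications = 0;    // times writers would see a set close

  Ret begin() {
    ++depth;
    if (depth == 1) {
      ++begin_notifications;  // only the outermost begin reaches writers
    }
    return Ret::Ok;
  }

  Ret end() {
    if (depth == 0) {
      return Ret::PreconditionNotMet;  // no matching begin
    }
    --depth;
    if (depth == 0) {
      // Exactly one open set should be pending at this point.
      assert(begin_notifications == end_notifications + 1);
      ++end_notifications;  // only the outermost end reaches writers
    }
    return Ret::Ok;
  }
};
```

Nested begin/end pairs therefore collapse into one change set from the writers' point of view.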

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::get_default_datawriter_qos ( DDS::DataWriterQos &  qos  )  [virtual]

Definition at line 718 of file PublisherImpl.cpp.

References default_datawriter_qos_, and DDS::RETCODE_OK.

00719 {
00720   qos = default_datawriter_qos_;
00721   return DDS::RETCODE_OK;
00722 }

DDS::InstanceHandle_t OpenDDS::DCPS::PublisherImpl::get_instance_handle (  )  [virtual]

Implements OpenDDS::DCPS::EntityImpl.

Definition at line 69 of file PublisherImpl.cpp.

References handle_.

Referenced by OpenDDS::DCPS::PublisherMonitorImpl::report().

00070 {
00071   return handle_;
00072 }

DDS::PublisherListener_ptr OpenDDS::DCPS::PublisherImpl::get_listener (  )  [virtual]

Implements DDS::Publisher.

Definition at line 448 of file PublisherImpl.cpp.

References listener_.

00449 {
00450   return DDS::PublisherListener::_duplicate(listener_.in());
00451 }

DDS::DomainParticipant_ptr OpenDDS::DCPS::PublisherImpl::get_participant (  )  [virtual]

Implements DDS::Publisher.

Definition at line 700 of file PublisherImpl.cpp.

References participant_.

Referenced by OpenDDS::DCPS::PublisherMonitorImpl::report().

00701 {
00702   return DDS::DomainParticipant::_duplicate(participant_);
00703 }

ACE_Recursive_Thread_Mutex& OpenDDS::DCPS::PublisherImpl::get_pi_lock (  )  [inline]

Definition at line 112 of file PublisherImpl.h.

00112 { return pi_lock_; }

void OpenDDS::DCPS::PublisherImpl::get_publication_ids ( PublicationIdVec &  pubs  ) 

Populates a std::vector with the PublicationIds (GUIDs) of this Publisher's Data Writers.

Definition at line 858 of file PublisherImpl.cpp.

References publication_map_.

Referenced by OpenDDS::DCPS::PublisherMonitorImpl::report().

00859 {
00860   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00861       guard,
00862       this->pi_lock_,
00863   );
00864 
00865   pubs.reserve(publication_map_.size());
00866   for (PublicationMap::iterator iter = publication_map_.begin();
00867       iter != publication_map_.end();
00868       ++iter) {
00869     pubs.push_back(iter->first);
00870   }
00871 }

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::get_qos ( DDS::PublisherQos &  qos  )  [virtual]

Definition at line 431 of file PublisherImpl.cpp.

References qos_, and DDS::RETCODE_OK.

Referenced by OpenDDS::DCPS::DataWriterImpl::enable(), OpenDDS::DCPS::DataWriterImpl::retrieve_inline_qos_data(), and OpenDDS::DCPS::DataWriterImpl::set_qos().

00432 {
00433   qos = qos_;
00434   return DDS::RETCODE_OK;
00435 }

bool OpenDDS::DCPS::PublisherImpl::is_clean (  )  const

This method is not defined in the IDL; it is for internal use. Returns true if no datawriter is associated with this publisher.

Definition at line 761 of file PublisherImpl.cpp.

References datawriter_map_, and publication_map_.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::delete_publisher(), and ~PublisherImpl().

00762 {
00763   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00764       guard,
00765       this->pi_lock_,
00766       false);
00767   return datawriter_map_.empty() && publication_map_.empty();
00768 }

bool OpenDDS::DCPS::PublisherImpl::is_suspended (  )  const

Definition at line 473 of file PublisherImpl.cpp.

References suspend_depth_count_.

00474 {
00475   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00476       guard,
00477       this->pi_lock_,
00478       false);
00479   return suspend_depth_count_;
00480 }

DDS::PublisherListener_ptr OpenDDS::DCPS::PublisherImpl::listener_for ( ::DDS::StatusKind  kind  ) 

Retrieves the listener for a given status kind. If this publisher has a registered listener and the status kind is in the listener mask, that listener is returned. Otherwise, the query is propagated up to the factory (the DomainParticipant).

Referenced by OpenDDS::DCPS::DataWriterImpl::listener_for().
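No source listing is shown for listener_for(), so the following is only a sketch of the lookup rule described above, not the OpenDDS code. It assumes the status mask is a plain bit mask and that the participant applies the same mask test to its own listener; StatusMask, Listener, Participant and Publisher are hypothetical stand-ins.

```cpp
#include <cassert>
#include <cstdint>

using StatusMask = std::uint32_t;  // assumption: one bit per status kind
struct Listener { const char* name; };

struct Participant {
  Listener* listener = nullptr;
  StatusMask mask = 0;

  // Assumed behaviour: same mask test, with nothing further to ask.
  Listener* listener_for(StatusMask kind) const {
    return (listener && (mask & kind)) ? listener : nullptr;
  }
};

struct Publisher {
  Participant* parent = nullptr;
  Listener* listener = nullptr;
  StatusMask mask = 0;

  Listener* listener_for(StatusMask kind) const {
    assert(kind != 0);  // a status kind is expected to be a set bit
    if (listener && (mask & kind)) {
      return listener;  // own listener handles this status kind
    }
    // Otherwise propagate the query up to the factory.
    return parent ? parent->listener_for(kind) : nullptr;
  }
};
```

A status kind that appears in neither mask yields no listener in this model.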

ACE_Time_Value OpenDDS::DCPS::PublisherImpl::liveliness_check_interval ( DDS::LivelinessQosPolicyKind  kind  ) 

Definition at line 835 of file PublisherImpl.cpp.

References datawriter_map_.

00836 {
00837   ACE_Time_Value tv = ACE_Time_Value::max_time;
00838   for (DataWriterMap::iterator it(datawriter_map_.begin());
00839       it != datawriter_map_.end(); ++it) {
00840     tv = std::min (tv, it->second->liveliness_check_interval(kind));
00841   }
00842   return tv;
00843 }

DDS::DataWriter_ptr OpenDDS::DCPS::PublisherImpl::lookup_datawriter ( const char *  topic_name  )  [virtual]

Definition at line 284 of file PublisherImpl.cpp.

References datawriter_map_, and OpenDDS::DCPS::DCPS_debug_level.

00285 {
00286   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00287       guard,
00288       this->pi_lock_,
00289       DDS::DataWriter::_nil());
00290 
00291   // If multiple entries whose key is "topic_name" then which one is
00292   // returned ? Spec does not limit which one should give.
00293   DataWriterMap::iterator it = datawriter_map_.find(topic_name);
00294 
00295   if (it == datawriter_map_.end()) {
00296     if (DCPS_debug_level >= 2) {
00297       ACE_DEBUG((LM_DEBUG,
00298           ACE_TEXT("(%P|%t) ")
00299           ACE_TEXT("PublisherImpl::lookup_datawriter, ")
00300           ACE_TEXT("The datawriter(topic_name=%C) is not found\n"),
00301           topic_name));
00302     }
00303 
00304     return DDS::DataWriter::_nil();
00305 
00306   } else {
00307     return DDS::DataWriter::_duplicate(it->second);
00308   }
00309 }

typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_MAP_CMP ( RepoId  ,
DDS::DataWriterQos  ,
GUID_tKeyLessThan   
) [private]

typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_MAP_CMP ( PublicationId  ,
DataWriterImpl * ,
GUID_tKeyLessThan   
) [private]

typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_MULTIMAP ( OPENDDS_STRING  ,
DataWriterImpl *  
) [private]

typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_VECTOR ( PublicationId   ) 

EntityImpl * OpenDDS::DCPS::PublisherImpl::parent (  )  const [virtual]

Reimplemented from OpenDDS::DCPS::EntityImpl.

Definition at line 874 of file PublisherImpl.cpp.

References participant_.

00875 {
00876   return this->participant_;
00877 }

bool OpenDDS::DCPS::PublisherImpl::participant_liveliness_activity_after ( const ACE_Time_Value &  tv  ) 

Definition at line 846 of file PublisherImpl.cpp.

References datawriter_map_.

00847 {
00848   for (DataWriterMap::iterator it(datawriter_map_.begin());
00849       it != datawriter_map_.end(); ++it) {
00850     if (it->second->participant_liveliness_activity_after(tv)) {
00851       return true;
00852     }
00853   }
00854   return false;
00855 }

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::resume_publications (  )  [virtual]

Implements DDS::Publisher.

Definition at line 483 of file PublisherImpl.cpp.

References OpenDDS::DCPS::EntityImpl::enabled_, publication_map_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and suspend_depth_count_.

00484 {
00485   if (enabled_ == false) {
00486     ACE_ERROR_RETURN((LM_ERROR,
00487         ACE_TEXT("(%P|%t) ERROR: ")
00488         ACE_TEXT("PublisherImpl::resume_publications, ")
00489         ACE_TEXT(" Entity is not enabled. \n")),
00490         DDS::RETCODE_NOT_ENABLED);
00491   }
00492 
00493   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00494       guard,
00495       this->pi_lock_,
00496       DDS::RETCODE_ERROR);
00497 
00498   --suspend_depth_count_;
00499 
00500   if (suspend_depth_count_ < 0) {
00501     suspend_depth_count_ = 0;
00502     return DDS::RETCODE_PRECONDITION_NOT_MET;
00503   }
00504 
00505   if (suspend_depth_count_ == 0) {
00506 
00507     for (PublicationMap::iterator it = this->publication_map_.begin();
00508         it != this->publication_map_.end(); ++it) {
00509       it->second->send_suspended_data();
00510     }
00511   }
00512 
00513   return DDS::RETCODE_OK;
00514 }
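
The nesting behaviour of suspend_publications() and resume_publications() can be shown with a small standalone model. The sketch below is not OpenDDS code: `SuspendCounter`, `ReturnCode` and the `flushes()` counter are hypothetical names that only mirror the depth-count logic in the listing above (decrement, clamp at zero with a precondition error, flush when the count returns to zero). Locking and the enabled check are left out.

```cpp
#include <cassert>

// Hypothetical stand-ins for DDS::ReturnCode_t values; not the OpenDDS types.
enum class ReturnCode { Ok, PreconditionNotMet };

// Minimal model of the suspend depth counter.
class SuspendCounter {
public:
  ReturnCode suspend() {
    ++depth_;
    return ReturnCode::Ok;
  }

  ReturnCode resume() {
    --depth_;
    if (depth_ < 0) {
      // More resumes than suspends: clamp the count and report the error.
      depth_ = 0;
      return ReturnCode::PreconditionNotMet;
    }
    if (depth_ == 0) {
      // Outermost resume: this is where suspended data would be sent.
      ++flushes_;
    }
    return ReturnCode::Ok;
  }

  int depth() const { return depth_; }
  int flushes() const { return flushes_; }

private:
  int depth_ = 0;
  int flushes_ = 0;
};

// Small usage check: two nested suspends need two resumes before a flush.
inline void example_suspend_resume() {
  SuspendCounter counter;
  counter.suspend();
  counter.suspend();
  assert(counter.resume() == ReturnCode::Ok);
  assert(counter.flushes() == 0);  // still suspended once
  assert(counter.resume() == ReturnCode::Ok);
  assert(counter.flushes() == 1);  // outermost resume flushed
}
```
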

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::set_default_datawriter_qos ( const DDS::DataWriterQos &  qos  )  [virtual]

Definition at line 706 of file PublisherImpl.cpp.

References OpenDDS::DCPS::Qos_Helper::consistent(), default_datawriter_qos_, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, and OpenDDS::DCPS::Qos_Helper::valid().

00707 {
00708   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00709     default_datawriter_qos_ = qos;
00710     return DDS::RETCODE_OK;
00711 
00712   } else {
00713     return DDS::RETCODE_INCONSISTENT_POLICY;
00714   }
00715 }

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::set_listener ( DDS::PublisherListener_ptr  a_listener,
DDS::StatusMask  mask 
) [virtual]

Definition at line 438 of file PublisherImpl.cpp.

References listener_, listener_mask_, and DDS::RETCODE_OK.

00440 {
00441   listener_mask_ = mask;
00442   //note: OK to duplicate  a nil object ref
00443   listener_ = DDS::PublisherListener::_duplicate(a_listener);
00444   return DDS::RETCODE_OK;
00445 }

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::set_qos ( const DDS::PublisherQos &  qos  )  [virtual]

Definition at line 356 of file PublisherImpl.cpp.

References OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), OpenDDS::DCPS::DomainParticipantImpl::get_id(), OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK, OPENDDS_STRING, participant_, publication_map_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, TheServiceParticipant, and OpenDDS::DCPS::Qos_Helper::valid().

00357 {
00358 
00359   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00360 
00361   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00362     if (qos_ == qos)
00363       return DDS::RETCODE_OK;
00364 
00365     // for the not changeable qos, it can be changed before enable
00366     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00367       return DDS::RETCODE_IMMUTABLE_POLICY;
00368 
00369     } else {
00370       qos_ = qos;
00371 
00372       DwIdToQosMap idToQosMap;
00373       {
00374         ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00375             guard,
00376             this->pi_lock_,
00377             DDS::RETCODE_ERROR);
00378 
00379         for (PublicationMap::iterator iter = publication_map_.begin();
00380             iter != publication_map_.end();
00381             ++iter) {
00382           DDS::DataWriterQos qos;
00383           iter->second->get_qos(qos);
00384           RepoId id = iter->second->get_publication_id();
00385           std::pair<DwIdToQosMap::iterator, bool> pair =
00386               idToQosMap.insert(DwIdToQosMap::value_type(id, qos));
00387 
00388           if (pair.second == false) {
00389             GuidConverter converter(id);
00390             ACE_ERROR_RETURN((LM_ERROR,
00391                 ACE_TEXT("(%P|%t) ")
00392                 ACE_TEXT("PublisherImpl::set_qos: ")
00393                 ACE_TEXT("insert id %d to DwIdToQosMap ")
00394                 ACE_TEXT("failed.\n"),
00395                 OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR);
00396           }
00397         }
00398       }
00399 
00400       DwIdToQosMap::iterator iter = idToQosMap.begin();
00401 
00402       while (iter != idToQosMap.end()) {
00403         Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00404         const bool status
00405         = disco->update_publication_qos(
00406             participant_->get_domain_id(),
00407             participant_->get_id(),
00408             iter->first,
00409             iter->second,
00410             this->qos_);
00411 
00412         if (!status) {
00413           ACE_ERROR_RETURN((LM_ERROR,
00414               ACE_TEXT("(%P|%t) PublisherImpl::set_qos, ")
00415               ACE_TEXT("failed. \n")),
00416               DDS::RETCODE_ERROR);
00417         }
00418 
00419         ++iter;
00420       }
00421     }
00422 
00423     return DDS::RETCODE_OK;
00424 
00425   } else {
00426     return DDS::RETCODE_INCONSISTENT_POLICY;
00427   }
00428 }
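
set_qos() works in two phases: it copies each writer's id and QoS into a temporary map while holding pi_lock_, then releases the lock before calling into discovery. A minimal sketch of that snapshot-then-call pattern, using only the standard library, is given below. `WriterRegistry`, `add` and `update_all` are hypothetical names, and ids and QoS values are reduced to `int` and `std::string`.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <mutex>
#include <string>

// Hypothetical registry; ids and QoS values are plain ints and strings here.
class WriterRegistry {
public:
  typedef std::function<bool(int, const std::string&)> UpdateFn;

  void add(int id, const std::string& qos) {
    std::lock_guard<std::recursive_mutex> guard(lock_);
    writers_[id] = qos;
  }

  // Phase 1: copy the (id, qos) pairs while holding the lock.
  // Phase 2: call `update` for each pair after the lock has been released.
  // Returns false as soon as one update fails.
  bool update_all(const UpdateFn& update) {
    std::map<int, std::string> snapshot;
    {
      std::lock_guard<std::recursive_mutex> guard(lock_);
      snapshot = writers_;
    }
    for (const auto& entry : snapshot) {
      if (!update(entry.first, entry.second)) {
        return false;
      }
    }
    return true;
  }

private:
  std::recursive_mutex lock_;
  std::map<int, std::string> writers_;
};

// Small usage check: the callback may re-enter the registry without deadlock,
// because no lock is held while it runs.
inline void example_update_all() {
  WriterRegistry registry;
  registry.add(1, "reliable");
  int calls = 0;
  const bool ok = registry.update_all([&](int, const std::string&) {
    registry.add(2, "best_effort");
    ++calls;
    return true;
  });
  assert(ok);
  assert(calls == 1);  // the snapshot held one writer when the loop started
}
```

Keeping the callback outside the locked region means a slow or re-entrant update cannot block other threads that need the lock.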

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::suspend_publications (  )  [virtual]

Implements DDS::Publisher.

Definition at line 454 of file PublisherImpl.cpp.

References OpenDDS::DCPS::EntityImpl::enabled_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, and suspend_depth_count_.

00455 {
00456   if (enabled_ == false) {
00457     ACE_ERROR_RETURN((LM_ERROR,
00458         ACE_TEXT("(%P|%t) ERROR: ")
00459         ACE_TEXT("PublisherImpl::suspend_publications, ")
00460         ACE_TEXT(" Entity is not enabled. \n")),
00461         DDS::RETCODE_NOT_ENABLED);
00462   }
00463 
00464   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00465       guard,
00466       this->pi_lock_,
00467       DDS::RETCODE_ERROR);
00468   ++suspend_depth_count_;
00469   return DDS::RETCODE_OK;
00470 }

bool OpenDDS::DCPS::PublisherImpl::validate_datawriter_qos ( const DDS::DataWriterQos &  qos,
const DDS::DataWriterQos &  default_qos,
DDS::Topic_ptr  a_topic,
DDS::DataWriterQos &  dw_qos 
) [static]

Definition at line 880 of file PublisherImpl.cpp.

References OpenDDS::DCPS::Qos_Helper::consistent(), OpenDDS::DCPS::Qos_Helper::copy_from_topic_qos(), DATAWRITER_QOS_DEFAULT, DATAWRITER_QOS_USE_TOPIC_QOS, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK, and OpenDDS::DCPS::Qos_Helper::valid().

Referenced by create_datawriter(), and OpenDDS::DCPS::DomainParticipantImpl::create_replayer().

00884 {
00885   if (CORBA::is_nil(a_topic)) {
00886     ACE_ERROR((LM_ERROR,
00887         ACE_TEXT("(%P|%t) ERROR: ")
00888         ACE_TEXT("PublisherImpl::create_datawriter, ")
00889         ACE_TEXT("topic is nil.\n")));
00890     return DDS::DataWriter::_nil();
00891   }
00892 
00893   if (qos == DATAWRITER_QOS_DEFAULT) {
00894     dw_qos = default_qos;
00895 
00896   } else if (qos == DATAWRITER_QOS_USE_TOPIC_QOS) {
00897     DDS::TopicQos topic_qos;
00898     a_topic->get_qos(topic_qos);
00899     dw_qos = default_qos;
00900 
00901     Qos_Helper::copy_from_topic_qos(dw_qos, topic_qos);
00902 
00903   } else {
00904     dw_qos = qos;
00905   }
00906 
00907   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00908   OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00909   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00910   OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00911   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00912 
00913   if (!Qos_Helper::valid(dw_qos)) {
00914     ACE_ERROR((LM_ERROR,
00915         ACE_TEXT("(%P|%t) ERROR: ")
00916         ACE_TEXT("PublisherImpl::create_datawriter, ")
00917         ACE_TEXT("invalid qos.\n")));
00918     return DDS::DataWriter::_nil();
00919   }
00920 
00921   if (!Qos_Helper::consistent(dw_qos)) {
00922     ACE_ERROR((LM_ERROR,
00923         ACE_TEXT("(%P|%t) ERROR: ")
00924         ACE_TEXT("PublisherImpl::create_datawriter, ")
00925         ACE_TEXT("inconsistent qos.\n")));
00926     return DDS::DataWriter::_nil();
00927   }
00928   return true;
00929 }
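
The first half of validate_datawriter_qos() decides which QoS the new writer starts from: the publisher default, the default overlaid with the topic's policies, or the caller's explicit value. The sketch below models only that selection step. All names are hypothetical, the QoS records are cut down to a few integer fields, and the two sentinel constants merely play the role of DATAWRITER_QOS_DEFAULT and DATAWRITER_QOS_USE_TOPIC_QOS.

```cpp
#include <cassert>

// Hypothetical, heavily reduced QoS records.
struct TopicQosModel { int durability; int reliability; };
struct WriterQosModel { int durability; int reliability; int ownership_strength; };

inline bool operator==(const WriterQosModel& a, const WriterQosModel& b) {
  return a.durability == b.durability && a.reliability == b.reliability &&
         a.ownership_strength == b.ownership_strength;
}

// Hypothetical sentinel values a caller passes instead of a real QoS.
const WriterQosModel kUseDefault = {-1, -1, -1};
const WriterQosModel kUseTopicQos = {-2, -2, -2};

// Pick the effective writer QoS from the request, the defaults and the topic.
inline WriterQosModel resolve_writer_qos(const WriterQosModel& requested,
                                         const WriterQosModel& defaults,
                                         const TopicQosModel& topic) {
  if (requested == kUseDefault) {
    return defaults;
  }
  if (requested == kUseTopicQos) {
    // Start from the defaults, then copy the policies the topic also has.
    WriterQosModel result = defaults;
    result.durability = topic.durability;
    result.reliability = topic.reliability;
    return result;
  }
  return requested;  // explicit QoS is used as given
}

// Small usage check.
inline void example_resolve() {
  const WriterQosModel defaults = {0, 1, 5};
  const TopicQosModel topic = {2, 3};
  assert(resolve_writer_qos(kUseDefault, defaults, topic) == defaults);
  const WriterQosModel merged = resolve_writer_qos(kUseTopicQos, defaults, topic);
  assert(merged.durability == 2);
  assert(merged.reliability == 3);
  assert(merged.ownership_strength == 5);  // writer-only policy keeps its default
}
```
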

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::wait_for_acknowledgments ( const DDS::Duration_t &  max_wait  )  [virtual]

Definition at line 638 of file PublisherImpl.cpp.

References datawriter_map_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::OPENDDS_MAP(), DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.

00640 {
00641   if (enabled_ == false) {
00642     ACE_ERROR_RETURN((LM_ERROR,
00643         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ")
00644         ACE_TEXT("Entity is not enabled.\n")),
00645         DDS::RETCODE_NOT_ENABLED);
00646   }
00647 
00648   typedef OPENDDS_MAP(DataWriterImpl*, DataWriterImpl::AckToken) DataWriterAckMap;
00649   DataWriterAckMap ack_writers;
00650   {
00651     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00652         guard,
00653         this->pi_lock_,
00654         DDS::RETCODE_ERROR);
00655 
00656     // Collect writers to request acks
00657     for (DataWriterMap::iterator it(this->datawriter_map_.begin());
00658         it != this->datawriter_map_.end(); ++it) {
00659       DataWriterImpl* writer = it->second;
00660       if (writer->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS)
00661         continue;
00662       if (writer->should_ack()) {
00663         DataWriterImpl::AckToken token = writer->create_ack_token(max_wait);
00664 
00665         std::pair<DataWriterAckMap::iterator, bool> pair =
00666             ack_writers.insert(DataWriterAckMap::value_type(writer, token));
00667 
00668         if (!pair.second) {
00669           ACE_ERROR_RETURN((LM_ERROR,
00670               ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ")
00671               ACE_TEXT("Unable to insert AckToken into DataWriterAckMap!\n")),
00672               DDS::RETCODE_ERROR);
00673         }
00674       }
00675     }
00676   }
00677 
00678   if (ack_writers.empty()) {
00679     if (DCPS_debug_level > 0) {
00680       ACE_DEBUG((LM_DEBUG,
00681           ACE_TEXT("(%P|%t) PublisherImpl::wait_for_acknowledgments() - ")
00682           ACE_TEXT("not blocking due to no writers requiring acks.\n")));
00683     }
00684 
00685     return DDS::RETCODE_OK;
00686   }
00687 
00688   // Wait for ack responses from all associated readers
00689   for (DataWriterAckMap::iterator it(ack_writers.begin());
00690       it != ack_writers.end(); ++it) {
00691     DataWriterImpl::AckToken token = it->second;
00692 
00693     it->first->wait_for_specific_ack(token);
00694   }
00695 
00696   return DDS::RETCODE_OK;
00697 }
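
The control flow above is: select the reliable writers that want an acknowledgment, return at once if there are none, otherwise wait on each selected writer. A minimal sketch of that filter-then-wait shape follows. It is a model only: `AckWriter` and `wait_for_acks` are hypothetical, the wait itself is replaced by a counter, and tokens, timeouts and locking are omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical writer model; `waits` counts how often it was waited on.
struct AckWriter {
  bool reliable;
  bool should_ack;
  int waits;
};

// Returns the number of writers that were waited on (0 means "did not block").
inline std::size_t wait_for_acks(std::vector<AckWriter>& writers) {
  // Collect the writers that require an acknowledgment.
  std::vector<AckWriter*> pending;
  for (AckWriter& writer : writers) {
    if (!writer.reliable) {
      continue;  // best-effort writers never wait for acks
    }
    if (writer.should_ack) {
      pending.push_back(&writer);
    }
  }

  if (pending.empty()) {
    return 0;  // nothing to wait for
  }

  // Stand-in for the blocking wait on each collected writer.
  for (AckWriter* writer : pending) {
    ++writer->waits;
  }
  return pending.size();
}

// Small usage check: only the reliable writer that wants an ack is waited on.
inline void example_acks() {
  std::vector<AckWriter> writers;
  writers.push_back(AckWriter{true, true, 0});
  writers.push_back(AckWriter{false, true, 0});
  writers.push_back(AckWriter{true, false, 0});
  assert(wait_for_acks(writers) == 1);
  assert(writers[0].waits == 1);
  assert(writers[1].waits == 0);
  assert(writers[2].waits == 0);
}
```
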

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::writer_enabled ( const char *  topic_name,
DataWriterImpl *  impl 
)

Called when a datawriter created by this publisher is enabled.

Definition at line 771 of file PublisherImpl.cpp.

References datawriter_map_, OpenDDS::DCPS::DataWriterImpl::get_publication_id(), OPENDDS_STRING, publication_map_, DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

Referenced by OpenDDS::DCPS::DataWriterImpl::enable().

00773 {
00774   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00775       guard,
00776       this->pi_lock_,
00777       DDS::RETCODE_ERROR);
00778 
00779   datawriter_map_.insert(DataWriterMap::value_type(topic_name, writer));
00780 
00781   const RepoId publication_id = writer->get_publication_id();
00782 
00783   std::pair<PublicationMap::iterator, bool> pair =
00784       publication_map_.insert(PublicationMap::value_type(publication_id, writer));
00785 
00786   if (pair.second == false) {
00787     GuidConverter converter(publication_id);
00788     ACE_ERROR_RETURN((LM_ERROR,
00789         ACE_TEXT("(%P|%t) ERROR: ")
00790         ACE_TEXT("PublisherImpl::writer_enabled: ")
00791         ACE_TEXT("insert publication %C failed.\n"),
00792         OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR);
00793   }
00794 
00795   // Increase ref count when the servant is added to the
00796   // datawriter/publication map.
00797   writer->_add_ref();
00798 
00799   return DDS::RETCODE_OK;
00800 }
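
writer_enabled() registers a writer in two indexes: a multimap keyed by topic name (several writers may share a topic) and a map keyed by publication id (ids must be unique). The sketch below models that dual index with standard containers. `WriterIndex` and `IndexedWriter` are hypothetical names, ids are plain integers, and checking the id index first is a choice of this sketch rather than something taken from the listing.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <utility>

// Hypothetical writer model identified by an integer id.
struct IndexedWriter { int id; };

class WriterIndex {
public:
  // Returns false, and stores nothing, if the id is already registered.
  bool add(const std::string& topic, IndexedWriter* writer) {
    if (!by_id_.insert(std::make_pair(writer->id, writer)).second) {
      return false;
    }
    by_topic_.insert(std::make_pair(topic, writer));
    return true;
  }

  std::size_t count_for_topic(const std::string& topic) const {
    return by_topic_.count(topic);
  }

  IndexedWriter* find_by_id(int id) const {
    std::map<int, IndexedWriter*>::const_iterator it = by_id_.find(id);
    return it == by_id_.end() ? nullptr : it->second;
  }

private:
  std::multimap<std::string, IndexedWriter*> by_topic_;  // lookup by topic name
  std::map<int, IndexedWriter*> by_id_;                  // lookup by unique id
};

// Small usage check: two writers on one topic, one rejected duplicate id.
inline void example_index() {
  IndexedWriter a = {1};
  IndexedWriter b = {2};
  IndexedWriter duplicate = {1};
  WriterIndex index;
  assert(index.add("TopicA", &a));
  assert(index.add("TopicA", &b));
  assert(!index.add("TopicB", &duplicate));
  assert(index.count_for_topic("TopicA") == 2);
  assert(index.count_for_topic("TopicB") == 0);
  assert(index.find_by_id(2) == &b);
}
```
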


Friends And Related Function Documentation

friend class DataWriterImpl [friend]

Definition at line 42 of file PublisherImpl.h.


Member Data Documentation

ACE_Time_Value OpenDDS::DCPS::PublisherImpl::aggregation_period_start_ [private]

Start of the current aggregation period. Not used in the first implementation.

Definition at line 191 of file PublisherImpl.h.

std::size_t OpenDDS::DCPS::PublisherImpl::change_depth_ [private]

The number of times begin_coherent_changes has been called.

Definition at line 179 of file PublisherImpl.h.

Referenced by begin_coherent_changes(), and end_coherent_changes().

DataWriterMap OpenDDS::DCPS::PublisherImpl::datawriter_map_ [private]

This map is used to support datawriter lookup by topic name.

Definition at line 173 of file PublisherImpl.h.

Referenced by assert_liveliness_by_participant(), contains_writer(), delete_contained_entities(), delete_datawriter(), is_clean(), liveliness_check_interval(), lookup_datawriter(), participant_liveliness_activity_after(), wait_for_acknowledgments(), and writer_enabled().

DDS::DataWriterQos OpenDDS::DCPS::PublisherImpl::default_datawriter_qos_ [private]

Default datawriter Qos policy list.

Definition at line 165 of file PublisherImpl.h.

Referenced by create_datawriter(), get_default_datawriter_qos(), and set_default_datawriter_qos().

DDS::DomainId_t OpenDDS::DCPS::PublisherImpl::domain_id_ [private]

Domain in which we are contained.

Definition at line 182 of file PublisherImpl.h.

DDS::InstanceHandle_t OpenDDS::DCPS::PublisherImpl::handle_ [private]

Definition at line 160 of file PublisherImpl.h.

Referenced by get_instance_handle().

DDS::PublisherListener_var OpenDDS::DCPS::PublisherImpl::listener_ [private]

Used to notify the entity for relevant events.

Definition at line 171 of file PublisherImpl.h.

Referenced by get_listener(), and set_listener().

DDS::StatusMask OpenDDS::DCPS::PublisherImpl::listener_mask_ [private]

The StatusKind bit mask that selects which status changes are reported to this entity's listener.

Definition at line 169 of file PublisherImpl.h.

Referenced by set_listener().

Monitor* OpenDDS::DCPS::PublisherImpl::monitor_ [private]

Monitor object for this entity.

Definition at line 200 of file PublisherImpl.h.

Referenced by enable(), and PublisherImpl().

DomainParticipantImpl* OpenDDS::DCPS::PublisherImpl::participant_ [private]

The DomainParticipant servant that owns this Publisher.

Definition at line 184 of file PublisherImpl.h.

Referenced by create_datawriter(), delete_datawriter(), get_participant(), parent(), and set_qos().

lock_type OpenDDS::DCPS::PublisherImpl::pi_lock_ [mutable, private]

The recursive lock to protect datawriter map and suspend count.

Definition at line 196 of file PublisherImpl.h.

PublicationMap OpenDDS::DCPS::PublisherImpl::publication_map_ [private]

This map is used to support datawriter lookup by datawriter repository id.

Definition at line 176 of file PublisherImpl.h.

Referenced by begin_coherent_changes(), delete_datawriter(), end_coherent_changes(), get_publication_ids(), is_clean(), resume_publications(), set_qos(), and writer_enabled().

RepoId OpenDDS::DCPS::PublisherImpl::publisher_id_ [private]

NOTE: The publisher_id_ is not generated by the repository; it is unique within the DomainParticipant scope.

Definition at line 204 of file PublisherImpl.h.

Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), and OpenDDS::DCPS::DataWriterImpl::end_coherent_changes().

DDS::PublisherQos OpenDDS::DCPS::PublisherImpl::qos_ [private]

Publisher QoS policy list.

Definition at line 163 of file PublisherImpl.h.

Referenced by begin_coherent_changes(), create_datawriter(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), end_coherent_changes(), OpenDDS::DCPS::DataWriterImpl::end_coherent_changes(), get_qos(), and set_qos().

reverse_lock_type OpenDDS::DCPS::PublisherImpl::reverse_pi_lock_ [private]

Definition at line 197 of file PublisherImpl.h.

SequenceNumber OpenDDS::DCPS::PublisherImpl::sequence_number_ [private]

Unique sequence number used when the presentation access_scope is GROUP.

Definition at line 189 of file PublisherImpl.h.

CORBA::Short OpenDDS::DCPS::PublisherImpl::suspend_depth_count_ [private]

The suspend depth count.

Definition at line 186 of file PublisherImpl.h.

Referenced by is_suspended(), resume_publications(), and suspend_publications().


The documentation for this class was generated from the following files:
PublisherImpl.h
PublisherImpl.cpp
Generated on Fri Feb 12 20:06:21 2016 for OpenDDS by  doxygen 1.4.7