OpenDDS::DCPS::PublisherImpl Class Reference

Implements the OpenDDS::DCPS::Publisher interfaces. More...

#include <PublisherImpl.h>

Inheritance diagram for OpenDDS::DCPS::PublisherImpl:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::PublisherImpl:
Collaboration graph
[legend]

List of all members.

Public Member Functions

 PublisherImpl (DDS::InstanceHandle_t handle, RepoId id, const DDS::PublisherQos &qos, DDS::PublisherListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant)
virtual ~PublisherImpl ()
virtual DDS::InstanceHandle_t get_instance_handle ()
bool contains_writer (DDS::InstanceHandle_t a_handle)
virtual DDS::DataWriter_ptr create_datawriter (DDS::Topic_ptr a_topic, const DDS::DataWriterQos &qos, DDS::DataWriterListener_ptr a_listener, DDS::StatusMask mask)
virtual DDS::ReturnCode_t delete_datawriter (DDS::DataWriter_ptr a_datawriter)
virtual DDS::DataWriter_ptr lookup_datawriter (const char *topic_name)
virtual DDS::ReturnCode_t delete_contained_entities ()
virtual DDS::ReturnCode_t set_qos (const DDS::PublisherQos &qos)
virtual DDS::ReturnCode_t get_qos (DDS::PublisherQos &qos)
virtual DDS::ReturnCode_t set_listener (DDS::PublisherListener_ptr a_listener, DDS::StatusMask mask)
virtual DDS::PublisherListener_ptr get_listener ()
virtual DDS::ReturnCode_t suspend_publications ()
virtual DDS::ReturnCode_t resume_publications ()
virtual DDS::ReturnCode_t begin_coherent_changes ()
virtual DDS::ReturnCode_t end_coherent_changes ()
virtual DDS::ReturnCode_t wait_for_acknowledgments (const DDS::Duration_t &max_wait)
virtual DDS::DomainParticipant_ptr get_participant ()
virtual DDS::ReturnCode_t set_default_datawriter_qos (const DDS::DataWriterQos &qos)
virtual DDS::ReturnCode_t get_default_datawriter_qos (DDS::DataWriterQos &qos)
virtual DDS::ReturnCode_t copy_from_topic_qos (DDS::DataWriterQos &a_datawriter_qos, const DDS::TopicQos &a_topic_qos)
virtual DDS::ReturnCode_t enable ()
ACE_Recursive_Thread_Mutexget_pi_lock ()
bool is_clean () const
DDS::ReturnCode_t writer_enabled (const char *topic_name, DataWriterImpl *impl)
DDS::PublisherListener_ptr listener_for (::DDS::StatusKind kind)
DDS::ReturnCode_t assert_liveliness_by_participant ()
ACE_Time_Value liveliness_check_interval (DDS::LivelinessQosPolicyKind kind)
bool participant_liveliness_activity_after (const ACE_Time_Value &tv)
typedef OPENDDS_VECTOR (PublicationId) PublicationIdVec
void get_publication_ids (PublicationIdVec &pubs)
bool is_suspended () const
virtual RcHandle< EntityImplparent () const

Static Public Member Functions

static bool validate_datawriter_qos (const DDS::DataWriterQos &qos, const DDS::DataWriterQos &default_qos, DDS::Topic_ptr a_topic, DDS::DataWriterQos &dw_qos)

Private Types

typedef ACE_Recursive_Thread_Mutex lock_type
typedef ACE_Reverse_Lock
< lock_type
reverse_lock_type

Private Member Functions

typedef OPENDDS_MULTIMAP (OPENDDS_STRING, DataWriterImpl_rch) DataWriterMap
typedef OPENDDS_MAP_CMP (PublicationId, DataWriterImpl_rch, GUID_tKeyLessThan) PublicationMap
typedef OPENDDS_MAP_CMP (RepoId, DDS::DataWriterQos, GUID_tKeyLessThan) DwIdToQosMap
typedef OPENDDS_SET (DataWriterImpl_rch) DataWriterSet

Private Attributes

DDS::InstanceHandle_t handle_
DDS::PublisherQos qos_
 Publisher QoS policy list.
DDS::DataWriterQos default_datawriter_qos_
 Default datawriter Qos policy list.
DDS::StatusMask listener_mask_
DDS::PublisherListener_var listener_
 Used to notify the entity for relevant events.
DataWriterSet writers_not_enabled_
DataWriterMap datawriter_map_
 This map is used to support datawriter lookup by topic name.
PublicationMap publication_map_
std::size_t change_depth_
 The number of times begin_coherent_changes as been called.
DDS::DomainId_t domain_id_
 Domain in which we are contained.
WeakRcHandle
< DomainParticipantImpl
participant_
 The DomainParticipant servant that owns this Publisher.
CORBA::Short suspend_depth_count_
 The suspend depth count.
SequenceNumber sequence_number_
ACE_Time_Value aggregation_period_start_
 Start of current aggregation period. - NOT USED IN FIRST IMPL.
lock_type pi_lock_
 The recursive lock to protect datawriter map and suspend count.
reverse_lock_type reverse_pi_lock_
Monitormonitor_
 Monitor object for this entity.
RepoId publisher_id_

Friends

class DataWriterImpl

Detailed Description

Implements the OpenDDS::DCPS::Publisher interfaces.

This class acts as a factory and container of the datawriter.

See the DDS specification, OMG formal/04-12-02, for a description of the interface this class is implementing.

Definition at line 38 of file PublisherImpl.h.


Member Typedef Documentation

Definition at line 198 of file PublisherImpl.h.

Definition at line 199 of file PublisherImpl.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::PublisherImpl::PublisherImpl ( DDS::InstanceHandle_t  handle,
RepoId  id,
const DDS::PublisherQos qos,
DDS::PublisherListener_ptr  a_listener,
const DDS::StatusMask mask,
DomainParticipantImpl participant 
)

Definition at line 30 of file PublisherImpl.cpp.

References _duplicate(), monitor_, and TheServiceParticipant.

00036 : handle_(handle),
00037   qos_(qos),
00038   default_datawriter_qos_(TheServiceParticipant->initial_DataWriterQos()),
00039   listener_mask_(mask),
00040   listener_(DDS::PublisherListener::_duplicate(a_listener)),
00041 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00042   change_depth_(0),
00043 #endif
00044   domain_id_(participant->get_domain_id()),
00045   participant_(*participant),
00046   suspend_depth_count_(0),
00047   sequence_number_(),
00048   aggregation_period_start_(ACE_Time_Value::zero),
00049   reverse_pi_lock_(pi_lock_),
00050   monitor_(0),
00051   publisher_id_(id)
00052 {
00053   monitor_ = TheServiceParticipant->monitor_factory_->create_publisher_monitor(this);
00054 }

Here is the call graph for this function:

OpenDDS::DCPS::PublisherImpl::~PublisherImpl (  )  [virtual]

Definition at line 56 of file PublisherImpl.cpp.

References ACE_TEXT(), datawriter_map_, is_clean(), LM_ERROR, and publication_map_.

00057 {
00058   //The datawriters should be deleted already before calling delete
00059   //publisher.
00060   if (!is_clean()) {
00061     ACE_ERROR((LM_ERROR,
00062         ACE_TEXT("(%P|%t) ERROR: ")
00063         ACE_TEXT("PublisherImpl::~PublisherImpl, ")
00064         ACE_TEXT("%B datawriters and %B publications still exist.\n"),
00065         datawriter_map_.size(), publication_map_.size()));
00066   }
00067 }

Here is the call graph for this function:


Member Function Documentation

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::assert_liveliness_by_participant (  ) 

Definition at line 861 of file PublisherImpl.cpp.

References datawriter_map_, and DDS::RETCODE_OK.

00862 {
00863   DDS::ReturnCode_t ret = DDS::RETCODE_OK;
00864 
00865   for (DataWriterMap::iterator it(datawriter_map_.begin());
00866       it != datawriter_map_.end(); ++it) {
00867     DDS::ReturnCode_t dw_ret = it->second->assert_liveliness_by_participant();
00868 
00869     if (dw_ret != DDS::RETCODE_OK) {
00870       ret = dw_ret;
00871     }
00872   }
00873 
00874   return ret;
00875 }

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::begin_coherent_changes (  )  [virtual]

Implements DDS::Publisher.

Definition at line 544 of file PublisherImpl.cpp.

References ACE_TEXT(), change_depth_, OpenDDS::DCPS::EntityImpl::enabled_, DDS::INSTANCE_PRESENTATION_QOS, LM_ERROR, pi_lock_, DDS::PublisherQos::presentation, publication_map_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.

00545 {
00546   if (enabled_ == false) {
00547     ACE_ERROR_RETURN((LM_ERROR,
00548         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:")
00549         ACE_TEXT(" Publisher is not enabled!\n")),
00550         DDS::RETCODE_NOT_ENABLED);
00551   }
00552 
00553   if (!qos_.presentation.coherent_access) {
00554     ACE_ERROR_RETURN((LM_ERROR,
00555         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:")
00556         ACE_TEXT(" QoS policy does not support coherent access!\n")),
00557         DDS::RETCODE_ERROR);
00558   }
00559 
00560   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00561       guard,
00562       this->pi_lock_,
00563       DDS::RETCODE_ERROR);
00564 
00565   ++this->change_depth_;
00566 
00567   if (qos_.presentation.access_scope == DDS::INSTANCE_PRESENTATION_QOS) {
00568     // INSTANCE access scope essentially behaves
00569     // as a no-op. (see: 7.1.3.6)
00570     return DDS::RETCODE_OK;
00571   }
00572 
00573   // We should only notify publications on the first
00574   // and last change to the current change set:
00575   if (this->change_depth_ == 1) {
00576     for (PublicationMap::iterator it = this->publication_map_.begin();
00577         it != this->publication_map_.end(); ++it) {
00578       it->second->begin_coherent_changes();
00579     }
00580   }
00581 
00582   return DDS::RETCODE_OK;
00583 }

Here is the call graph for this function:

bool OpenDDS::DCPS::PublisherImpl::contains_writer ( DDS::InstanceHandle_t  a_handle  ) 

Definition at line 76 of file PublisherImpl.cpp.

References datawriter_map_, pi_lock_, and DDS::RETCODE_ERROR.

00077 {
00078   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00079       guard,
00080       this->pi_lock_,
00081       DDS::RETCODE_ERROR);
00082 
00083   for (DataWriterMap::iterator it(datawriter_map_.begin());
00084       it != datawriter_map_.end(); ++it) {
00085     if (a_handle == it->second->get_instance_handle()) {
00086       return true;
00087     }
00088   }
00089 
00090   return false;
00091 }

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::copy_from_topic_qos ( DDS::DataWriterQos a_datawriter_qos,
const DDS::TopicQos a_topic_qos 
) [virtual]

Definition at line 750 of file PublisherImpl.cpp.

References OpenDDS::DCPS::Qos_Helper::copy_from_topic_qos(), DDS::RETCODE_INCONSISTENT_POLICY, and DDS::RETCODE_OK.

Referenced by validate_datawriter_qos().

00752 {
00753   if (Qos_Helper::copy_from_topic_qos(a_datawriter_qos, a_topic_qos)) {
00754     return DDS::RETCODE_OK;
00755   } else {
00756     return DDS::RETCODE_INCONSISTENT_POLICY;
00757   }
00758 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::DataWriter_ptr OpenDDS::DCPS::PublisherImpl::create_datawriter ( DDS::Topic_ptr  a_topic,
const DDS::DataWriterQos qos,
DDS::DataWriterListener_ptr  a_listener,
DDS::StatusMask  mask 
) [virtual]

Definition at line 94 of file PublisherImpl.cpp.

References CORBA::LocalObject::_duplicate(), CORBA::LocalObject::_nil(), ACE_TEXT(), default_datawriter_qos_, OpenDDS::DCPS::DataWriterImpl::enable(), OpenDDS::DCPS::EntityImpl::enabled_, DDS::PublisherQos::entity_factory, OpenDDS::DCPS::TopicDescriptionImpl::get_name(), OpenDDS::DCPS::TopicDescriptionImpl::get_type_support(), OpenDDS::DCPS::DataWriterImpl::init(), LM_ERROR, LM_WARNING, participant_, pi_lock_, qos_, OpenDDS::DCPS::rchandle_from(), DDS::RETCODE_OK, validate_datawriter_qos(), and writers_not_enabled_.

00099 {
00100   DDS::DataWriterQos dw_qos;
00101 
00102   if (!validate_datawriter_qos(qos, default_datawriter_qos_, a_topic, dw_qos)) {
00103     return DDS::DataWriter::_nil();
00104   }
00105 
00106   TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic);
00107 
00108   if (!topic_servant) {
00109     CORBA::String_var name = a_topic->get_name();
00110     ACE_ERROR((LM_ERROR,
00111       ACE_TEXT("(%P|%t) ERROR: ")
00112       ACE_TEXT("PublisherImpl::create_datawriter, ")
00113       ACE_TEXT("topic_servant(topic_name=%C) is nil.\n"),
00114       name.in()));
00115     return 0;
00116   }
00117 
00118   OpenDDS::DCPS::TypeSupport_ptr typesupport =
00119       topic_servant->get_type_support();
00120 
00121   if (typesupport == 0) {
00122     CORBA::String_var name = topic_servant->get_name();
00123     ACE_ERROR((LM_ERROR,
00124         ACE_TEXT("(%P|%t) ERROR: ")
00125         ACE_TEXT("PublisherImpl::create_datawriter, ")
00126         ACE_TEXT("typesupport(topic_name=%C) is nil.\n"),
00127         name.in()));
00128     return DDS::DataWriter::_nil();
00129   }
00130 
00131   DDS::DataWriter_var dw_obj = typesupport->create_datawriter();
00132 
00133   DataWriterImpl* dw_servant =
00134       dynamic_cast <DataWriterImpl*>(dw_obj.in());
00135 
00136   if (dw_servant == 0) {
00137     ACE_ERROR((LM_ERROR,
00138         ACE_TEXT("(%P|%t) ERROR: ")
00139         ACE_TEXT("PublisherImpl::create_datawriter, ")
00140         ACE_TEXT("servant is nil.\n")));
00141     return DDS::DataWriter::_nil();
00142   }
00143 
00144   dw_servant->init(
00145       topic_servant,
00146       dw_qos,
00147       a_listener,
00148       mask,
00149       participant_,
00150       this);
00151 
00152   if ((this->enabled_ == true) && (qos_.entity_factory.autoenable_created_entities)) {
00153     const DDS::ReturnCode_t ret = dw_servant->enable();
00154 
00155     if (ret != DDS::RETCODE_OK) {
00156       ACE_ERROR((LM_WARNING,
00157           ACE_TEXT("(%P|%t) WARNING: ")
00158           ACE_TEXT("PublisherImpl::create_datawriter, ")
00159           ACE_TEXT("enable failed.\n")));
00160       return DDS::DataWriter::_nil();
00161     }
00162   } else {
00163     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, pi_lock_, 0);
00164     writers_not_enabled_.insert(rchandle_from(dw_servant));
00165   }
00166 
00167   return DDS::DataWriter::_duplicate(dw_obj.in());
00168 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::delete_contained_entities (  )  [virtual]

Implements DDS::Publisher.

Definition at line 334 of file PublisherImpl.cpp.

References ACE_TEXT(), datawriter_map_, delete_datawriter(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::RcHandle< T >::in(), LM_ERROR, OPENDDS_STRING, pi_lock_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, and OpenDDS::DCPS::EntityImpl::set_deleted().

00335 {
00336   // mark that the entity is being deleted
00337   set_deleted(true);
00338 
00339   while (true) {
00340     PublicationId pub_id = GUID_UNKNOWN;
00341     DataWriterImpl_rch a_datawriter;
00342 
00343     {
00344       ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00345           guard,
00346           this->pi_lock_,
00347           DDS::RETCODE_ERROR);
00348 
00349       if (datawriter_map_.empty()) {
00350         break;
00351       } else {
00352         a_datawriter = datawriter_map_.begin()->second;
00353         pub_id = a_datawriter->get_publication_id();
00354       }
00355     }
00356 
00357     const DDS::ReturnCode_t ret = delete_datawriter(a_datawriter.in());
00358 
00359     if (ret != DDS::RETCODE_OK) {
00360       GuidConverter converter(pub_id);
00361       ACE_ERROR_RETURN((LM_ERROR,
00362           ACE_TEXT("(%P|%t) ERROR: ")
00363           ACE_TEXT("PublisherImpl::")
00364           ACE_TEXT("delete_contained_entities: ")
00365           ACE_TEXT("failed to delete ")
00366           ACE_TEXT("datawriter %C.\n"),
00367           OPENDDS_STRING(converter).c_str()),ret);
00368     }
00369   }
00370 
00371   // the publisher can now start creating new publications
00372   set_deleted(false);
00373 
00374   return DDS::RETCODE_OK;
00375 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::delete_datawriter ( DDS::DataWriter_ptr  a_datawriter  )  [virtual]

Definition at line 171 of file PublisherImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DataWriterImpl::cleanup(), datawriter_map_, OpenDDS::DCPS::DCPS_debug_level, domain_id_, OpenDDS::DCPS::DataWriterImpl::get_publication_id(), OpenDDS::DCPS::DataWriterImpl::get_publisher(), ACE_OS::gettimeofday(), OpenDDS::DCPS::GUID_UNKNOWN, LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), monitor_, OPENDDS_STRING, participant_, OpenDDS::DCPS::DataWriterImpl::persist_data(), pi_lock_, OpenDDS::DCPS::DataWriterImpl::prepare_to_delete(), publication_map_, OpenDDS::DCPS::DataWriterImpl::remove_all_associations(), OpenDDS::DCPS::Monitor::report(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, reverse_pi_lock_, TheServiceParticipant, OpenDDS::DCPS::time_value_to_time(), OpenDDS::DCPS::DataWriterImpl::unregister_all(), OpenDDS::DCPS::DataWriterImpl::unregister_instances(), OpenDDS::DCPS::DataWriterImpl::wait_control_pending(), and OpenDDS::DCPS::DataWriterImpl::wait_pending().

Referenced by delete_contained_entities().

00172 {
00173   DataWriterImpl* dw_servant = dynamic_cast<DataWriterImpl*>(a_datawriter);
00174   if (!dw_servant) {
00175     ACE_ERROR((LM_ERROR,
00176               "(%P|%t) PublisherImpl::delete_datawriter - dynamic cast to DataWriterImpl failed\n"
00177     ));
00178     return DDS::RETCODE_ERROR;
00179   }
00180 
00181   // marks entity as deleted and stops future associating
00182   dw_servant->prepare_to_delete();
00183 
00184   {
00185     DDS::Publisher_var dw_publisher(dw_servant->get_publisher());
00186 
00187     if (dw_publisher.in() != this) {
00188       RepoId id = dw_servant->get_publication_id();
00189       GuidConverter converter(id);
00190       ACE_ERROR((LM_ERROR,
00191           ACE_TEXT("(%P|%t) PublisherImpl::delete_datawriter: ")
00192           ACE_TEXT("the data writer %C doesn't ")
00193           ACE_TEXT("belong to this subscriber \n"),
00194           OPENDDS_STRING(converter).c_str()));
00195       return DDS::RETCODE_PRECONDITION_NOT_MET;
00196     }
00197   }
00198 
00199 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00200   // Trigger data to be persisted, i.e. made durable, if so
00201   // configured. This needs be called before unregister_instances
00202   // because unregister_instances may cause instance dispose.
00203   if (!dw_servant->persist_data() && DCPS_debug_level >= 2) {
00204     ACE_ERROR((LM_ERROR,
00205         ACE_TEXT("(%P|%t) ERROR: ")
00206         ACE_TEXT("PublisherImpl::delete_datawriter, ")
00207         ACE_TEXT("failed to make data durable.\n")));
00208   }
00209 #endif
00210 
00211   // Unregister all registered instances prior to deletion.
00212   DDS::Time_t source_timestamp = time_value_to_time(ACE_OS::gettimeofday());
00213   dw_servant->unregister_instances(source_timestamp);
00214 
00215   // Wait for any control messages to be transported during
00216   // unregistering of instances.
00217   dw_servant->wait_pending();
00218   dw_servant->wait_control_pending();
00219 
00220   RepoId publication_id  = GUID_UNKNOWN;
00221   {
00222     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00223         guard,
00224         this->pi_lock_,
00225         DDS::RETCODE_ERROR);
00226 
00227     publication_id = dw_servant->get_publication_id();
00228 
00229     PublicationMap::iterator it = publication_map_.find(publication_id);
00230 
00231     if (it == publication_map_.end()) {
00232       GuidConverter converter(publication_id);
00233       ACE_ERROR_RETURN((LM_ERROR,
00234           ACE_TEXT("(%P|%t) ERROR: ")
00235           ACE_TEXT("PublisherImpl::delete_datawriter, ")
00236           ACE_TEXT("datawriter %C not found.\n"),
00237           OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR);
00238     }
00239 
00240     // We can not erase the datawriter from datawriter map by the topic name
00241     // because the map might have multiple datawriters with the same topic
00242     // name.
00243     // Find the iterator to the datawriter in the datawriter map and erase
00244     // by the iterator.
00245     DataWriterMap::iterator writ;
00246     DataWriterMap::iterator the_writ = datawriter_map_.end();
00247 
00248     for (writ = datawriter_map_.begin();
00249         writ != datawriter_map_.end();
00250         ++writ) {
00251       if (writ->second == it->second) {
00252         the_writ = writ;
00253         break;
00254       }
00255     }
00256 
00257     if (the_writ != datawriter_map_.end()) {
00258       datawriter_map_.erase(the_writ);
00259     }
00260 
00261     publication_map_.erase(it);
00262 
00263     // Release pi_lock_ before making call to transport layer to avoid
00264     // some deadlock situations that threads acquire locks(PublisherImpl
00265     // pi_lock_, TransportClient reservation_lock and TransportImpl
00266     // lock_) in reverse order.
00267     ACE_GUARD_RETURN(reverse_lock_type, reverse_monitor, this->reverse_pi_lock_,
00268         DDS::RETCODE_ERROR);
00269     // Wait for pending samples to drain prior to removing associations
00270     // and unregistering the publication.
00271     dw_servant->wait_pending();
00272     // Call remove association before unregistering the datawriter
00273     // with the transport, otherwise some callbacks resulted from
00274     // remove_association may lost.
00275     dw_servant->remove_all_associations();
00276     dw_servant->cleanup();
00277   }
00278 
00279   if (this->monitor_) {
00280     this->monitor_->report();
00281   }
00282 
00283   // not just unregister but remove any pending writes/sends.
00284   dw_servant->unregister_all();
00285 
00286   RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
00287 
00288   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00289   if (!disco->remove_publication(
00290       this->domain_id_,
00291       participant->get_id(),
00292       publication_id)) {
00293     ACE_ERROR_RETURN((LM_ERROR,
00294         ACE_TEXT("(%P|%t) ERROR: ")
00295         ACE_TEXT("PublisherImpl::delete_datawriter, ")
00296         ACE_TEXT("publication not removed from discovery.\n")),
00297         DDS::RETCODE_ERROR);
00298   }
00299 
00300   participant->remove_adjust_liveliness_timers();
00301 
00302   return DDS::RETCODE_OK;
00303 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::enable (  )  [virtual]

Implements DDS::Entity.

Definition at line 761 of file PublisherImpl.cpp.

References DDS::PublisherQos::entity_factory, OpenDDS::DCPS::EntityImpl::is_enabled(), OpenDDS::DCPS::WeakRcHandle< T >::lock(), monitor_, participant_, pi_lock_, qos_, OpenDDS::DCPS::Monitor::report(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, OpenDDS::DCPS::EntityImpl::set_enabled(), and writers_not_enabled_.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_publisher().

00762 {
00763   //According spec:
00764   // - Calling enable on an already enabled Entity returns OK and has no
00765   // effect.
00766   // - Calling enable on an Entity whose factory is not enabled will fail
00767   // and return PRECONDITION_NOT_MET.
00768 
00769   if (this->is_enabled()) {
00770     return DDS::RETCODE_OK;
00771   }
00772 
00773   RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
00774   if (!participant || participant->is_enabled() == false) {
00775     return DDS::RETCODE_PRECONDITION_NOT_MET;
00776   }
00777 
00778   if (this->monitor_) {
00779     this->monitor_->report();
00780   }
00781 
00782   this->set_enabled();
00783 
00784   if (qos_.entity_factory.autoenable_created_entities) {
00785     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, pi_lock_, DDS::RETCODE_ERROR);
00786     DataWriterSet writers;
00787     writers_not_enabled_.swap(writers);
00788     for (DataWriterSet::iterator it = writers.begin(); it != writers.end(); ++it) {
00789       (*it)->enable();
00790     }
00791   }
00792 
00793   return DDS::RETCODE_OK;
00794 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::end_coherent_changes (  )  [virtual]

Implements DDS::Publisher.

Definition at line 586 of file PublisherImpl.cpp.

References ACE_TEXT(), change_depth_, OpenDDS::DCPS::EntityImpl::enabled_, DDS::INSTANCE_PRESENTATION_QOS, LM_ERROR, pi_lock_, DDS::PublisherQos::presentation, publication_map_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.

00587 {
00588   if (enabled_ == false) {
00589     ACE_ERROR_RETURN((LM_ERROR,
00590         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
00591         ACE_TEXT(" Publisher is not enabled!\n")),
00592         DDS::RETCODE_NOT_ENABLED);
00593   }
00594 
00595   if (!qos_.presentation.coherent_access) {
00596     ACE_ERROR_RETURN((LM_ERROR,
00597         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
00598         ACE_TEXT(" QoS policy does not support coherent access!\n")),
00599         DDS::RETCODE_ERROR);
00600   }
00601 
00602   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00603       guard,
00604       this->pi_lock_,
00605       DDS::RETCODE_ERROR);
00606 
00607   if (this->change_depth_ == 0) {
00608     ACE_ERROR_RETURN((LM_ERROR,
00609         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
00610         ACE_TEXT(" No matching call to begin_coherent_changes!\n")),
00611         DDS::RETCODE_PRECONDITION_NOT_MET);
00612   }
00613 
00614   --this->change_depth_;
00615 
00616   if (qos_.presentation.access_scope == DDS::INSTANCE_PRESENTATION_QOS) {
00617     // INSTANCE access scope essentially behaves
00618     // as a no-op. (see: 7.1.3.6)
00619     return DDS::RETCODE_OK;
00620   }
00621 
00622   // We should only notify publications on the first
00623   // and last change to the current change set:
00624   if (this->change_depth_ == 0) {
00625     GroupCoherentSamples group_samples;
00626     for (PublicationMap::iterator it = this->publication_map_.begin();
00627         it != this->publication_map_.end(); ++it) {
00628 
00629       if (it->second->coherent_samples_ == 0) {
00630         continue;
00631       }
00632 
00633       std::pair<GroupCoherentSamples::iterator, bool> pair =
00634           group_samples.insert(GroupCoherentSamples::value_type(
00635               it->second->get_publication_id(),
00636               WriterCoherentSample(it->second->coherent_samples_,
00637                   it->second->sequence_number_)));
00638 
00639       if (pair.second == false) {
00640         ACE_ERROR_RETURN((LM_ERROR,
00641             ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes: ")
00642             ACE_TEXT("failed to insert to GroupCoherentSamples.\n")),
00643             DDS::RETCODE_ERROR);
00644       }
00645     }
00646 
00647     for (PublicationMap::iterator it = this->publication_map_.begin();
00648         it != this->publication_map_.end(); ++it) {
00649       if (it->second->coherent_samples_ == 0) {
00650         continue;
00651       }
00652 
00653       it->second->end_coherent_changes(group_samples);
00654     }
00655   }
00656 
00657   return DDS::RETCODE_OK;
00658 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::get_default_datawriter_qos ( DDS::DataWriterQos qos  )  [virtual]

Definition at line 743 of file PublisherImpl.cpp.

References default_datawriter_qos_, and DDS::RETCODE_OK.

00744 {
00745   qos = default_datawriter_qos_;
00746   return DDS::RETCODE_OK;
00747 }

DDS::InstanceHandle_t OpenDDS::DCPS::PublisherImpl::get_instance_handle (  )  [virtual]

Implements OpenDDS::DCPS::EntityImpl.

Definition at line 70 of file PublisherImpl.cpp.

References handle_.

Referenced by OpenDDS::DCPS::PublisherMonitorImpl::report().

00071 {
00072   return handle_;
00073 }

Here is the caller graph for this function:

DDS::PublisherListener_ptr OpenDDS::DCPS::PublisherImpl::get_listener (  )  [virtual]

Implements DDS::Publisher.

Definition at line 473 of file PublisherImpl.cpp.

References CORBA::LocalObject::_duplicate(), and listener_.

00474 {
00475   return DDS::PublisherListener::_duplicate(listener_.in());
00476 }

Here is the call graph for this function:

DDS::DomainParticipant_ptr OpenDDS::DCPS::PublisherImpl::get_participant (  )  [virtual]

Implements DDS::Publisher.

Definition at line 725 of file PublisherImpl.cpp.

References OpenDDS::DCPS::RcHandle< T >::_retn(), OpenDDS::DCPS::WeakRcHandle< T >::lock(), and participant_.

Referenced by OpenDDS::DCPS::PublisherMonitorImpl::report().

00726 {
00727   return participant_.lock()._retn();
00728 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_Recursive_Thread_Mutex& OpenDDS::DCPS::PublisherImpl::get_pi_lock (  )  [inline]

Definition at line 113 of file PublisherImpl.h.

00113 { return pi_lock_; }

void OpenDDS::DCPS::PublisherImpl::get_publication_ids ( PublicationIdVec &  pubs  ) 

Populates a std::vector with the PublicationIds (GUIDs) of this Publisher's Data Writers

Definition at line 901 of file PublisherImpl.cpp.

References pi_lock_, and publication_map_.

Referenced by OpenDDS::DCPS::PublisherMonitorImpl::report().

00902 {
00903   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00904       guard,
00905       this->pi_lock_,
00906   );
00907 
00908   pubs.reserve(publication_map_.size());
00909   for (PublicationMap::iterator iter = publication_map_.begin();
00910       iter != publication_map_.end();
00911       ++iter) {
00912     pubs.push_back(iter->first);
00913   }
00914 }

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::get_qos ( DDS::PublisherQos qos  )  [virtual]

Definition at line 456 of file PublisherImpl.cpp.

References qos_, and DDS::RETCODE_OK.

00457 {
00458   qos = qos_;
00459   return DDS::RETCODE_OK;
00460 }

bool OpenDDS::DCPS::PublisherImpl::is_clean (  )  const

This method is not defined in the IDL and is defined for internal use. Check if there is any datawriter associated with this publisher.

Definition at line 797 of file PublisherImpl.cpp.

References datawriter_map_, pi_lock_, and publication_map_.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::delete_publisher(), and ~PublisherImpl().

00798 {
00799   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00800       guard,
00801       this->pi_lock_,
00802       false);
00803   return datawriter_map_.empty() && publication_map_.empty();
00804 }

Here is the caller graph for this function:

bool OpenDDS::DCPS::PublisherImpl::is_suspended ( void   )  const

Definition at line 498 of file PublisherImpl.cpp.

References pi_lock_, and suspend_depth_count_.

00499 {
00500   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00501       guard,
00502       this->pi_lock_,
00503       false);
00504   return suspend_depth_count_;
00505 }

DDS::PublisherListener_ptr OpenDDS::DCPS::PublisherImpl::listener_for ( ::DDS::StatusKind  kind  ) 

This is used to retrieve the listener for a certain status change. If this publisher has a registered listener and the status kind is in the listener mask then the listener is returned. Otherwise, the query for listener is propagated up to the factory/DomainParticipant.

ACE_Time_Value OpenDDS::DCPS::PublisherImpl::liveliness_check_interval ( DDS::LivelinessQosPolicyKind  kind  ) 

Definition at line 878 of file PublisherImpl.cpp.

References datawriter_map_, and ACE_Time_Value::max_time.

00879 {
00880   ACE_Time_Value tv = ACE_Time_Value::max_time;
00881   for (DataWriterMap::iterator it(datawriter_map_.begin());
00882       it != datawriter_map_.end(); ++it) {
00883     tv = std::min (tv, it->second->liveliness_check_interval(kind));
00884   }
00885   return tv;
00886 }

DDS::DataWriter_ptr OpenDDS::DCPS::PublisherImpl::lookup_datawriter ( const char *  topic_name  )  [virtual]

Definition at line 306 of file PublisherImpl.cpp.

References CORBA::LocalObject::_duplicate(), CORBA::LocalObject::_nil(), ACE_TEXT(), datawriter_map_, OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, and pi_lock_.

00307 {
00308   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00309       guard,
00310       this->pi_lock_,
00311       DDS::DataWriter::_nil());
00312 
00313   // If multiple entries whose key is "topic_name" then which one is
00314   // returned ? Spec does not limit which one should give.
00315   DataWriterMap::iterator it = datawriter_map_.find(topic_name);
00316 
00317   if (it == datawriter_map_.end()) {
00318     if (DCPS_debug_level >= 2) {
00319       ACE_DEBUG((LM_DEBUG,
00320           ACE_TEXT("(%P|%t) ")
00321           ACE_TEXT("PublisherImpl::lookup_datawriter, ")
00322           ACE_TEXT("The datawriter(topic_name=%C) is not found\n"),
00323           topic_name));
00324     }
00325 
00326     return DDS::DataWriter::_nil();
00327 
00328   } else {
00329     return DDS::DataWriter::_duplicate(it->second.in());
00330   }
00331 }

Here is the call graph for this function:

typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_MAP_CMP ( RepoId  ,
DDS::DataWriterQos  ,
GUID_tKeyLessThan   
) [private]
typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_MAP_CMP ( PublicationId  ,
DataWriterImpl_rch  ,
GUID_tKeyLessThan   
) [private]
typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_MULTIMAP ( OPENDDS_STRING  ,
DataWriterImpl_rch   
) [private]
typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_SET ( DataWriterImpl_rch   )  [private]
typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_VECTOR ( PublicationId   ) 
RcHandle< EntityImpl > OpenDDS::DCPS::PublisherImpl::parent ( void   )  const [virtual]

Reimplemented from OpenDDS::DCPS::EntityImpl.

Definition at line 917 of file PublisherImpl.cpp.

References OpenDDS::DCPS::WeakRcHandle< T >::lock(), and participant_.

00918 {
00919   return this->participant_.lock();
00920 }

Here is the call graph for this function:

bool OpenDDS::DCPS::PublisherImpl::participant_liveliness_activity_after ( const ACE_Time_Value tv  ) 

Definition at line 889 of file PublisherImpl.cpp.

References datawriter_map_.

00890 {
00891   for (DataWriterMap::iterator it(datawriter_map_.begin());
00892       it != datawriter_map_.end(); ++it) {
00893     if (it->second->participant_liveliness_activity_after(tv)) {
00894       return true;
00895     }
00896   }
00897   return false;
00898 }

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::resume_publications (  )  [virtual]

Implements DDS::Publisher.

Definition at line 508 of file PublisherImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::EntityImpl::enabled_, LM_ERROR, pi_lock_, publication_map_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and suspend_depth_count_.

00509 {
00510   if (enabled_ == false) {
00511     ACE_ERROR_RETURN((LM_ERROR,
00512         ACE_TEXT("(%P|%t) ERROR: ")
00513         ACE_TEXT("PublisherImpl::resume_publications, ")
00514         ACE_TEXT(" Entity is not enabled. \n")),
00515         DDS::RETCODE_NOT_ENABLED);
00516   }
00517 
00518   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00519       guard,
00520       this->pi_lock_,
00521       DDS::RETCODE_ERROR);
00522 
00523   --suspend_depth_count_;
00524 
00525   if (suspend_depth_count_ < 0) {
00526     suspend_depth_count_ = 0;
00527     return DDS::RETCODE_PRECONDITION_NOT_MET;
00528   }
00529 
00530   if (suspend_depth_count_ == 0) {
00531 
00532     for (PublicationMap::iterator it = this->publication_map_.begin();
00533         it != this->publication_map_.end(); ++it) {
00534       it->second->send_suspended_data();
00535     }
00536   }
00537 
00538   return DDS::RETCODE_OK;
00539 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::set_default_datawriter_qos ( const DDS::DataWriterQos qos  )  [virtual]

Definition at line 731 of file PublisherImpl.cpp.

References OpenDDS::DCPS::Qos_Helper::consistent(), default_datawriter_qos_, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, and OpenDDS::DCPS::Qos_Helper::valid().

00732 {
00733   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00734     default_datawriter_qos_ = qos;
00735     return DDS::RETCODE_OK;
00736 
00737   } else {
00738     return DDS::RETCODE_INCONSISTENT_POLICY;
00739   }
00740 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::set_listener ( DDS::PublisherListener_ptr  a_listener,
DDS::StatusMask  mask 
) [virtual]

Definition at line 463 of file PublisherImpl.cpp.

References CORBA::LocalObject::_duplicate(), listener_, listener_mask_, and DDS::RETCODE_OK.

00465 {
00466   listener_mask_ = mask;
00467   //note: OK to duplicate  a nil object ref
00468   listener_ = DDS::PublisherListener::_duplicate(a_listener);
00469   return DDS::RETCODE_OK;
00470 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::set_qos ( const DDS::PublisherQos qos  )  [virtual]

Definition at line 378 of file PublisherImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), domain_id_, OpenDDS::DCPS::EntityImpl::enabled_, LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK, OPENDDS_STRING, participant_, pi_lock_, publication_map_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, status, TheServiceParticipant, and OpenDDS::DCPS::Qos_Helper::valid().

00379 {
00380 
00381   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00382 
00383   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00384     if (qos_ == qos)
00385       return DDS::RETCODE_OK;
00386 
00387     // for the not changeable qos, it can be changed before enable
00388     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00389       return DDS::RETCODE_IMMUTABLE_POLICY;
00390 
00391     } else {
00392       qos_ = qos;
00393 
00394       DwIdToQosMap idToQosMap;
00395       {
00396         ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00397             guard,
00398             this->pi_lock_,
00399             DDS::RETCODE_ERROR);
00400 
00401         for (PublicationMap::iterator iter = publication_map_.begin();
00402             iter != publication_map_.end();
00403             ++iter) {
00404           DDS::DataWriterQos qos;
00405           iter->second->get_qos(qos);
00406           RepoId id = iter->second->get_publication_id();
00407           std::pair<DwIdToQosMap::iterator, bool> pair =
00408               idToQosMap.insert(DwIdToQosMap::value_type(id, qos));
00409 
00410           if (pair.second == false) {
00411             GuidConverter converter(id);
00412             ACE_ERROR_RETURN((LM_ERROR,
00413                 ACE_TEXT("(%P|%t) ")
00414                 ACE_TEXT("PublisherImpl::set_qos: ")
00415                 ACE_TEXT("insert id %C to DwIdToQosMap ")
00416                 ACE_TEXT("failed.\n"),
00417                 OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR);
00418           }
00419         }
00420       }
00421 
00422       DwIdToQosMap::iterator iter = idToQosMap.begin();
00423 
00424       while (iter != idToQosMap.end()) {
00425         Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00426         bool status = false;
00427 
00428         RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
00429         if (participant)
00430           status = disco->update_publication_qos(
00431               participant->get_domain_id(),
00432               participant->get_id(),
00433               iter->first,
00434               iter->second,
00435               this->qos_);
00436 
00437         if (!status) {
00438           ACE_ERROR_RETURN((LM_ERROR,
00439               ACE_TEXT("(%P|%t) PublisherImpl::set_qos, ")
00440               ACE_TEXT("failed. \n")),
00441               DDS::RETCODE_ERROR);
00442         }
00443 
00444         ++iter;
00445       }
00446     }
00447 
00448     return DDS::RETCODE_OK;
00449 
00450   } else {
00451     return DDS::RETCODE_INCONSISTENT_POLICY;
00452   }
00453 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::suspend_publications (  )  [virtual]

Implements DDS::Publisher.

Definition at line 479 of file PublisherImpl.cpp.

References ACE_TEXT(), OpenDDS::DCPS::EntityImpl::enabled_, LM_ERROR, pi_lock_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, and suspend_depth_count_.

00480 {
00481   if (enabled_ == false) {
00482     ACE_ERROR_RETURN((LM_ERROR,
00483         ACE_TEXT("(%P|%t) ERROR: ")
00484         ACE_TEXT("PublisherImpl::suspend_publications, ")
00485         ACE_TEXT(" Entity is not enabled. \n")),
00486         DDS::RETCODE_NOT_ENABLED);
00487   }
00488 
00489   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00490       guard,
00491       this->pi_lock_,
00492       DDS::RETCODE_ERROR);
00493   ++suspend_depth_count_;
00494   return DDS::RETCODE_OK;
00495 }

Here is the call graph for this function:

bool OpenDDS::DCPS::PublisherImpl::validate_datawriter_qos ( const DDS::DataWriterQos qos,
const DDS::DataWriterQos default_qos,
DDS::Topic_ptr  a_topic,
DDS::DataWriterQos dw_qos 
) [static]

Definition at line 923 of file PublisherImpl.cpp.

References CORBA::LocalObject::_nil(), ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::consistent(), copy_from_topic_qos(), DATAWRITER_QOS_DEFAULT, DATAWRITER_QOS_USE_TOPIC_QOS, CORBA::is_nil(), LM_ERROR, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK, and OpenDDS::DCPS::Qos_Helper::valid().

Referenced by create_datawriter(), and OpenDDS::DCPS::DomainParticipantImpl::create_replayer().

00927 {
00928   if (CORBA::is_nil(a_topic)) {
00929     ACE_ERROR((LM_ERROR,
00930         ACE_TEXT("(%P|%t) ERROR: ")
00931         ACE_TEXT("PublisherImpl::create_datawriter, ")
00932         ACE_TEXT("topic is nil.\n")));
00933     return DDS::DataWriter::_nil();
00934   }
00935 
00936   if (qos == DATAWRITER_QOS_DEFAULT) {
00937     dw_qos = default_qos;
00938 
00939   } else if (qos == DATAWRITER_QOS_USE_TOPIC_QOS) {
00940     DDS::TopicQos topic_qos;
00941     a_topic->get_qos(topic_qos);
00942     dw_qos = default_qos;
00943 
00944     Qos_Helper::copy_from_topic_qos(dw_qos, topic_qos);
00945 
00946   } else {
00947     dw_qos = qos;
00948   }
00949 
00950   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00951   OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00952   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00953   OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00954   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
00955 
00956   if (!Qos_Helper::valid(dw_qos)) {
00957     ACE_ERROR((LM_ERROR,
00958         ACE_TEXT("(%P|%t) ERROR: ")
00959         ACE_TEXT("PublisherImpl::create_datawriter, ")
00960         ACE_TEXT("invalid qos.\n")));
00961     return DDS::DataWriter::_nil();
00962   }
00963 
00964   if (!Qos_Helper::consistent(dw_qos)) {
00965     ACE_ERROR((LM_ERROR,
00966         ACE_TEXT("(%P|%t) ERROR: ")
00967         ACE_TEXT("PublisherImpl::create_datawriter, ")
00968         ACE_TEXT("inconsistent qos.\n")));
00969     return DDS::DataWriter::_nil();
00970   }
00971   return true;
00972 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::wait_for_acknowledgments ( const DDS::Duration_t max_wait  )  [virtual]

Definition at line 663 of file PublisherImpl.cpp.

References ACE_TEXT(), datawriter_map_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::RcHandle< T >::in(), LM_DEBUG, LM_ERROR, OpenDDS::DCPS::OPENDDS_MAP(), pi_lock_, DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.

00665 {
00666   if (enabled_ == false) {
00667     ACE_ERROR_RETURN((LM_ERROR,
00668         ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ")
00669         ACE_TEXT("Entity is not enabled.\n")),
00670         DDS::RETCODE_NOT_ENABLED);
00671   }
00672 
00673   typedef OPENDDS_MAP(DataWriterImpl*, DataWriterImpl::AckToken) DataWriterAckMap;
00674   DataWriterAckMap ack_writers;
00675   {
00676     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00677         guard,
00678         this->pi_lock_,
00679         DDS::RETCODE_ERROR);
00680 
00681     // Collect writers to request acks
00682     for (DataWriterMap::iterator it(this->datawriter_map_.begin());
00683         it != this->datawriter_map_.end(); ++it) {
00684       DataWriterImpl_rch writer = it->second;
00685       if (writer->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS)
00686         continue;
00687       if (writer->should_ack()) {
00688         DataWriterImpl::AckToken token = writer->create_ack_token(max_wait);
00689 
00690         std::pair<DataWriterAckMap::iterator, bool> pair =
00691             ack_writers.insert(DataWriterAckMap::value_type(writer.in(), token));
00692 
00693         if (!pair.second) {
00694           ACE_ERROR_RETURN((LM_ERROR,
00695               ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ")
00696               ACE_TEXT("Unable to insert AckToken into DataWriterAckMap!\n")),
00697               DDS::RETCODE_ERROR);
00698         }
00699       }
00700     }
00701   }
00702 
00703   if (ack_writers.empty()) {
00704     if (DCPS_debug_level > 0) {
00705       ACE_DEBUG((LM_DEBUG,
00706           ACE_TEXT("(%P|%t) PublisherImpl::wait_for_acknowledgments() - ")
00707           ACE_TEXT("not blocking due to no writers requiring acks.\n")));
00708     }
00709 
00710     return DDS::RETCODE_OK;
00711   }
00712 
00713   // Wait for ack responses from all associated readers
00714   for (DataWriterAckMap::iterator it(ack_writers.begin());
00715       it != ack_writers.end(); ++it) {
00716     DataWriterImpl::AckToken token = it->second;
00717 
00718     it->first->wait_for_specific_ack(token);
00719   }
00720 
00721   return DDS::RETCODE_OK;
00722 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::writer_enabled ( const char *  topic_name,
DataWriterImpl impl 
)

This method is called when the datawriter created by this publisher was enabled.

Definition at line 807 of file PublisherImpl.cpp.

References ACE_TEXT(), datawriter_map_, LM_ERROR, monitor_, OPENDDS_STRING, pi_lock_, publication_map_, OpenDDS::DCPS::rchandle_from(), OpenDDS::DCPS::Monitor::report(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, and writers_not_enabled_.

00809 {
00810   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00811       guard,
00812       this->pi_lock_,
00813       DDS::RETCODE_ERROR);
00814   DataWriterImpl_rch writer = rchandle_from(writer_ptr);
00815   writers_not_enabled_.erase(writer);
00816 
00817   datawriter_map_.insert(DataWriterMap::value_type(topic_name, writer));
00818 
00819   const RepoId publication_id = writer->get_publication_id();
00820 
00821   std::pair<PublicationMap::iterator, bool> pair =
00822       publication_map_.insert(PublicationMap::value_type(publication_id, writer));
00823 
00824   if (pair.second == false) {
00825     GuidConverter converter(publication_id);
00826     ACE_ERROR_RETURN((LM_ERROR,
00827         ACE_TEXT("(%P|%t) ERROR: ")
00828         ACE_TEXT("PublisherImpl::writer_enabled: ")
00829         ACE_TEXT("insert publication %C failed.\n"),
00830         OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR);
00831   }
00832 
00833   if (this->monitor_) {
00834     this->monitor_->report();
00835   }
00836 
00837   return DDS::RETCODE_OK;
00838 }

Here is the call graph for this function:


Friends And Related Function Documentation

friend class DataWriterImpl [friend]

Definition at line 43 of file PublisherImpl.h.


Member Data Documentation

Start of current aggregation period. - NOT USED IN FIRST IMPL.

Definition at line 196 of file PublisherImpl.h.

The number of times begin_coherent_changes as been called.

Definition at line 184 of file PublisherImpl.h.

Referenced by begin_coherent_changes(), and end_coherent_changes().

Default datawriter Qos policy list.

Definition at line 166 of file PublisherImpl.h.

Referenced by create_datawriter(), get_default_datawriter_qos(), and set_default_datawriter_qos().

Domain in which we are contained.

Definition at line 187 of file PublisherImpl.h.

Referenced by delete_datawriter(), and set_qos().

Definition at line 161 of file PublisherImpl.h.

Referenced by get_instance_handle().

DDS::PublisherListener_var OpenDDS::DCPS::PublisherImpl::listener_ [private]

Used to notify the entity for relevant events.

Definition at line 172 of file PublisherImpl.h.

Referenced by get_listener(), and set_listener().

The StatusKind bit mask indicates which status condition change can be notified by the listener of this entity.

Definition at line 170 of file PublisherImpl.h.

Referenced by set_listener().

Monitor object for this entity.

Definition at line 205 of file PublisherImpl.h.

Referenced by delete_datawriter(), enable(), PublisherImpl(), and writer_enabled().

The DomainParticipant servant that owns this Publisher.

Definition at line 189 of file PublisherImpl.h.

Referenced by create_datawriter(), delete_datawriter(), enable(), get_participant(), parent(), and set_qos().

This map is used to support datawriter lookup by datawriter repository id.

Definition at line 181 of file PublisherImpl.h.

Referenced by begin_coherent_changes(), delete_datawriter(), end_coherent_changes(), get_publication_ids(), is_clean(), resume_publications(), set_qos(), writer_enabled(), and ~PublisherImpl().

Note:
The publisher_id_ is not generated by repository, it's unique in DomainParticipant scope.

Definition at line 209 of file PublisherImpl.h.

Publisher QoS policy list.

Definition at line 164 of file PublisherImpl.h.

Referenced by begin_coherent_changes(), create_datawriter(), enable(), end_coherent_changes(), get_qos(), and set_qos().

Definition at line 202 of file PublisherImpl.h.

Referenced by delete_datawriter().

Unique sequence number used when the scope_access = GROUP.

  • NOT USED IN FIRST IMPL - not supporting GROUP scope

Definition at line 194 of file PublisherImpl.h.

The suspend depth count.

Definition at line 191 of file PublisherImpl.h.

Referenced by is_suspended(), resume_publications(), and suspend_publications().

Definition at line 175 of file PublisherImpl.h.

Referenced by create_datawriter(), enable(), and writer_enabled().


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1