#include <PublisherImpl.h>
Inheritance diagram for OpenDDS::DCPS::PublisherImpl:
Public Member Functions | |
PublisherImpl (DDS::InstanceHandle_t handle, RepoId id, const DDS::PublisherQos &qos, DDS::PublisherListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant) | |
virtual | ~PublisherImpl () |
virtual DDS::InstanceHandle_t | get_instance_handle () |
bool | contains_writer (DDS::InstanceHandle_t a_handle) |
virtual DDS::DataWriter_ptr | create_datawriter (DDS::Topic_ptr a_topic, const DDS::DataWriterQos &qos, DDS::DataWriterListener_ptr a_listener, DDS::StatusMask mask) |
virtual DDS::ReturnCode_t | delete_datawriter (DDS::DataWriter_ptr a_datawriter) |
virtual DDS::DataWriter_ptr | lookup_datawriter (const char *topic_name) |
virtual DDS::ReturnCode_t | delete_contained_entities () |
virtual DDS::ReturnCode_t | set_qos (const DDS::PublisherQos &qos) |
virtual DDS::ReturnCode_t | get_qos (DDS::PublisherQos &qos) |
virtual DDS::ReturnCode_t | set_listener (DDS::PublisherListener_ptr a_listener, DDS::StatusMask mask) |
virtual DDS::PublisherListener_ptr | get_listener () |
virtual DDS::ReturnCode_t | suspend_publications () |
virtual DDS::ReturnCode_t | resume_publications () |
virtual DDS::ReturnCode_t | begin_coherent_changes () |
virtual DDS::ReturnCode_t | end_coherent_changes () |
virtual DDS::ReturnCode_t | wait_for_acknowledgments (const DDS::Duration_t &max_wait) |
virtual DDS::DomainParticipant_ptr | get_participant () |
virtual DDS::ReturnCode_t | set_default_datawriter_qos (const DDS::DataWriterQos &qos) |
virtual DDS::ReturnCode_t | get_default_datawriter_qos (DDS::DataWriterQos &qos) |
virtual DDS::ReturnCode_t | copy_from_topic_qos (DDS::DataWriterQos &a_datawriter_qos, const DDS::TopicQos &a_topic_qos) |
virtual DDS::ReturnCode_t | enable () |
ACE_Recursive_Thread_Mutex & | get_pi_lock () |
bool | is_clean () const |
DDS::ReturnCode_t | writer_enabled (const char *topic_name, DataWriterImpl *impl) |
DDS::PublisherListener_ptr | listener_for (::DDS::StatusKind kind) |
DDS::ReturnCode_t | assert_liveliness_by_participant () |
ACE_Time_Value | liveliness_check_interval (DDS::LivelinessQosPolicyKind kind) |
bool | participant_liveliness_activity_after (const ACE_Time_Value &tv) |
typedef | OPENDDS_VECTOR (PublicationId) PublicationIdVec |
void | get_publication_ids (PublicationIdVec &pubs) |
bool | is_suspended () const |
virtual EntityImpl * | parent () const |
Static Public Member Functions | |
static bool | validate_datawriter_qos (const DDS::DataWriterQos &qos, const DDS::DataWriterQos &default_qos, DDS::Topic_ptr a_topic, DDS::DataWriterQos &dw_qos) |
Private Types | |
typedef ACE_Recursive_Thread_Mutex | lock_type |
typedef ACE_Reverse_Lock< lock_type > | reverse_lock_type |
Private Member Functions | |
typedef | OPENDDS_MULTIMAP (OPENDDS_STRING, DataWriterImpl *) DataWriterMap |
typedef | OPENDDS_MAP_CMP (PublicationId, DataWriterImpl *, GUID_tKeyLessThan) PublicationMap |
typedef | OPENDDS_MAP_CMP (RepoId, DDS::DataWriterQos, GUID_tKeyLessThan) DwIdToQosMap |
Private Attributes | |
DDS::InstanceHandle_t | handle_ |
DDS::PublisherQos | qos_ |
Publisher QoS policy list. | |
DDS::DataWriterQos | default_datawriter_qos_ |
Default datawriter QoS policy list. | |
DDS::StatusMask | listener_mask_ |
DDS::PublisherListener_var | listener_ |
Used to notify the entity for relevant events. | |
DataWriterMap | datawriter_map_ |
This map is used to support datawriter lookup by topic name. | |
PublicationMap | publication_map_ |
std::size_t | change_depth_ |
The number of times begin_coherent_changes has been called. | |
DDS::DomainId_t | domain_id_ |
Domain in which we are contained. | |
DomainParticipantImpl * | participant_ |
The DomainParticipant servant that owns this Publisher. | |
CORBA::Short | suspend_depth_count_ |
The suspend depth count. | |
SequenceNumber | sequence_number_ |
ACE_Time_Value | aggregation_period_start_ |
Start of the current aggregation period (not used in the first implementation). | |
lock_type | pi_lock_ |
The recursive lock to protect datawriter map and suspend count. | |
reverse_lock_type | reverse_pi_lock_ |
Monitor * | monitor_ |
Monitor object for this entity. | |
RepoId | publisher_id_ |
Friends | |
class | DataWriterImpl |
This class acts as a factory and container for datawriters.
See the DDS specification, OMG formal/04-12-02, for a description of the interface this class is implementing.
Definition at line 37 of file PublisherImpl.h.
typedef ACE_Recursive_Thread_Mutex OpenDDS::DCPS::PublisherImpl::lock_type [private] |
Definition at line 193 of file PublisherImpl.h.
typedef ACE_Reverse_Lock<lock_type> OpenDDS::DCPS::PublisherImpl::reverse_lock_type [private] |
Definition at line 194 of file PublisherImpl.h.
OpenDDS::DCPS::PublisherImpl::PublisherImpl | ( | DDS::InstanceHandle_t | handle, | |
RepoId | id, | |||
const DDS::PublisherQos & | qos, | |||
DDS::PublisherListener_ptr | a_listener, | |||
const DDS::StatusMask & | mask, | |||
DomainParticipantImpl * | participant | |||
) |
Definition at line 29 of file PublisherImpl.cpp.
References monitor_, and TheServiceParticipant.
00035 : handle_(handle), 00036 qos_(qos), 00037 default_datawriter_qos_(TheServiceParticipant->initial_DataWriterQos()), 00038 listener_mask_(mask), 00039 listener_(DDS::PublisherListener::_duplicate(a_listener)), 00040 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 00041 change_depth_(0), 00042 #endif 00043 domain_id_(participant->get_domain_id()), 00044 participant_(participant), 00045 suspend_depth_count_(0), 00046 sequence_number_(), 00047 aggregation_period_start_(ACE_Time_Value::zero), 00048 reverse_pi_lock_(pi_lock_), 00049 monitor_(0), 00050 publisher_id_(id) 00051 { 00052 monitor_ = TheServiceParticipant->monitor_factory_->create_publisher_monitor(this); 00053 }
OpenDDS::DCPS::PublisherImpl::~PublisherImpl | ( | ) | [virtual] |
Definition at line 56 of file PublisherImpl.cpp.
References is_clean().
00057 { 00058 //The datawriters should be deleted already before calling delete 00059 //publisher. 00060 if (!is_clean()) { 00061 ACE_ERROR((LM_ERROR, 00062 ACE_TEXT("(%P|%t) ERROR: ") 00063 ACE_TEXT("PublisherImpl::~PublisherImpl, ") 00064 ACE_TEXT("some datawriters still exist.\n"))); 00065 } 00066 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::assert_liveliness_by_participant | ( | ) |
Definition at line 818 of file PublisherImpl.cpp.
References datawriter_map_, and DDS::RETCODE_OK.
00819 { 00820 DDS::ReturnCode_t ret = DDS::RETCODE_OK; 00821 00822 for (DataWriterMap::iterator it(datawriter_map_.begin()); 00823 it != datawriter_map_.end(); ++it) { 00824 DDS::ReturnCode_t dw_ret = it->second->assert_liveliness_by_participant(); 00825 00826 if (dw_ret != DDS::RETCODE_OK) { 00827 ret = dw_ret; 00828 } 00829 } 00830 00831 return ret; 00832 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::begin_coherent_changes | ( | ) | [virtual] |
Implements DDS::Publisher.
Definition at line 519 of file PublisherImpl.cpp.
References change_depth_, OpenDDS::DCPS::EntityImpl::enabled_, DDS::INSTANCE_PRESENTATION_QOS, DDS::PublisherQos::presentation, publication_map_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.
00520 { 00521 if (enabled_ == false) { 00522 ACE_ERROR_RETURN((LM_ERROR, 00523 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:") 00524 ACE_TEXT(" Publisher is not enabled!\n")), 00525 DDS::RETCODE_NOT_ENABLED); 00526 } 00527 00528 if (!qos_.presentation.coherent_access) { 00529 ACE_ERROR_RETURN((LM_ERROR, 00530 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:") 00531 ACE_TEXT(" QoS policy does not support coherent access!\n")), 00532 DDS::RETCODE_ERROR); 00533 } 00534 00535 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00536 guard, 00537 this->pi_lock_, 00538 DDS::RETCODE_ERROR); 00539 00540 ++this->change_depth_; 00541 00542 if (qos_.presentation.access_scope == DDS::INSTANCE_PRESENTATION_QOS) { 00543 // INSTANCE access scope essentially behaves 00544 // as a no-op. (see: 7.1.3.6) 00545 return DDS::RETCODE_OK; 00546 } 00547 00548 // We should only notify publications on the first 00549 // and last change to the current change set: 00550 if (this->change_depth_ == 1) { 00551 for (PublicationMap::iterator it = this->publication_map_.begin(); 00552 it != this->publication_map_.end(); ++it) { 00553 it->second->begin_coherent_changes(); 00554 } 00555 } 00556 00557 return DDS::RETCODE_OK; 00558 }
bool OpenDDS::DCPS::PublisherImpl::contains_writer | ( | DDS::InstanceHandle_t | a_handle | ) |
Definition at line 75 of file PublisherImpl.cpp.
References datawriter_map_, and DDS::RETCODE_ERROR.
00076 { 00077 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00078 guard, 00079 this->pi_lock_, 00080 DDS::RETCODE_ERROR); 00081 00082 for (DataWriterMap::iterator it(datawriter_map_.begin()); 00083 it != datawriter_map_.end(); ++it) { 00084 if (a_handle == it->second->get_instance_handle()) { 00085 return true; 00086 } 00087 } 00088 00089 return false; 00090 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::copy_from_topic_qos | ( | DDS::DataWriterQos & | a_datawriter_qos, | |
const DDS::TopicQos & | a_topic_qos | |||
) | [virtual] |
Definition at line 725 of file PublisherImpl.cpp.
References OpenDDS::DCPS::Qos_Helper::copy_from_topic_qos(), DDS::RETCODE_INCONSISTENT_POLICY, and DDS::RETCODE_OK.
00727 { 00728 if (Qos_Helper::copy_from_topic_qos(a_datawriter_qos, a_topic_qos)) { 00729 return DDS::RETCODE_OK; 00730 } else { 00731 return DDS::RETCODE_INCONSISTENT_POLICY; 00732 } 00733 }
DDS::DataWriter_ptr OpenDDS::DCPS::PublisherImpl::create_datawriter | ( | DDS::Topic_ptr | a_topic, | |
const DDS::DataWriterQos & | qos, | |||
DDS::DataWriterListener_ptr | a_listener, | |||
DDS::StatusMask | mask | |||
) | [virtual] |
Definition at line 93 of file PublisherImpl.cpp.
References default_datawriter_qos_, OpenDDS::DCPS::DataWriterImpl::enable(), DDS::PublisherQos::entity_factory, OpenDDS::DCPS::TopicDescriptionImpl::get_name(), OpenDDS::DCPS::TopicDescriptionImpl::get_type_support(), OpenDDS::DCPS::DataWriterImpl::init(), participant_, qos_, DDS::RETCODE_OK, and validate_datawriter_qos().
00098 { 00099 DDS::DataWriterQos dw_qos; 00100 00101 if (!validate_datawriter_qos(qos, default_datawriter_qos_, a_topic, dw_qos)) { 00102 return DDS::DataWriter::_nil(); 00103 } 00104 00105 TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic); 00106 00107 OpenDDS::DCPS::TypeSupport_ptr typesupport = 00108 topic_servant->get_type_support(); 00109 00110 if (typesupport == 0) { 00111 CORBA::String_var name = topic_servant->get_name(); 00112 ACE_ERROR((LM_ERROR, 00113 ACE_TEXT("(%P|%t) ERROR: ") 00114 ACE_TEXT("PublisherImpl::create_datawriter, ") 00115 ACE_TEXT("typesupport(topic_name=%C) is nil.\n"), 00116 name.in())); 00117 return DDS::DataWriter::_nil(); 00118 } 00119 00120 DDS::DataWriter_var dw_obj = typesupport->create_datawriter(); 00121 00122 DataWriterImpl* dw_servant = 00123 dynamic_cast <DataWriterImpl*>(dw_obj.in()); 00124 00125 dw_servant->init(a_topic, 00126 topic_servant, 00127 dw_qos, 00128 a_listener, 00129 mask, 00130 participant_, 00131 this, 00132 dw_obj.in()); 00133 00134 if (this->enabled_ == true 00135 && qos_.entity_factory.autoenable_created_entities == 1) { 00136 00137 DDS::ReturnCode_t ret = dw_servant->enable(); 00138 00139 if (ret != DDS::RETCODE_OK) { 00140 ACE_ERROR((LM_ERROR, 00141 ACE_TEXT("(%P|%t) ERROR: ") 00142 ACE_TEXT("PublisherImpl::create_datawriter, ") 00143 ACE_TEXT("enable failed.\n"))); 00144 return DDS::DataWriter::_nil(); 00145 } 00146 } 00147 00148 return DDS::DataWriter::_duplicate(dw_obj.in()); 00149 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::delete_contained_entities | ( | ) | [virtual] |
Implements DDS::Publisher.
Definition at line 312 of file PublisherImpl.cpp.
References datawriter_map_, delete_datawriter(), OpenDDS::DCPS::DataWriterImpl::get_publication_id(), OpenDDS::DCPS::GUID_UNKNOWN, OPENDDS_STRING, DDS::RETCODE_ERROR, DDS::RETCODE_OK, and OpenDDS::DCPS::EntityImpl::set_deleted().
00313 { 00314 // mark that the entity is being deleted 00315 set_deleted(true); 00316 00317 while (true) { 00318 PublicationId pub_id = GUID_UNKNOWN; 00319 DataWriterImpl* a_datawriter; 00320 00321 { 00322 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00323 guard, 00324 this->pi_lock_, 00325 DDS::RETCODE_ERROR); 00326 00327 if (datawriter_map_.empty()) { 00328 break; 00329 } else { 00330 a_datawriter = datawriter_map_.begin()->second; 00331 pub_id = a_datawriter->get_publication_id(); 00332 } 00333 } 00334 00335 DDS::ReturnCode_t ret = delete_datawriter(a_datawriter); 00336 00337 if (ret != DDS::RETCODE_OK) { 00338 GuidConverter converter(pub_id); 00339 ACE_ERROR_RETURN((LM_ERROR, 00340 ACE_TEXT("(%P|%t) ERROR: ") 00341 ACE_TEXT("PublisherImpl::") 00342 ACE_TEXT("delete_contained_entities: ") 00343 ACE_TEXT("failed to delete ") 00344 ACE_TEXT("datawriter %C.\n"), 00345 OPENDDS_STRING(converter).c_str()),ret); 00346 } 00347 } 00348 00349 // the publisher can now start creating new publications 00350 set_deleted(false); 00351 00352 return DDS::RETCODE_OK; 00353 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::delete_datawriter | ( | DDS::DataWriter_ptr | a_datawriter | ) | [virtual] |
Definition at line 152 of file PublisherImpl.cpp.
References OpenDDS::DCPS::DataWriterImpl::cleanup(), datawriter_map_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DataWriterImpl::get_publication_id(), OpenDDS::DCPS::DataWriterImpl::get_publisher(), OpenDDS::DCPS::DataWriterImpl::get_topic_name(), OpenDDS::DCPS::GUID_UNKNOWN, OPENDDS_STRING, participant_, OpenDDS::DCPS::DataWriterImpl::persist_data(), OpenDDS::DCPS::DataWriterImpl::prepare_to_delete(), publication_map_, OpenDDS::DCPS::DomainParticipantImpl::remove_adjust_liveliness_timers(), OpenDDS::DCPS::DataWriterImpl::remove_all_associations(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, TheServiceParticipant, OpenDDS::DCPS::time_value_to_time(), OpenDDS::DCPS::DataWriterImpl::unregister_all(), OpenDDS::DCPS::DataWriterImpl::unregister_instances(), OpenDDS::DCPS::DataWriterImpl::wait_control_pending(), and OpenDDS::DCPS::DataWriterImpl::wait_pending().
Referenced by delete_contained_entities().
00153 { 00154 DataWriterImpl* dw_servant = dynamic_cast<DataWriterImpl*>(a_datawriter); 00155 if (dw_servant) { 00156 // marks entity as deleted and stops future associating 00157 dw_servant->prepare_to_delete(); 00158 } 00159 00160 if (!dw_servant) { 00161 ACE_ERROR((LM_ERROR, 00162 "(%P|%t) PublisherImpl::delete_datawriter - dynamic cast to DataWriterImpl failed\n" 00163 )); 00164 return DDS::RETCODE_ERROR; 00165 } 00166 00167 { 00168 DDS::Publisher_var dw_publisher(dw_servant->get_publisher()); 00169 00170 if (dw_publisher.in() != this) { 00171 RepoId id = dw_servant->get_publication_id(); 00172 GuidConverter converter(id); 00173 ACE_ERROR((LM_ERROR, 00174 ACE_TEXT("(%P|%t) PublisherImpl::delete_datawriter: ") 00175 ACE_TEXT("the data writer %C doesn't ") 00176 ACE_TEXT("belong to this subscriber \n"), 00177 OPENDDS_STRING(converter).c_str())); 00178 return DDS::RETCODE_PRECONDITION_NOT_MET; 00179 } 00180 } 00181 00182 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 00183 // Trigger data to be persisted, i.e. made durable, if so 00184 // configured. This needs be called before unregister_instances 00185 // because unregister_instances may cause instance dispose. 00186 if (!dw_servant->persist_data() && DCPS_debug_level >= 2) { 00187 ACE_ERROR((LM_ERROR, 00188 ACE_TEXT("(%P|%t) ERROR: ") 00189 ACE_TEXT("PublisherImpl::delete_datawriter, ") 00190 ACE_TEXT("failed to make data durable.\n"))); 00191 } 00192 #endif 00193 00194 // Unregister all registered instances prior to deletion. 00195 DDS::Time_t source_timestamp = time_value_to_time(ACE_OS::gettimeofday()); 00196 dw_servant->unregister_instances(source_timestamp); 00197 00198 // Wait for any control messages to be transported during 00199 // unregistering of instances. 
00200 dw_servant->wait_pending(); 00201 dw_servant->wait_control_pending(); 00202 00203 CORBA::String_var topic_name = dw_servant->get_topic_name(); 00204 RepoId publication_id = GUID_UNKNOWN; 00205 { 00206 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00207 guard, 00208 this->pi_lock_, 00209 DDS::RETCODE_ERROR); 00210 00211 publication_id = dw_servant->get_publication_id(); 00212 00213 PublicationMap::iterator it = publication_map_.find(publication_id); 00214 00215 if (it == publication_map_.end()) { 00216 GuidConverter converter(publication_id); 00217 ACE_ERROR_RETURN((LM_ERROR, 00218 ACE_TEXT("(%P|%t) ERROR: ") 00219 ACE_TEXT("PublisherImpl::delete_datawriter, ") 00220 ACE_TEXT("datawriter %C not found.\n"), 00221 OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR); 00222 } 00223 00224 // We can not erase the datawriter from datawriter map by the topic name 00225 // because the map might have multiple datawriters with the same topic 00226 // name. 00227 // Find the iterator to the datawriter in the datawriter map and erase 00228 // by the iterator. 00229 DataWriterMap::iterator writ; 00230 DataWriterMap::iterator the_writ = datawriter_map_.end(); 00231 00232 for (writ = datawriter_map_.begin(); 00233 writ != datawriter_map_.end(); 00234 ++writ) { 00235 if (writ->second == it->second) { 00236 the_writ = writ; 00237 break; 00238 } 00239 } 00240 00241 if (the_writ != datawriter_map_.end()) { 00242 datawriter_map_.erase(the_writ); 00243 } 00244 00245 publication_map_.erase(it); 00246 00247 // Release pi_lock_ before making call to transport layer to avoid 00248 // some deadlock situations that threads acquire locks(PublisherImpl 00249 // pi_lock_, TransportClient reservation_lock and TransportImpl 00250 // lock_) in reverse order. 00251 ACE_GUARD_RETURN(reverse_lock_type, reverse_monitor, this->reverse_pi_lock_, 00252 DDS::RETCODE_ERROR); 00253 // Wait for pending samples to drain prior to removing associations 00254 // and unregistering the publication. 
00255 dw_servant->wait_pending(); 00256 // Call remove association before unregistering the datawriter 00257 // with the transport, otherwise some callbacks resulted from 00258 // remove_association may lost. 00259 dw_servant->remove_all_associations(); 00260 dw_servant->cleanup(); 00261 } 00262 // not just unregister but remove any pending writes/sends. 00263 dw_servant->unregister_all(); 00264 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_); 00265 if (!disco->remove_publication( 00266 this->domain_id_, 00267 this->participant_->get_id(), 00268 publication_id)) { 00269 ACE_ERROR_RETURN((LM_ERROR, 00270 ACE_TEXT("(%P|%t) ERROR: ") 00271 ACE_TEXT("PublisherImpl::delete_datawriter, ") 00272 ACE_TEXT("publication not removed from discovery.\n")), 00273 DDS::RETCODE_ERROR); 00274 } 00275 // Decrease ref count after the servant is removed from the maps. 00276 dw_servant->_remove_ref(); 00277 00278 participant_->remove_adjust_liveliness_timers(); 00279 00280 return DDS::RETCODE_OK; 00281 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::enable | ( | ) | [virtual] |
Implements DDS::Entity.
Definition at line 736 of file PublisherImpl.cpp.
References monitor_, OpenDDS::DCPS::Monitor::report(), DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and OpenDDS::DCPS::EntityImpl::set_enabled().
Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_publisher().
00737 { 00738 //According spec: 00739 // - Calling enable on an already enabled Entity returns OK and has no 00740 // effect. 00741 // - Calling enable on an Entity whose factory is not enabled will fail 00742 // and return PRECONDITION_NOT_MET. 00743 00744 if (this->is_enabled()) { 00745 return DDS::RETCODE_OK; 00746 } 00747 00748 if (this->participant_->is_enabled() == false) { 00749 return DDS::RETCODE_PRECONDITION_NOT_MET; 00750 } 00751 00752 if (this->monitor_) { 00753 this->monitor_->report(); 00754 } 00755 00756 this->set_enabled(); 00757 return DDS::RETCODE_OK; 00758 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::end_coherent_changes | ( | ) | [virtual] |
Implements DDS::Publisher.
Definition at line 561 of file PublisherImpl.cpp.
References change_depth_, OpenDDS::DCPS::EntityImpl::enabled_, DDS::INSTANCE_PRESENTATION_QOS, DDS::PublisherQos::presentation, publication_map_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.
00562 { 00563 if (enabled_ == false) { 00564 ACE_ERROR_RETURN((LM_ERROR, 00565 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:") 00566 ACE_TEXT(" Publisher is not enabled!\n")), 00567 DDS::RETCODE_NOT_ENABLED); 00568 } 00569 00570 if (!qos_.presentation.coherent_access) { 00571 ACE_ERROR_RETURN((LM_ERROR, 00572 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:") 00573 ACE_TEXT(" QoS policy does not support coherent access!\n")), 00574 DDS::RETCODE_ERROR); 00575 } 00576 00577 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00578 guard, 00579 this->pi_lock_, 00580 DDS::RETCODE_ERROR); 00581 00582 if (this->change_depth_ == 0) { 00583 ACE_ERROR_RETURN((LM_ERROR, 00584 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:") 00585 ACE_TEXT(" No matching call to begin_coherent_changes!\n")), 00586 DDS::RETCODE_PRECONDITION_NOT_MET); 00587 } 00588 00589 --this->change_depth_; 00590 00591 if (qos_.presentation.access_scope == DDS::INSTANCE_PRESENTATION_QOS) { 00592 // INSTANCE access scope essentially behaves 00593 // as a no-op. 
(see: 7.1.3.6) 00594 return DDS::RETCODE_OK; 00595 } 00596 00597 // We should only notify publications on the first 00598 // and last change to the current change set: 00599 if (this->change_depth_ == 0) { 00600 GroupCoherentSamples group_samples; 00601 for (PublicationMap::iterator it = this->publication_map_.begin(); 00602 it != this->publication_map_.end(); ++it) { 00603 00604 if (it->second->coherent_samples_ == 0) { 00605 continue; 00606 } 00607 00608 std::pair<GroupCoherentSamples::iterator, bool> pair = 00609 group_samples.insert(GroupCoherentSamples::value_type( 00610 it->second->get_publication_id(), 00611 WriterCoherentSample(it->second->coherent_samples_, 00612 it->second->sequence_number_))); 00613 00614 if (pair.second == false) { 00615 ACE_ERROR_RETURN((LM_ERROR, 00616 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes: ") 00617 ACE_TEXT("failed to insert to GroupCoherentSamples.\n")), 00618 DDS::RETCODE_ERROR); 00619 } 00620 } 00621 00622 for (PublicationMap::iterator it = this->publication_map_.begin(); 00623 it != this->publication_map_.end(); ++it) { 00624 if (it->second->coherent_samples_ == 0) { 00625 continue; 00626 } 00627 00628 it->second->end_coherent_changes(group_samples); 00629 } 00630 } 00631 00632 return DDS::RETCODE_OK; 00633 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::get_default_datawriter_qos | ( | DDS::DataWriterQos & | qos | ) | [virtual] |
Definition at line 718 of file PublisherImpl.cpp.
References default_datawriter_qos_, and DDS::RETCODE_OK.
00719 { 00720 qos = default_datawriter_qos_; 00721 return DDS::RETCODE_OK; 00722 }
DDS::InstanceHandle_t OpenDDS::DCPS::PublisherImpl::get_instance_handle | ( | ) | [virtual] |
Implements OpenDDS::DCPS::EntityImpl.
Definition at line 69 of file PublisherImpl.cpp.
References handle_.
Referenced by OpenDDS::DCPS::PublisherMonitorImpl::report().
00070 { 00071 return handle_; 00072 }
DDS::PublisherListener_ptr OpenDDS::DCPS::PublisherImpl::get_listener | ( | ) | [virtual] |
Implements DDS::Publisher.
Definition at line 448 of file PublisherImpl.cpp.
References listener_.
00449 { 00450 return DDS::PublisherListener::_duplicate(listener_.in()); 00451 }
DDS::DomainParticipant_ptr OpenDDS::DCPS::PublisherImpl::get_participant | ( | ) | [virtual] |
Implements DDS::Publisher.
Definition at line 700 of file PublisherImpl.cpp.
References participant_.
Referenced by OpenDDS::DCPS::PublisherMonitorImpl::report().
00701 { 00702 return DDS::DomainParticipant::_duplicate(participant_); 00703 }
ACE_Recursive_Thread_Mutex& OpenDDS::DCPS::PublisherImpl::get_pi_lock | ( | ) | [inline] |
void OpenDDS::DCPS::PublisherImpl::get_publication_ids | ( | PublicationIdVec & | pubs | ) |
Populates a std::vector with the PublicationIds (GUIDs) of this Publisher's Data Writers.
Definition at line 858 of file PublisherImpl.cpp.
References publication_map_.
Referenced by OpenDDS::DCPS::PublisherMonitorImpl::report().
00859 { 00860 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00861 guard, 00862 this->pi_lock_, 00863 ); 00864 00865 pubs.reserve(publication_map_.size()); 00866 for (PublicationMap::iterator iter = publication_map_.begin(); 00867 iter != publication_map_.end(); 00868 ++iter) { 00869 pubs.push_back(iter->first); 00870 } 00871 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::get_qos | ( | DDS::PublisherQos & | qos | ) | [virtual] |
Definition at line 431 of file PublisherImpl.cpp.
References qos_, and DDS::RETCODE_OK.
Referenced by OpenDDS::DCPS::DataWriterImpl::enable(), OpenDDS::DCPS::DataWriterImpl::retrieve_inline_qos_data(), and OpenDDS::DCPS::DataWriterImpl::set_qos().
00432 { 00433 qos = qos_; 00434 return DDS::RETCODE_OK; 00435 }
bool OpenDDS::DCPS::PublisherImpl::is_clean | ( | ) | const |
This method is not defined in the IDL; it is provided for internal use. It checks whether any datawriter is still associated with this publisher and returns true only when none remain.
Definition at line 761 of file PublisherImpl.cpp.
References datawriter_map_, and publication_map_.
Referenced by OpenDDS::DCPS::DomainParticipantImpl::delete_publisher(), and ~PublisherImpl().
00762 { 00763 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00764 guard, 00765 this->pi_lock_, 00766 false); 00767 return datawriter_map_.empty() && publication_map_.empty(); 00768 }
bool OpenDDS::DCPS::PublisherImpl::is_suspended | ( | ) | const |
Definition at line 473 of file PublisherImpl.cpp.
References suspend_depth_count_.
00474 { 00475 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00476 guard, 00477 this->pi_lock_, 00478 false); 00479 return suspend_depth_count_; 00480 }
DDS::PublisherListener_ptr OpenDDS::DCPS::PublisherImpl::listener_for | ( | ::DDS::StatusKind | kind | ) |
This is used to retrieve the listener for a certain status change. If this publisher has a registered listener and the status kind is in the listener mask, then that listener is returned. Otherwise, the query for the listener is propagated up to the factory/DomainParticipant.
Referenced by OpenDDS::DCPS::DataWriterImpl::listener_for().
ACE_Time_Value OpenDDS::DCPS::PublisherImpl::liveliness_check_interval | ( | DDS::LivelinessQosPolicyKind | kind | ) |
Definition at line 835 of file PublisherImpl.cpp.
References datawriter_map_.
00836 { 00837 ACE_Time_Value tv = ACE_Time_Value::max_time; 00838 for (DataWriterMap::iterator it(datawriter_map_.begin()); 00839 it != datawriter_map_.end(); ++it) { 00840 tv = std::min (tv, it->second->liveliness_check_interval(kind)); 00841 } 00842 return tv; 00843 }
DDS::DataWriter_ptr OpenDDS::DCPS::PublisherImpl::lookup_datawriter | ( | const char * | topic_name | ) | [virtual] |
Definition at line 284 of file PublisherImpl.cpp.
References datawriter_map_, and OpenDDS::DCPS::DCPS_debug_level.
00285 { 00286 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00287 guard, 00288 this->pi_lock_, 00289 DDS::DataWriter::_nil()); 00290 00291 // If multiple entries whose key is "topic_name" then which one is 00292 // returned ? Spec does not limit which one should give. 00293 DataWriterMap::iterator it = datawriter_map_.find(topic_name); 00294 00295 if (it == datawriter_map_.end()) { 00296 if (DCPS_debug_level >= 2) { 00297 ACE_DEBUG((LM_DEBUG, 00298 ACE_TEXT("(%P|%t) ") 00299 ACE_TEXT("PublisherImpl::lookup_datawriter, ") 00300 ACE_TEXT("The datawriter(topic_name=%C) is not found\n"), 00301 topic_name)); 00302 } 00303 00304 return DDS::DataWriter::_nil(); 00305 00306 } else { 00307 return DDS::DataWriter::_duplicate(it->second); 00308 } 00309 }
typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_MAP_CMP | ( | RepoId | , | |
DDS::DataWriterQos | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_MAP_CMP | ( | PublicationId | , | |
DataWriterImpl * | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_MULTIMAP | ( | OPENDDS_STRING | , | |
DataWriterImpl * | ||||
) | [private] |
typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_VECTOR | ( | PublicationId | ) |
EntityImpl * OpenDDS::DCPS::PublisherImpl::parent | ( | ) | const [virtual] |
Reimplemented from OpenDDS::DCPS::EntityImpl.
Definition at line 874 of file PublisherImpl.cpp.
References participant_.
00875 { 00876 return this->participant_; 00877 }
bool OpenDDS::DCPS::PublisherImpl::participant_liveliness_activity_after | ( | const ACE_Time_Value & | tv | ) |
Definition at line 846 of file PublisherImpl.cpp.
References datawriter_map_.
00847 { 00848 for (DataWriterMap::iterator it(datawriter_map_.begin()); 00849 it != datawriter_map_.end(); ++it) { 00850 if (it->second->participant_liveliness_activity_after(tv)) { 00851 return true; 00852 } 00853 } 00854 return false; 00855 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::resume_publications | ( | ) | [virtual] |
Implements DDS::Publisher.
Definition at line 483 of file PublisherImpl.cpp.
References OpenDDS::DCPS::EntityImpl::enabled_, publication_map_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and suspend_depth_count_.
00484 { 00485 if (enabled_ == false) { 00486 ACE_ERROR_RETURN((LM_ERROR, 00487 ACE_TEXT("(%P|%t) ERROR: ") 00488 ACE_TEXT("PublisherImpl::resume_publications, ") 00489 ACE_TEXT(" Entity is not enabled. \n")), 00490 DDS::RETCODE_NOT_ENABLED); 00491 } 00492 00493 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00494 guard, 00495 this->pi_lock_, 00496 DDS::RETCODE_ERROR); 00497 00498 --suspend_depth_count_; 00499 00500 if (suspend_depth_count_ < 0) { 00501 suspend_depth_count_ = 0; 00502 return DDS::RETCODE_PRECONDITION_NOT_MET; 00503 } 00504 00505 if (suspend_depth_count_ == 0) { 00506 00507 for (PublicationMap::iterator it = this->publication_map_.begin(); 00508 it != this->publication_map_.end(); ++it) { 00509 it->second->send_suspended_data(); 00510 } 00511 } 00512 00513 return DDS::RETCODE_OK; 00514 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::set_default_datawriter_qos | ( | const DDS::DataWriterQos & | qos | ) | [virtual] |
Definition at line 706 of file PublisherImpl.cpp.
References OpenDDS::DCPS::Qos_Helper::consistent(), default_datawriter_qos_, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, and OpenDDS::DCPS::Qos_Helper::valid().
00707 { 00708 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) { 00709 default_datawriter_qos_ = qos; 00710 return DDS::RETCODE_OK; 00711 00712 } else { 00713 return DDS::RETCODE_INCONSISTENT_POLICY; 00714 } 00715 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::set_listener | ( | DDS::PublisherListener_ptr | a_listener, | |
DDS::StatusMask | mask | |||
) | [virtual] |
Definition at line 438 of file PublisherImpl.cpp.
References listener_, listener_mask_, and DDS::RETCODE_OK.
00440 { 00441 listener_mask_ = mask; 00442 //note: OK to duplicate a nil object ref 00443 listener_ = DDS::PublisherListener::_duplicate(a_listener); 00444 return DDS::RETCODE_OK; 00445 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::set_qos | ( | const DDS::PublisherQos & | qos | ) | [virtual] |
Definition at line 356 of file PublisherImpl.cpp.
References OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), OpenDDS::DCPS::DomainParticipantImpl::get_id(), OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK, OPENDDS_STRING, participant_, publication_map_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, TheServiceParticipant, and OpenDDS::DCPS::Qos_Helper::valid().
00357 { 00358 00359 OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00360 00361 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) { 00362 if (qos_ == qos) 00363 return DDS::RETCODE_OK; 00364 00365 // for the not changeable qos, it can be changed before enable 00366 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) { 00367 return DDS::RETCODE_IMMUTABLE_POLICY; 00368 00369 } else { 00370 qos_ = qos; 00371 00372 DwIdToQosMap idToQosMap; 00373 { 00374 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00375 guard, 00376 this->pi_lock_, 00377 DDS::RETCODE_ERROR); 00378 00379 for (PublicationMap::iterator iter = publication_map_.begin(); 00380 iter != publication_map_.end(); 00381 ++iter) { 00382 DDS::DataWriterQos qos; 00383 iter->second->get_qos(qos); 00384 RepoId id = iter->second->get_publication_id(); 00385 std::pair<DwIdToQosMap::iterator, bool> pair = 00386 idToQosMap.insert(DwIdToQosMap::value_type(id, qos)); 00387 00388 if (pair.second == false) { 00389 GuidConverter converter(id); 00390 ACE_ERROR_RETURN((LM_ERROR, 00391 ACE_TEXT("(%P|%t) ") 00392 ACE_TEXT("PublisherImpl::set_qos: ") 00393 ACE_TEXT("insert id %d to DwIdToQosMap ") 00394 ACE_TEXT("failed.\n"), 00395 OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR); 00396 } 00397 } 00398 } 00399 00400 DwIdToQosMap::iterator iter = idToQosMap.begin(); 00401 00402 while (iter != idToQosMap.end()) { 00403 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_); 00404 const bool status 00405 = disco->update_publication_qos( 00406 participant_->get_domain_id(), 00407 participant_->get_id(), 00408 iter->first, 00409 iter->second, 00410 this->qos_); 00411 00412 if (!status) { 00413 ACE_ERROR_RETURN((LM_ERROR, 00414 ACE_TEXT("(%P|%t) PublisherImpl::set_qos, ") 00415 ACE_TEXT("failed. 
\n")), 00416 DDS::RETCODE_ERROR); 00417 } 00418 00419 ++iter; 00420 } 00421 } 00422 00423 return DDS::RETCODE_OK; 00424 00425 } else { 00426 return DDS::RETCODE_INCONSISTENT_POLICY; 00427 } 00428 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::suspend_publications | ( | ) | [virtual] |
Implements DDS::Publisher.
Definition at line 454 of file PublisherImpl.cpp.
References OpenDDS::DCPS::EntityImpl::enabled_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, and suspend_depth_count_.
00455 { 00456 if (enabled_ == false) { 00457 ACE_ERROR_RETURN((LM_ERROR, 00458 ACE_TEXT("(%P|%t) ERROR: ") 00459 ACE_TEXT("PublisherImpl::suspend_publications, ") 00460 ACE_TEXT(" Entity is not enabled. \n")), 00461 DDS::RETCODE_NOT_ENABLED); 00462 } 00463 00464 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00465 guard, 00466 this->pi_lock_, 00467 DDS::RETCODE_ERROR); 00468 ++suspend_depth_count_; 00469 return DDS::RETCODE_OK; 00470 }
bool OpenDDS::DCPS::PublisherImpl::validate_datawriter_qos | ( | const DDS::DataWriterQos & | qos, | |
const DDS::DataWriterQos & | default_qos, | |||
DDS::Topic_ptr | a_topic, | |||
DDS::DataWriterQos & | dw_qos | |||
) | [static] |
Definition at line 880 of file PublisherImpl.cpp.
References OpenDDS::DCPS::Qos_Helper::consistent(), OpenDDS::DCPS::Qos_Helper::copy_from_topic_qos(), DATAWRITER_QOS_DEFAULT, DATAWRITER_QOS_USE_TOPIC_QOS, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK, and OpenDDS::DCPS::Qos_Helper::valid().
Referenced by create_datawriter(), and OpenDDS::DCPS::DomainParticipantImpl::create_replayer().
00884 { 00885 if (CORBA::is_nil(a_topic)) { 00886 ACE_ERROR((LM_ERROR, 00887 ACE_TEXT("(%P|%t) ERROR: ") 00888 ACE_TEXT("PublisherImpl::create_datawriter, ") 00889 ACE_TEXT("topic is nil.\n"))); 00890 return DDS::DataWriter::_nil(); 00891 } 00892 00893 if (qos == DATAWRITER_QOS_DEFAULT) { 00894 dw_qos = default_qos; 00895 00896 } else if (qos == DATAWRITER_QOS_USE_TOPIC_QOS) { 00897 DDS::TopicQos topic_qos; 00898 a_topic->get_qos(topic_qos); 00899 dw_qos = default_qos; 00900 00901 Qos_Helper::copy_from_topic_qos(dw_qos, topic_qos); 00902 00903 } else { 00904 dw_qos = qos; 00905 } 00906 00907 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil()); 00908 OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil()); 00909 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil()); 00910 OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil()); 00911 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil()); 00912 00913 if (!Qos_Helper::valid(dw_qos)) { 00914 ACE_ERROR((LM_ERROR, 00915 ACE_TEXT("(%P|%t) ERROR: ") 00916 ACE_TEXT("PublisherImpl::create_datawriter, ") 00917 ACE_TEXT("invalid qos.\n"))); 00918 return DDS::DataWriter::_nil(); 00919 } 00920 00921 if (!Qos_Helper::consistent(dw_qos)) { 00922 ACE_ERROR((LM_ERROR, 00923 ACE_TEXT("(%P|%t) ERROR: ") 00924 ACE_TEXT("PublisherImpl::create_datawriter, ") 00925 ACE_TEXT("inconsistent qos.\n"))); 00926 return DDS::DataWriter::_nil(); 00927 } 00928 return true; 00929 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::wait_for_acknowledgments | ( | const DDS::Duration_t & | max_wait | ) | [virtual] |
Definition at line 638 of file PublisherImpl.cpp.
References datawriter_map_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::OPENDDS_MAP(), DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.
00640 { 00641 if (enabled_ == false) { 00642 ACE_ERROR_RETURN((LM_ERROR, 00643 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ") 00644 ACE_TEXT("Entity is not enabled.\n")), 00645 DDS::RETCODE_NOT_ENABLED); 00646 } 00647 00648 typedef OPENDDS_MAP(DataWriterImpl*, DataWriterImpl::AckToken) DataWriterAckMap; 00649 DataWriterAckMap ack_writers; 00650 { 00651 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00652 guard, 00653 this->pi_lock_, 00654 DDS::RETCODE_ERROR); 00655 00656 // Collect writers to request acks 00657 for (DataWriterMap::iterator it(this->datawriter_map_.begin()); 00658 it != this->datawriter_map_.end(); ++it) { 00659 DataWriterImpl* writer = it->second; 00660 if (writer->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS) 00661 continue; 00662 if (writer->should_ack()) { 00663 DataWriterImpl::AckToken token = writer->create_ack_token(max_wait); 00664 00665 std::pair<DataWriterAckMap::iterator, bool> pair = 00666 ack_writers.insert(DataWriterAckMap::value_type(writer, token)); 00667 00668 if (!pair.second) { 00669 ACE_ERROR_RETURN((LM_ERROR, 00670 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ") 00671 ACE_TEXT("Unable to insert AckToken into DataWriterAckMap!\n")), 00672 DDS::RETCODE_ERROR); 00673 } 00674 } 00675 } 00676 } 00677 00678 if (ack_writers.empty()) { 00679 if (DCPS_debug_level > 0) { 00680 ACE_DEBUG((LM_DEBUG, 00681 ACE_TEXT("(%P|%t) PublisherImpl::wait_for_acknowledgments() - ") 00682 ACE_TEXT("not blocking due to no writers requiring acks.\n"))); 00683 } 00684 00685 return DDS::RETCODE_OK; 00686 } 00687 00688 // Wait for ack responses from all associated readers 00689 for (DataWriterAckMap::iterator it(ack_writers.begin()); 00690 it != ack_writers.end(); ++it) { 00691 DataWriterImpl::AckToken token = it->second; 00692 00693 it->first->wait_for_specific_ack(token); 00694 } 00695 00696 return DDS::RETCODE_OK; 00697 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::writer_enabled | ( | const char * | topic_name, | |
DataWriterImpl * | impl | |||
) |
This method is called when a datawriter created by this publisher is enabled.
Definition at line 771 of file PublisherImpl.cpp.
References datawriter_map_, OpenDDS::DCPS::DataWriterImpl::get_publication_id(), OPENDDS_STRING, publication_map_, DDS::RETCODE_ERROR, and DDS::RETCODE_OK.
Referenced by OpenDDS::DCPS::DataWriterImpl::enable().
00773 { 00774 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00775 guard, 00776 this->pi_lock_, 00777 DDS::RETCODE_ERROR); 00778 00779 datawriter_map_.insert(DataWriterMap::value_type(topic_name, writer)); 00780 00781 const RepoId publication_id = writer->get_publication_id(); 00782 00783 std::pair<PublicationMap::iterator, bool> pair = 00784 publication_map_.insert(PublicationMap::value_type(publication_id, writer)); 00785 00786 if (pair.second == false) { 00787 GuidConverter converter(publication_id); 00788 ACE_ERROR_RETURN((LM_ERROR, 00789 ACE_TEXT("(%P|%t) ERROR: ") 00790 ACE_TEXT("PublisherImpl::writer_enabled: ") 00791 ACE_TEXT("insert publication %C failed.\n"), 00792 OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR); 00793 } 00794 00795 // Increase ref count when the servant is added to the 00796 // datawriter/publication map. 00797 writer->_add_ref(); 00798 00799 return DDS::RETCODE_OK; 00800 }
friend class DataWriterImpl [friend] |
Definition at line 42 of file PublisherImpl.h.
ACE_Time_Value OpenDDS::DCPS::PublisherImpl::aggregation_period_start_ [private] |
Start of current aggregation period. - NOT USED IN FIRST IMPL.
Definition at line 191 of file PublisherImpl.h.
std::size_t OpenDDS::DCPS::PublisherImpl::change_depth_ [private] |
The number of times begin_coherent_changes has been called.
Definition at line 179 of file PublisherImpl.h.
Referenced by begin_coherent_changes(), and end_coherent_changes().
DataWriterMap OpenDDS::DCPS::PublisherImpl::datawriter_map_ [private] |
This map is used to support datawriter lookup by topic name.
Definition at line 173 of file PublisherImpl.h.
Referenced by assert_liveliness_by_participant(), contains_writer(), delete_contained_entities(), delete_datawriter(), is_clean(), liveliness_check_interval(), lookup_datawriter(), participant_liveliness_activity_after(), wait_for_acknowledgments(), and writer_enabled().
Default datawriter Qos policy list.
Definition at line 165 of file PublisherImpl.h.
Referenced by create_datawriter(), get_default_datawriter_qos(), and set_default_datawriter_qos().
DDS::PublisherListener_var OpenDDS::DCPS::PublisherImpl::listener_ [private] |
Used to notify the entity for relevant events.
Definition at line 171 of file PublisherImpl.h.
Referenced by get_listener(), and set_listener().
The StatusKind bit mask indicates which status condition changes can be reported to the listener of this entity.
Definition at line 169 of file PublisherImpl.h.
Referenced by set_listener().
Monitor* OpenDDS::DCPS::PublisherImpl::monitor_ [private] |
Monitor object for this entity.
Definition at line 200 of file PublisherImpl.h.
Referenced by enable(), and PublisherImpl().
The DomainParticipant servant that owns this Publisher.
Definition at line 184 of file PublisherImpl.h.
Referenced by create_datawriter(), delete_datawriter(), get_participant(), parent(), and set_qos().
lock_type OpenDDS::DCPS::PublisherImpl::pi_lock_ [mutable, private] |
The recursive lock to protect datawriter map and suspend count.
Definition at line 196 of file PublisherImpl.h.
PublicationMap OpenDDS::DCPS::PublisherImpl::publication_map_ [private] |
This map is used to support datawriter lookup by datawriter repository id.
Definition at line 176 of file PublisherImpl.h.
Referenced by begin_coherent_changes(), delete_datawriter(), end_coherent_changes(), get_publication_ids(), is_clean(), resume_publications(), set_qos(), and writer_enabled().
NOTE: The publisher_id_ is not generated by the repository; it is unique within the DomainParticipant scope.
Definition at line 204 of file PublisherImpl.h.
Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), and OpenDDS::DCPS::DataWriterImpl::end_coherent_changes().
Publisher QoS policy list.
Definition at line 163 of file PublisherImpl.h.
Referenced by begin_coherent_changes(), create_datawriter(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), end_coherent_changes(), OpenDDS::DCPS::DataWriterImpl::end_coherent_changes(), get_qos(), and set_qos().
Definition at line 197 of file PublisherImpl.h.
Unique sequence number used when the scope_access = GROUP.
Definition at line 189 of file PublisherImpl.h.
CORBA::Short OpenDDS::DCPS::PublisherImpl::suspend_depth_count_ [private] |
The suspend depth count.
Definition at line 186 of file PublisherImpl.h.
Referenced by is_suspended(), resume_publications(), and suspend_publications().