Implements the OpenDDS::DCPS::Publisher interfaces. More...
#include <PublisherImpl.h>
Implements the OpenDDS::DCPS::Publisher interfaces.
This class acts as a factory for, and a container of, its DataWriters.
See the DDS specification, OMG formal/04-12-02, for a description of the interface this class is implementing.
Definition at line 38 of file PublisherImpl.h.
typedef ACE_Recursive_Thread_Mutex OpenDDS::DCPS::PublisherImpl::lock_type [private] |
Definition at line 198 of file PublisherImpl.h.
typedef ACE_Reverse_Lock<lock_type> OpenDDS::DCPS::PublisherImpl::reverse_lock_type [private] |
Definition at line 199 of file PublisherImpl.h.
OpenDDS::DCPS::PublisherImpl::PublisherImpl | ( | DDS::InstanceHandle_t | handle, | |
RepoId | id, | |||
const DDS::PublisherQos & | qos, | |||
DDS::PublisherListener_ptr | a_listener, | |||
const DDS::StatusMask & | mask, | |||
DomainParticipantImpl * | participant | |||
) |
Definition at line 30 of file PublisherImpl.cpp.
References _duplicate(), monitor_, and TheServiceParticipant.
00036 : handle_(handle), 00037 qos_(qos), 00038 default_datawriter_qos_(TheServiceParticipant->initial_DataWriterQos()), 00039 listener_mask_(mask), 00040 listener_(DDS::PublisherListener::_duplicate(a_listener)), 00041 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 00042 change_depth_(0), 00043 #endif 00044 domain_id_(participant->get_domain_id()), 00045 participant_(*participant), 00046 suspend_depth_count_(0), 00047 sequence_number_(), 00048 aggregation_period_start_(ACE_Time_Value::zero), 00049 reverse_pi_lock_(pi_lock_), 00050 monitor_(0), 00051 publisher_id_(id) 00052 { 00053 monitor_ = TheServiceParticipant->monitor_factory_->create_publisher_monitor(this); 00054 }
OpenDDS::DCPS::PublisherImpl::~PublisherImpl | ( | ) | [virtual] |
Definition at line 56 of file PublisherImpl.cpp.
References ACE_TEXT(), datawriter_map_, is_clean(), LM_ERROR, and publication_map_.
00057 { 00058 //The datawriters should be deleted already before calling delete 00059 //publisher. 00060 if (!is_clean()) { 00061 ACE_ERROR((LM_ERROR, 00062 ACE_TEXT("(%P|%t) ERROR: ") 00063 ACE_TEXT("PublisherImpl::~PublisherImpl, ") 00064 ACE_TEXT("%B datawriters and %B publications still exist.\n"), 00065 datawriter_map_.size(), publication_map_.size())); 00066 } 00067 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::assert_liveliness_by_participant | ( | ) |
Definition at line 861 of file PublisherImpl.cpp.
References datawriter_map_, and DDS::RETCODE_OK.
00862 { 00863 DDS::ReturnCode_t ret = DDS::RETCODE_OK; 00864 00865 for (DataWriterMap::iterator it(datawriter_map_.begin()); 00866 it != datawriter_map_.end(); ++it) { 00867 DDS::ReturnCode_t dw_ret = it->second->assert_liveliness_by_participant(); 00868 00869 if (dw_ret != DDS::RETCODE_OK) { 00870 ret = dw_ret; 00871 } 00872 } 00873 00874 return ret; 00875 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::begin_coherent_changes | ( | ) | [virtual] |
Implements DDS::Publisher.
Definition at line 544 of file PublisherImpl.cpp.
References ACE_TEXT(), change_depth_, OpenDDS::DCPS::EntityImpl::enabled_, DDS::INSTANCE_PRESENTATION_QOS, LM_ERROR, pi_lock_, DDS::PublisherQos::presentation, publication_map_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.
00545 { 00546 if (enabled_ == false) { 00547 ACE_ERROR_RETURN((LM_ERROR, 00548 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:") 00549 ACE_TEXT(" Publisher is not enabled!\n")), 00550 DDS::RETCODE_NOT_ENABLED); 00551 } 00552 00553 if (!qos_.presentation.coherent_access) { 00554 ACE_ERROR_RETURN((LM_ERROR, 00555 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:") 00556 ACE_TEXT(" QoS policy does not support coherent access!\n")), 00557 DDS::RETCODE_ERROR); 00558 } 00559 00560 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00561 guard, 00562 this->pi_lock_, 00563 DDS::RETCODE_ERROR); 00564 00565 ++this->change_depth_; 00566 00567 if (qos_.presentation.access_scope == DDS::INSTANCE_PRESENTATION_QOS) { 00568 // INSTANCE access scope essentially behaves 00569 // as a no-op. (see: 7.1.3.6) 00570 return DDS::RETCODE_OK; 00571 } 00572 00573 // We should only notify publications on the first 00574 // and last change to the current change set: 00575 if (this->change_depth_ == 1) { 00576 for (PublicationMap::iterator it = this->publication_map_.begin(); 00577 it != this->publication_map_.end(); ++it) { 00578 it->second->begin_coherent_changes(); 00579 } 00580 } 00581 00582 return DDS::RETCODE_OK; 00583 }
bool OpenDDS::DCPS::PublisherImpl::contains_writer | ( | DDS::InstanceHandle_t | a_handle | ) |
Definition at line 76 of file PublisherImpl.cpp.
References datawriter_map_, pi_lock_, and DDS::RETCODE_ERROR.
00077 { 00078 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00079 guard, 00080 this->pi_lock_, 00081 DDS::RETCODE_ERROR); 00082 00083 for (DataWriterMap::iterator it(datawriter_map_.begin()); 00084 it != datawriter_map_.end(); ++it) { 00085 if (a_handle == it->second->get_instance_handle()) { 00086 return true; 00087 } 00088 } 00089 00090 return false; 00091 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::copy_from_topic_qos | ( | DDS::DataWriterQos & | a_datawriter_qos, | |
const DDS::TopicQos & | a_topic_qos | |||
) | [virtual] |
Definition at line 750 of file PublisherImpl.cpp.
References OpenDDS::DCPS::Qos_Helper::copy_from_topic_qos(), DDS::RETCODE_INCONSISTENT_POLICY, and DDS::RETCODE_OK.
Referenced by validate_datawriter_qos().
00752 { 00753 if (Qos_Helper::copy_from_topic_qos(a_datawriter_qos, a_topic_qos)) { 00754 return DDS::RETCODE_OK; 00755 } else { 00756 return DDS::RETCODE_INCONSISTENT_POLICY; 00757 } 00758 }
DDS::DataWriter_ptr OpenDDS::DCPS::PublisherImpl::create_datawriter | ( | DDS::Topic_ptr | a_topic, | |
const DDS::DataWriterQos & | qos, | |||
DDS::DataWriterListener_ptr | a_listener, | |||
DDS::StatusMask | mask | |||
) | [virtual] |
Definition at line 94 of file PublisherImpl.cpp.
References CORBA::LocalObject::_duplicate(), CORBA::LocalObject::_nil(), ACE_TEXT(), default_datawriter_qos_, OpenDDS::DCPS::DataWriterImpl::enable(), OpenDDS::DCPS::EntityImpl::enabled_, DDS::PublisherQos::entity_factory, OpenDDS::DCPS::TopicDescriptionImpl::get_name(), OpenDDS::DCPS::TopicDescriptionImpl::get_type_support(), OpenDDS::DCPS::DataWriterImpl::init(), LM_ERROR, LM_WARNING, participant_, pi_lock_, qos_, OpenDDS::DCPS::rchandle_from(), DDS::RETCODE_OK, validate_datawriter_qos(), and writers_not_enabled_.
00099 { 00100 DDS::DataWriterQos dw_qos; 00101 00102 if (!validate_datawriter_qos(qos, default_datawriter_qos_, a_topic, dw_qos)) { 00103 return DDS::DataWriter::_nil(); 00104 } 00105 00106 TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic); 00107 00108 if (!topic_servant) { 00109 CORBA::String_var name = a_topic->get_name(); 00110 ACE_ERROR((LM_ERROR, 00111 ACE_TEXT("(%P|%t) ERROR: ") 00112 ACE_TEXT("PublisherImpl::create_datawriter, ") 00113 ACE_TEXT("topic_servant(topic_name=%C) is nil.\n"), 00114 name.in())); 00115 return 0; 00116 } 00117 00118 OpenDDS::DCPS::TypeSupport_ptr typesupport = 00119 topic_servant->get_type_support(); 00120 00121 if (typesupport == 0) { 00122 CORBA::String_var name = topic_servant->get_name(); 00123 ACE_ERROR((LM_ERROR, 00124 ACE_TEXT("(%P|%t) ERROR: ") 00125 ACE_TEXT("PublisherImpl::create_datawriter, ") 00126 ACE_TEXT("typesupport(topic_name=%C) is nil.\n"), 00127 name.in())); 00128 return DDS::DataWriter::_nil(); 00129 } 00130 00131 DDS::DataWriter_var dw_obj = typesupport->create_datawriter(); 00132 00133 DataWriterImpl* dw_servant = 00134 dynamic_cast <DataWriterImpl*>(dw_obj.in()); 00135 00136 if (dw_servant == 0) { 00137 ACE_ERROR((LM_ERROR, 00138 ACE_TEXT("(%P|%t) ERROR: ") 00139 ACE_TEXT("PublisherImpl::create_datawriter, ") 00140 ACE_TEXT("servant is nil.\n"))); 00141 return DDS::DataWriter::_nil(); 00142 } 00143 00144 dw_servant->init( 00145 topic_servant, 00146 dw_qos, 00147 a_listener, 00148 mask, 00149 participant_, 00150 this); 00151 00152 if ((this->enabled_ == true) && (qos_.entity_factory.autoenable_created_entities)) { 00153 const DDS::ReturnCode_t ret = dw_servant->enable(); 00154 00155 if (ret != DDS::RETCODE_OK) { 00156 ACE_ERROR((LM_WARNING, 00157 ACE_TEXT("(%P|%t) WARNING: ") 00158 ACE_TEXT("PublisherImpl::create_datawriter, ") 00159 ACE_TEXT("enable failed.\n"))); 00160 return DDS::DataWriter::_nil(); 00161 } 00162 } else { 00163 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, pi_lock_, 0); 
00164 writers_not_enabled_.insert(rchandle_from(dw_servant)); 00165 } 00166 00167 return DDS::DataWriter::_duplicate(dw_obj.in()); 00168 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::delete_contained_entities | ( | ) | [virtual] |
Implements DDS::Publisher.
Definition at line 334 of file PublisherImpl.cpp.
References ACE_TEXT(), datawriter_map_, delete_datawriter(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::RcHandle< T >::in(), LM_ERROR, OPENDDS_STRING, pi_lock_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, and OpenDDS::DCPS::EntityImpl::set_deleted().
00335 { 00336 // mark that the entity is being deleted 00337 set_deleted(true); 00338 00339 while (true) { 00340 PublicationId pub_id = GUID_UNKNOWN; 00341 DataWriterImpl_rch a_datawriter; 00342 00343 { 00344 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00345 guard, 00346 this->pi_lock_, 00347 DDS::RETCODE_ERROR); 00348 00349 if (datawriter_map_.empty()) { 00350 break; 00351 } else { 00352 a_datawriter = datawriter_map_.begin()->second; 00353 pub_id = a_datawriter->get_publication_id(); 00354 } 00355 } 00356 00357 const DDS::ReturnCode_t ret = delete_datawriter(a_datawriter.in()); 00358 00359 if (ret != DDS::RETCODE_OK) { 00360 GuidConverter converter(pub_id); 00361 ACE_ERROR_RETURN((LM_ERROR, 00362 ACE_TEXT("(%P|%t) ERROR: ") 00363 ACE_TEXT("PublisherImpl::") 00364 ACE_TEXT("delete_contained_entities: ") 00365 ACE_TEXT("failed to delete ") 00366 ACE_TEXT("datawriter %C.\n"), 00367 OPENDDS_STRING(converter).c_str()),ret); 00368 } 00369 } 00370 00371 // the publisher can now start creating new publications 00372 set_deleted(false); 00373 00374 return DDS::RETCODE_OK; 00375 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::delete_datawriter | ( | DDS::DataWriter_ptr | a_datawriter | ) | [virtual] |
Definition at line 171 of file PublisherImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DataWriterImpl::cleanup(), datawriter_map_, OpenDDS::DCPS::DCPS_debug_level, domain_id_, OpenDDS::DCPS::DataWriterImpl::get_publication_id(), OpenDDS::DCPS::DataWriterImpl::get_publisher(), ACE_OS::gettimeofday(), OpenDDS::DCPS::GUID_UNKNOWN, LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), monitor_, OPENDDS_STRING, participant_, OpenDDS::DCPS::DataWriterImpl::persist_data(), pi_lock_, OpenDDS::DCPS::DataWriterImpl::prepare_to_delete(), publication_map_, OpenDDS::DCPS::DataWriterImpl::remove_all_associations(), OpenDDS::DCPS::Monitor::report(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, reverse_pi_lock_, TheServiceParticipant, OpenDDS::DCPS::time_value_to_time(), OpenDDS::DCPS::DataWriterImpl::unregister_all(), OpenDDS::DCPS::DataWriterImpl::unregister_instances(), OpenDDS::DCPS::DataWriterImpl::wait_control_pending(), and OpenDDS::DCPS::DataWriterImpl::wait_pending().
Referenced by delete_contained_entities().
00172 { 00173 DataWriterImpl* dw_servant = dynamic_cast<DataWriterImpl*>(a_datawriter); 00174 if (!dw_servant) { 00175 ACE_ERROR((LM_ERROR, 00176 "(%P|%t) PublisherImpl::delete_datawriter - dynamic cast to DataWriterImpl failed\n" 00177 )); 00178 return DDS::RETCODE_ERROR; 00179 } 00180 00181 // marks entity as deleted and stops future associating 00182 dw_servant->prepare_to_delete(); 00183 00184 { 00185 DDS::Publisher_var dw_publisher(dw_servant->get_publisher()); 00186 00187 if (dw_publisher.in() != this) { 00188 RepoId id = dw_servant->get_publication_id(); 00189 GuidConverter converter(id); 00190 ACE_ERROR((LM_ERROR, 00191 ACE_TEXT("(%P|%t) PublisherImpl::delete_datawriter: ") 00192 ACE_TEXT("the data writer %C doesn't ") 00193 ACE_TEXT("belong to this subscriber \n"), 00194 OPENDDS_STRING(converter).c_str())); 00195 return DDS::RETCODE_PRECONDITION_NOT_MET; 00196 } 00197 } 00198 00199 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 00200 // Trigger data to be persisted, i.e. made durable, if so 00201 // configured. This needs be called before unregister_instances 00202 // because unregister_instances may cause instance dispose. 00203 if (!dw_servant->persist_data() && DCPS_debug_level >= 2) { 00204 ACE_ERROR((LM_ERROR, 00205 ACE_TEXT("(%P|%t) ERROR: ") 00206 ACE_TEXT("PublisherImpl::delete_datawriter, ") 00207 ACE_TEXT("failed to make data durable.\n"))); 00208 } 00209 #endif 00210 00211 // Unregister all registered instances prior to deletion. 00212 DDS::Time_t source_timestamp = time_value_to_time(ACE_OS::gettimeofday()); 00213 dw_servant->unregister_instances(source_timestamp); 00214 00215 // Wait for any control messages to be transported during 00216 // unregistering of instances. 
00217 dw_servant->wait_pending(); 00218 dw_servant->wait_control_pending(); 00219 00220 RepoId publication_id = GUID_UNKNOWN; 00221 { 00222 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00223 guard, 00224 this->pi_lock_, 00225 DDS::RETCODE_ERROR); 00226 00227 publication_id = dw_servant->get_publication_id(); 00228 00229 PublicationMap::iterator it = publication_map_.find(publication_id); 00230 00231 if (it == publication_map_.end()) { 00232 GuidConverter converter(publication_id); 00233 ACE_ERROR_RETURN((LM_ERROR, 00234 ACE_TEXT("(%P|%t) ERROR: ") 00235 ACE_TEXT("PublisherImpl::delete_datawriter, ") 00236 ACE_TEXT("datawriter %C not found.\n"), 00237 OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR); 00238 } 00239 00240 // We can not erase the datawriter from datawriter map by the topic name 00241 // because the map might have multiple datawriters with the same topic 00242 // name. 00243 // Find the iterator to the datawriter in the datawriter map and erase 00244 // by the iterator. 00245 DataWriterMap::iterator writ; 00246 DataWriterMap::iterator the_writ = datawriter_map_.end(); 00247 00248 for (writ = datawriter_map_.begin(); 00249 writ != datawriter_map_.end(); 00250 ++writ) { 00251 if (writ->second == it->second) { 00252 the_writ = writ; 00253 break; 00254 } 00255 } 00256 00257 if (the_writ != datawriter_map_.end()) { 00258 datawriter_map_.erase(the_writ); 00259 } 00260 00261 publication_map_.erase(it); 00262 00263 // Release pi_lock_ before making call to transport layer to avoid 00264 // some deadlock situations that threads acquire locks(PublisherImpl 00265 // pi_lock_, TransportClient reservation_lock and TransportImpl 00266 // lock_) in reverse order. 00267 ACE_GUARD_RETURN(reverse_lock_type, reverse_monitor, this->reverse_pi_lock_, 00268 DDS::RETCODE_ERROR); 00269 // Wait for pending samples to drain prior to removing associations 00270 // and unregistering the publication. 
00271 dw_servant->wait_pending(); 00272 // Call remove association before unregistering the datawriter 00273 // with the transport, otherwise some callbacks resulted from 00274 // remove_association may lost. 00275 dw_servant->remove_all_associations(); 00276 dw_servant->cleanup(); 00277 } 00278 00279 if (this->monitor_) { 00280 this->monitor_->report(); 00281 } 00282 00283 // not just unregister but remove any pending writes/sends. 00284 dw_servant->unregister_all(); 00285 00286 RcHandle<DomainParticipantImpl> participant = this->participant_.lock(); 00287 00288 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_); 00289 if (!disco->remove_publication( 00290 this->domain_id_, 00291 participant->get_id(), 00292 publication_id)) { 00293 ACE_ERROR_RETURN((LM_ERROR, 00294 ACE_TEXT("(%P|%t) ERROR: ") 00295 ACE_TEXT("PublisherImpl::delete_datawriter, ") 00296 ACE_TEXT("publication not removed from discovery.\n")), 00297 DDS::RETCODE_ERROR); 00298 } 00299 00300 participant->remove_adjust_liveliness_timers(); 00301 00302 return DDS::RETCODE_OK; 00303 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::enable | ( | ) | [virtual] |
Implements DDS::Entity.
Definition at line 761 of file PublisherImpl.cpp.
References DDS::PublisherQos::entity_factory, OpenDDS::DCPS::EntityImpl::is_enabled(), OpenDDS::DCPS::WeakRcHandle< T >::lock(), monitor_, participant_, pi_lock_, qos_, OpenDDS::DCPS::Monitor::report(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, OpenDDS::DCPS::EntityImpl::set_enabled(), and writers_not_enabled_.
Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_publisher().
00762 { 00763 //According spec: 00764 // - Calling enable on an already enabled Entity returns OK and has no 00765 // effect. 00766 // - Calling enable on an Entity whose factory is not enabled will fail 00767 // and return PRECONDITION_NOT_MET. 00768 00769 if (this->is_enabled()) { 00770 return DDS::RETCODE_OK; 00771 } 00772 00773 RcHandle<DomainParticipantImpl> participant = this->participant_.lock(); 00774 if (!participant || participant->is_enabled() == false) { 00775 return DDS::RETCODE_PRECONDITION_NOT_MET; 00776 } 00777 00778 if (this->monitor_) { 00779 this->monitor_->report(); 00780 } 00781 00782 this->set_enabled(); 00783 00784 if (qos_.entity_factory.autoenable_created_entities) { 00785 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, pi_lock_, DDS::RETCODE_ERROR); 00786 DataWriterSet writers; 00787 writers_not_enabled_.swap(writers); 00788 for (DataWriterSet::iterator it = writers.begin(); it != writers.end(); ++it) { 00789 (*it)->enable(); 00790 } 00791 } 00792 00793 return DDS::RETCODE_OK; 00794 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::end_coherent_changes | ( | ) | [virtual] |
Implements DDS::Publisher.
Definition at line 586 of file PublisherImpl.cpp.
References ACE_TEXT(), change_depth_, OpenDDS::DCPS::EntityImpl::enabled_, DDS::INSTANCE_PRESENTATION_QOS, LM_ERROR, pi_lock_, DDS::PublisherQos::presentation, publication_map_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.
00587 { 00588 if (enabled_ == false) { 00589 ACE_ERROR_RETURN((LM_ERROR, 00590 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:") 00591 ACE_TEXT(" Publisher is not enabled!\n")), 00592 DDS::RETCODE_NOT_ENABLED); 00593 } 00594 00595 if (!qos_.presentation.coherent_access) { 00596 ACE_ERROR_RETURN((LM_ERROR, 00597 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:") 00598 ACE_TEXT(" QoS policy does not support coherent access!\n")), 00599 DDS::RETCODE_ERROR); 00600 } 00601 00602 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00603 guard, 00604 this->pi_lock_, 00605 DDS::RETCODE_ERROR); 00606 00607 if (this->change_depth_ == 0) { 00608 ACE_ERROR_RETURN((LM_ERROR, 00609 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:") 00610 ACE_TEXT(" No matching call to begin_coherent_changes!\n")), 00611 DDS::RETCODE_PRECONDITION_NOT_MET); 00612 } 00613 00614 --this->change_depth_; 00615 00616 if (qos_.presentation.access_scope == DDS::INSTANCE_PRESENTATION_QOS) { 00617 // INSTANCE access scope essentially behaves 00618 // as a no-op. 
(see: 7.1.3.6) 00619 return DDS::RETCODE_OK; 00620 } 00621 00622 // We should only notify publications on the first 00623 // and last change to the current change set: 00624 if (this->change_depth_ == 0) { 00625 GroupCoherentSamples group_samples; 00626 for (PublicationMap::iterator it = this->publication_map_.begin(); 00627 it != this->publication_map_.end(); ++it) { 00628 00629 if (it->second->coherent_samples_ == 0) { 00630 continue; 00631 } 00632 00633 std::pair<GroupCoherentSamples::iterator, bool> pair = 00634 group_samples.insert(GroupCoherentSamples::value_type( 00635 it->second->get_publication_id(), 00636 WriterCoherentSample(it->second->coherent_samples_, 00637 it->second->sequence_number_))); 00638 00639 if (pair.second == false) { 00640 ACE_ERROR_RETURN((LM_ERROR, 00641 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes: ") 00642 ACE_TEXT("failed to insert to GroupCoherentSamples.\n")), 00643 DDS::RETCODE_ERROR); 00644 } 00645 } 00646 00647 for (PublicationMap::iterator it = this->publication_map_.begin(); 00648 it != this->publication_map_.end(); ++it) { 00649 if (it->second->coherent_samples_ == 0) { 00650 continue; 00651 } 00652 00653 it->second->end_coherent_changes(group_samples); 00654 } 00655 } 00656 00657 return DDS::RETCODE_OK; 00658 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::get_default_datawriter_qos | ( | DDS::DataWriterQos & | qos | ) | [virtual] |
Definition at line 743 of file PublisherImpl.cpp.
References default_datawriter_qos_, and DDS::RETCODE_OK.
00744 { 00745 qos = default_datawriter_qos_; 00746 return DDS::RETCODE_OK; 00747 }
DDS::InstanceHandle_t OpenDDS::DCPS::PublisherImpl::get_instance_handle | ( | ) | [virtual] |
Implements OpenDDS::DCPS::EntityImpl.
Definition at line 70 of file PublisherImpl.cpp.
References handle_.
Referenced by OpenDDS::DCPS::PublisherMonitorImpl::report().
00071 { 00072 return handle_; 00073 }
DDS::PublisherListener_ptr OpenDDS::DCPS::PublisherImpl::get_listener | ( | ) | [virtual] |
Implements DDS::Publisher.
Definition at line 473 of file PublisherImpl.cpp.
References CORBA::LocalObject::_duplicate(), and listener_.
00474 { 00475 return DDS::PublisherListener::_duplicate(listener_.in()); 00476 }
DDS::DomainParticipant_ptr OpenDDS::DCPS::PublisherImpl::get_participant | ( | ) | [virtual] |
Implements DDS::Publisher.
Definition at line 725 of file PublisherImpl.cpp.
References OpenDDS::DCPS::RcHandle< T >::_retn(), OpenDDS::DCPS::WeakRcHandle< T >::lock(), and participant_.
Referenced by OpenDDS::DCPS::PublisherMonitorImpl::report().
00726 { 00727 return participant_.lock()._retn(); 00728 }
ACE_Recursive_Thread_Mutex& OpenDDS::DCPS::PublisherImpl::get_pi_lock | ( | ) | [inline] |
Definition at line 113 of file PublisherImpl.h.
00113 { return pi_lock_; }
void OpenDDS::DCPS::PublisherImpl::get_publication_ids | ( | PublicationIdVec & | pubs | ) |
Populates a std::vector with the PublicationIds (GUIDs) of this Publisher's Data Writers.
Definition at line 901 of file PublisherImpl.cpp.
References pi_lock_, and publication_map_.
Referenced by OpenDDS::DCPS::PublisherMonitorImpl::report().
00902 { 00903 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00904 guard, 00905 this->pi_lock_, 00906 ); 00907 00908 pubs.reserve(publication_map_.size()); 00909 for (PublicationMap::iterator iter = publication_map_.begin(); 00910 iter != publication_map_.end(); 00911 ++iter) { 00912 pubs.push_back(iter->first); 00913 } 00914 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::get_qos | ( | DDS::PublisherQos & | qos | ) | [virtual] |
Definition at line 456 of file PublisherImpl.cpp.
References qos_, and DDS::RETCODE_OK.
00457 { 00458 qos = qos_; 00459 return DDS::RETCODE_OK; 00460 }
bool OpenDDS::DCPS::PublisherImpl::is_clean | ( | ) | const |
This method is not defined in the IDL and is defined for internal use. Check if there is any datawriter associated with this publisher.
Definition at line 797 of file PublisherImpl.cpp.
References datawriter_map_, pi_lock_, and publication_map_.
Referenced by OpenDDS::DCPS::DomainParticipantImpl::delete_publisher(), and ~PublisherImpl().
00798 { 00799 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00800 guard, 00801 this->pi_lock_, 00802 false); 00803 return datawriter_map_.empty() && publication_map_.empty(); 00804 }
bool OpenDDS::DCPS::PublisherImpl::is_suspended | ( | void | ) | const |
Definition at line 498 of file PublisherImpl.cpp.
References pi_lock_, and suspend_depth_count_.
00499 { 00500 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00501 guard, 00502 this->pi_lock_, 00503 false); 00504 return suspend_depth_count_; 00505 }
DDS::PublisherListener_ptr OpenDDS::DCPS::PublisherImpl::listener_for | ( | ::DDS::StatusKind | kind | ) |
This is used to retrieve the listener for a certain status change. If this publisher has a registered listener and the status kind is in the listener mask then the listener is returned. Otherwise, the query for listener is propagated up to the factory/DomainParticipant.
ACE_Time_Value OpenDDS::DCPS::PublisherImpl::liveliness_check_interval | ( | DDS::LivelinessQosPolicyKind | kind | ) |
Definition at line 878 of file PublisherImpl.cpp.
References datawriter_map_, and ACE_Time_Value::max_time.
00879 { 00880 ACE_Time_Value tv = ACE_Time_Value::max_time; 00881 for (DataWriterMap::iterator it(datawriter_map_.begin()); 00882 it != datawriter_map_.end(); ++it) { 00883 tv = std::min (tv, it->second->liveliness_check_interval(kind)); 00884 } 00885 return tv; 00886 }
DDS::DataWriter_ptr OpenDDS::DCPS::PublisherImpl::lookup_datawriter | ( | const char * | topic_name | ) | [virtual] |
Definition at line 306 of file PublisherImpl.cpp.
References CORBA::LocalObject::_duplicate(), CORBA::LocalObject::_nil(), ACE_TEXT(), datawriter_map_, OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, and pi_lock_.
00307 { 00308 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00309 guard, 00310 this->pi_lock_, 00311 DDS::DataWriter::_nil()); 00312 00313 // If multiple entries whose key is "topic_name" then which one is 00314 // returned ? Spec does not limit which one should give. 00315 DataWriterMap::iterator it = datawriter_map_.find(topic_name); 00316 00317 if (it == datawriter_map_.end()) { 00318 if (DCPS_debug_level >= 2) { 00319 ACE_DEBUG((LM_DEBUG, 00320 ACE_TEXT("(%P|%t) ") 00321 ACE_TEXT("PublisherImpl::lookup_datawriter, ") 00322 ACE_TEXT("The datawriter(topic_name=%C) is not found\n"), 00323 topic_name)); 00324 } 00325 00326 return DDS::DataWriter::_nil(); 00327 00328 } else { 00329 return DDS::DataWriter::_duplicate(it->second.in()); 00330 } 00331 }
typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_MAP_CMP | ( | RepoId | , | |
DDS::DataWriterQos | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_MAP_CMP | ( | PublicationId | , | |
DataWriterImpl_rch | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_MULTIMAP | ( | OPENDDS_STRING | , | |
DataWriterImpl_rch | ||||
) | [private] |
typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_SET | ( | DataWriterImpl_rch | ) | [private] |
typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_VECTOR | ( | PublicationId | ) |
RcHandle< EntityImpl > OpenDDS::DCPS::PublisherImpl::parent | ( | void | ) | const [virtual] |
Reimplemented from OpenDDS::DCPS::EntityImpl.
Definition at line 917 of file PublisherImpl.cpp.
References OpenDDS::DCPS::WeakRcHandle< T >::lock(), and participant_.
00918 { 00919 return this->participant_.lock(); 00920 }
bool OpenDDS::DCPS::PublisherImpl::participant_liveliness_activity_after | ( | const ACE_Time_Value & | tv | ) |
Definition at line 889 of file PublisherImpl.cpp.
References datawriter_map_.
00890 { 00891 for (DataWriterMap::iterator it(datawriter_map_.begin()); 00892 it != datawriter_map_.end(); ++it) { 00893 if (it->second->participant_liveliness_activity_after(tv)) { 00894 return true; 00895 } 00896 } 00897 return false; 00898 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::resume_publications | ( | ) | [virtual] |
Implements DDS::Publisher.
Definition at line 508 of file PublisherImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::EntityImpl::enabled_, LM_ERROR, pi_lock_, publication_map_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and suspend_depth_count_.
00509 { 00510 if (enabled_ == false) { 00511 ACE_ERROR_RETURN((LM_ERROR, 00512 ACE_TEXT("(%P|%t) ERROR: ") 00513 ACE_TEXT("PublisherImpl::resume_publications, ") 00514 ACE_TEXT(" Entity is not enabled. \n")), 00515 DDS::RETCODE_NOT_ENABLED); 00516 } 00517 00518 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00519 guard, 00520 this->pi_lock_, 00521 DDS::RETCODE_ERROR); 00522 00523 --suspend_depth_count_; 00524 00525 if (suspend_depth_count_ < 0) { 00526 suspend_depth_count_ = 0; 00527 return DDS::RETCODE_PRECONDITION_NOT_MET; 00528 } 00529 00530 if (suspend_depth_count_ == 0) { 00531 00532 for (PublicationMap::iterator it = this->publication_map_.begin(); 00533 it != this->publication_map_.end(); ++it) { 00534 it->second->send_suspended_data(); 00535 } 00536 } 00537 00538 return DDS::RETCODE_OK; 00539 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::set_default_datawriter_qos | ( | const DDS::DataWriterQos & | qos | ) | [virtual] |
Definition at line 731 of file PublisherImpl.cpp.
References OpenDDS::DCPS::Qos_Helper::consistent(), default_datawriter_qos_, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, and OpenDDS::DCPS::Qos_Helper::valid().
00732 { 00733 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) { 00734 default_datawriter_qos_ = qos; 00735 return DDS::RETCODE_OK; 00736 00737 } else { 00738 return DDS::RETCODE_INCONSISTENT_POLICY; 00739 } 00740 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::set_listener | ( | DDS::PublisherListener_ptr | a_listener, | |
DDS::StatusMask | mask | |||
) | [virtual] |
Definition at line 463 of file PublisherImpl.cpp.
References CORBA::LocalObject::_duplicate(), listener_, listener_mask_, and DDS::RETCODE_OK.
00465 { 00466 listener_mask_ = mask; 00467 //note: OK to duplicate a nil object ref 00468 listener_ = DDS::PublisherListener::_duplicate(a_listener); 00469 return DDS::RETCODE_OK; 00470 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::set_qos | ( | const DDS::PublisherQos & | qos | ) | [virtual] |
Definition at line 378 of file PublisherImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), domain_id_, OpenDDS::DCPS::EntityImpl::enabled_, LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK, OPENDDS_STRING, participant_, pi_lock_, publication_map_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, status, TheServiceParticipant, and OpenDDS::DCPS::Qos_Helper::valid().
00379 { 00380 00381 OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00382 00383 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) { 00384 if (qos_ == qos) 00385 return DDS::RETCODE_OK; 00386 00387 // for the not changeable qos, it can be changed before enable 00388 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) { 00389 return DDS::RETCODE_IMMUTABLE_POLICY; 00390 00391 } else { 00392 qos_ = qos; 00393 00394 DwIdToQosMap idToQosMap; 00395 { 00396 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00397 guard, 00398 this->pi_lock_, 00399 DDS::RETCODE_ERROR); 00400 00401 for (PublicationMap::iterator iter = publication_map_.begin(); 00402 iter != publication_map_.end(); 00403 ++iter) { 00404 DDS::DataWriterQos qos; 00405 iter->second->get_qos(qos); 00406 RepoId id = iter->second->get_publication_id(); 00407 std::pair<DwIdToQosMap::iterator, bool> pair = 00408 idToQosMap.insert(DwIdToQosMap::value_type(id, qos)); 00409 00410 if (pair.second == false) { 00411 GuidConverter converter(id); 00412 ACE_ERROR_RETURN((LM_ERROR, 00413 ACE_TEXT("(%P|%t) ") 00414 ACE_TEXT("PublisherImpl::set_qos: ") 00415 ACE_TEXT("insert id %C to DwIdToQosMap ") 00416 ACE_TEXT("failed.\n"), 00417 OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR); 00418 } 00419 } 00420 } 00421 00422 DwIdToQosMap::iterator iter = idToQosMap.begin(); 00423 00424 while (iter != idToQosMap.end()) { 00425 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_); 00426 bool status = false; 00427 00428 RcHandle<DomainParticipantImpl> participant = this->participant_.lock(); 00429 if (participant) 00430 status = disco->update_publication_qos( 00431 participant->get_domain_id(), 00432 participant->get_id(), 00433 iter->first, 00434 iter->second, 00435 this->qos_); 00436 00437 if (!status) { 00438 ACE_ERROR_RETURN((LM_ERROR, 00439 ACE_TEXT("(%P|%t) PublisherImpl::set_qos, ") 00440 ACE_TEXT("failed. 
\n")), 00441 DDS::RETCODE_ERROR); 00442 } 00443 00444 ++iter; 00445 } 00446 } 00447 00448 return DDS::RETCODE_OK; 00449 00450 } else { 00451 return DDS::RETCODE_INCONSISTENT_POLICY; 00452 } 00453 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::suspend_publications | ( | ) | [virtual] |
Implements DDS::Publisher.
Definition at line 479 of file PublisherImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::EntityImpl::enabled_, LM_ERROR, pi_lock_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, and suspend_depth_count_.
00480 { 00481 if (enabled_ == false) { 00482 ACE_ERROR_RETURN((LM_ERROR, 00483 ACE_TEXT("(%P|%t) ERROR: ") 00484 ACE_TEXT("PublisherImpl::suspend_publications, ") 00485 ACE_TEXT(" Entity is not enabled. \n")), 00486 DDS::RETCODE_NOT_ENABLED); 00487 } 00488 00489 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00490 guard, 00491 this->pi_lock_, 00492 DDS::RETCODE_ERROR); 00493 ++suspend_depth_count_; 00494 return DDS::RETCODE_OK; 00495 }
bool OpenDDS::DCPS::PublisherImpl::validate_datawriter_qos | ( | const DDS::DataWriterQos & | qos, | |
const DDS::DataWriterQos & | default_qos, | |||
DDS::Topic_ptr | a_topic, | |||
DDS::DataWriterQos & | dw_qos | |||
) | [static] |
Definition at line 923 of file PublisherImpl.cpp.
References CORBA::LocalObject::_nil(), ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::consistent(), copy_from_topic_qos(), DATAWRITER_QOS_DEFAULT, DATAWRITER_QOS_USE_TOPIC_QOS, CORBA::is_nil(), LM_ERROR, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK, and OpenDDS::DCPS::Qos_Helper::valid().
Referenced by create_datawriter(), and OpenDDS::DCPS::DomainParticipantImpl::create_replayer().
00927 { 00928 if (CORBA::is_nil(a_topic)) { 00929 ACE_ERROR((LM_ERROR, 00930 ACE_TEXT("(%P|%t) ERROR: ") 00931 ACE_TEXT("PublisherImpl::create_datawriter, ") 00932 ACE_TEXT("topic is nil.\n"))); 00933 return DDS::DataWriter::_nil(); 00934 } 00935 00936 if (qos == DATAWRITER_QOS_DEFAULT) { 00937 dw_qos = default_qos; 00938 00939 } else if (qos == DATAWRITER_QOS_USE_TOPIC_QOS) { 00940 DDS::TopicQos topic_qos; 00941 a_topic->get_qos(topic_qos); 00942 dw_qos = default_qos; 00943 00944 Qos_Helper::copy_from_topic_qos(dw_qos, topic_qos); 00945 00946 } else { 00947 dw_qos = qos; 00948 } 00949 00950 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil()); 00951 OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil()); 00952 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil()); 00953 OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil()); 00954 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil()); 00955 00956 if (!Qos_Helper::valid(dw_qos)) { 00957 ACE_ERROR((LM_ERROR, 00958 ACE_TEXT("(%P|%t) ERROR: ") 00959 ACE_TEXT("PublisherImpl::create_datawriter, ") 00960 ACE_TEXT("invalid qos.\n"))); 00961 return DDS::DataWriter::_nil(); 00962 } 00963 00964 if (!Qos_Helper::consistent(dw_qos)) { 00965 ACE_ERROR((LM_ERROR, 00966 ACE_TEXT("(%P|%t) ERROR: ") 00967 ACE_TEXT("PublisherImpl::create_datawriter, ") 00968 ACE_TEXT("inconsistent qos.\n"))); 00969 return DDS::DataWriter::_nil(); 00970 } 00971 return true; 00972 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::wait_for_acknowledgments | ( | const DDS::Duration_t & | max_wait | ) | [virtual] |
Definition at line 663 of file PublisherImpl.cpp.
References ACE_TEXT(), datawriter_map_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::RcHandle< T >::in(), LM_DEBUG, LM_ERROR, OpenDDS::DCPS::OPENDDS_MAP(), pi_lock_, DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.
00665 { 00666 if (enabled_ == false) { 00667 ACE_ERROR_RETURN((LM_ERROR, 00668 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ") 00669 ACE_TEXT("Entity is not enabled.\n")), 00670 DDS::RETCODE_NOT_ENABLED); 00671 } 00672 00673 typedef OPENDDS_MAP(DataWriterImpl*, DataWriterImpl::AckToken) DataWriterAckMap; 00674 DataWriterAckMap ack_writers; 00675 { 00676 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00677 guard, 00678 this->pi_lock_, 00679 DDS::RETCODE_ERROR); 00680 00681 // Collect writers to request acks 00682 for (DataWriterMap::iterator it(this->datawriter_map_.begin()); 00683 it != this->datawriter_map_.end(); ++it) { 00684 DataWriterImpl_rch writer = it->second; 00685 if (writer->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS) 00686 continue; 00687 if (writer->should_ack()) { 00688 DataWriterImpl::AckToken token = writer->create_ack_token(max_wait); 00689 00690 std::pair<DataWriterAckMap::iterator, bool> pair = 00691 ack_writers.insert(DataWriterAckMap::value_type(writer.in(), token)); 00692 00693 if (!pair.second) { 00694 ACE_ERROR_RETURN((LM_ERROR, 00695 ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ") 00696 ACE_TEXT("Unable to insert AckToken into DataWriterAckMap!\n")), 00697 DDS::RETCODE_ERROR); 00698 } 00699 } 00700 } 00701 } 00702 00703 if (ack_writers.empty()) { 00704 if (DCPS_debug_level > 0) { 00705 ACE_DEBUG((LM_DEBUG, 00706 ACE_TEXT("(%P|%t) PublisherImpl::wait_for_acknowledgments() - ") 00707 ACE_TEXT("not blocking due to no writers requiring acks.\n"))); 00708 } 00709 00710 return DDS::RETCODE_OK; 00711 } 00712 00713 // Wait for ack responses from all associated readers 00714 for (DataWriterAckMap::iterator it(ack_writers.begin()); 00715 it != ack_writers.end(); ++it) { 00716 DataWriterImpl::AckToken token = it->second; 00717 00718 it->first->wait_for_specific_ack(token); 00719 } 00720 00721 return DDS::RETCODE_OK; 00722 }
DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::writer_enabled | ( | const char * | topic_name, | |
DataWriterImpl * | impl | |||
) |
This method is called when a datawriter created by this publisher is enabled.
Definition at line 807 of file PublisherImpl.cpp.
References ACE_TEXT(), datawriter_map_, LM_ERROR, monitor_, OPENDDS_STRING, pi_lock_, publication_map_, OpenDDS::DCPS::rchandle_from(), OpenDDS::DCPS::Monitor::report(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, and writers_not_enabled_.
00809 { 00810 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00811 guard, 00812 this->pi_lock_, 00813 DDS::RETCODE_ERROR); 00814 DataWriterImpl_rch writer = rchandle_from(writer_ptr); 00815 writers_not_enabled_.erase(writer); 00816 00817 datawriter_map_.insert(DataWriterMap::value_type(topic_name, writer)); 00818 00819 const RepoId publication_id = writer->get_publication_id(); 00820 00821 std::pair<PublicationMap::iterator, bool> pair = 00822 publication_map_.insert(PublicationMap::value_type(publication_id, writer)); 00823 00824 if (pair.second == false) { 00825 GuidConverter converter(publication_id); 00826 ACE_ERROR_RETURN((LM_ERROR, 00827 ACE_TEXT("(%P|%t) ERROR: ") 00828 ACE_TEXT("PublisherImpl::writer_enabled: ") 00829 ACE_TEXT("insert publication %C failed.\n"), 00830 OPENDDS_STRING(converter).c_str()), DDS::RETCODE_ERROR); 00831 } 00832 00833 if (this->monitor_) { 00834 this->monitor_->report(); 00835 } 00836 00837 return DDS::RETCODE_OK; 00838 }
friend class DataWriterImpl [friend] |
Definition at line 43 of file PublisherImpl.h.
Start of current aggregation period. - NOT USED IN FIRST IMPL.
Definition at line 196 of file PublisherImpl.h.
std::size_t OpenDDS::DCPS::PublisherImpl::change_depth_ [private] |
The number of times begin_coherent_changes has been called.
Definition at line 184 of file PublisherImpl.h.
Referenced by begin_coherent_changes(), and end_coherent_changes().
DataWriterMap OpenDDS::DCPS::PublisherImpl::datawriter_map_ [private] |
This map is used to support datawriter lookup by topic name.
Definition at line 178 of file PublisherImpl.h.
Referenced by assert_liveliness_by_participant(), contains_writer(), delete_contained_entities(), delete_datawriter(), is_clean(), liveliness_check_interval(), lookup_datawriter(), participant_liveliness_activity_after(), wait_for_acknowledgments(), writer_enabled(), and ~PublisherImpl().
Default datawriter Qos policy list.
Definition at line 166 of file PublisherImpl.h.
Referenced by create_datawriter(), get_default_datawriter_qos(), and set_default_datawriter_qos().
Domain in which we are contained.
Definition at line 187 of file PublisherImpl.h.
Referenced by delete_datawriter(), and set_qos().
Definition at line 161 of file PublisherImpl.h.
Referenced by get_instance_handle().
DDS::PublisherListener_var OpenDDS::DCPS::PublisherImpl::listener_ [private] |
Used to notify the entity for relevant events.
Definition at line 172 of file PublisherImpl.h.
Referenced by get_listener(), and set_listener().
The StatusKind bit mask indicating which status changes this entity's listener is notified of.
Definition at line 170 of file PublisherImpl.h.
Referenced by set_listener().
Monitor* OpenDDS::DCPS::PublisherImpl::monitor_ [private] |
Monitor object for this entity.
Definition at line 205 of file PublisherImpl.h.
Referenced by delete_datawriter(), enable(), PublisherImpl(), and writer_enabled().
The DomainParticipant servant that owns this Publisher.
Definition at line 189 of file PublisherImpl.h.
Referenced by create_datawriter(), delete_datawriter(), enable(), get_participant(), parent(), and set_qos().
lock_type OpenDDS::DCPS::PublisherImpl::pi_lock_ [mutable, private] |
The recursive lock to protect datawriter map and suspend count.
Definition at line 201 of file PublisherImpl.h.
Referenced by begin_coherent_changes(), contains_writer(), create_datawriter(), delete_contained_entities(), delete_datawriter(), enable(), end_coherent_changes(), get_publication_ids(), is_clean(), is_suspended(), lookup_datawriter(), resume_publications(), set_qos(), suspend_publications(), wait_for_acknowledgments(), and writer_enabled().
PublicationMap OpenDDS::DCPS::PublisherImpl::publication_map_ [private] |
This map is used to support datawriter lookup by datawriter repository id.
Definition at line 181 of file PublisherImpl.h.
Referenced by begin_coherent_changes(), delete_datawriter(), end_coherent_changes(), get_publication_ids(), is_clean(), resume_publications(), set_qos(), writer_enabled(), and ~PublisherImpl().
Definition at line 209 of file PublisherImpl.h.
Publisher QoS policy list.
Definition at line 164 of file PublisherImpl.h.
Referenced by begin_coherent_changes(), create_datawriter(), enable(), end_coherent_changes(), get_qos(), and set_qos().
Definition at line 202 of file PublisherImpl.h.
Referenced by delete_datawriter().
Unique sequence number used when the scope_access = GROUP.
Definition at line 194 of file PublisherImpl.h.
The suspend depth count.
Definition at line 191 of file PublisherImpl.h.
Referenced by is_suspended(), resume_publications(), and suspend_publications().
DataWriterSet OpenDDS::DCPS::PublisherImpl::writers_not_enabled_ [private] |
Definition at line 175 of file PublisherImpl.h.
Referenced by create_datawriter(), enable(), and writer_enabled().