#include <RecorderImpl.h>
Inheritance diagram for OpenDDS::DCPS::RecorderImpl:
This class is the implementation of the Recorder. Inheritance is used to limit the application's access to underlying system methods.
Definition at line 39 of file RecorderImpl.h.
OpenDDS::DCPS::RecorderImpl::RecorderImpl | ( | ) |
Definition at line 47 of file RecorderImpl.cpp.
References DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, DDS::HANDLE_NIL, DDS::RequestedIncompatibleQosStatus::last_policy_id, DDS::SubscriptionMatchedStatus::last_publication_handle, DDS::RequestedIncompatibleQosStatus::policies, requested_incompatible_qos_status_, subscription_match_status_, DDS::SubscriptionMatchedStatus::total_count, DDS::RequestedIncompatibleQosStatus::total_count, DDS::SubscriptionMatchedStatus::total_count_change, and DDS::RequestedIncompatibleQosStatus::total_count_change.
00048 : qos_(TheServiceParticipant->initial_DataReaderQos()), 00049 participant_servant_(0), 00050 topic_servant_(0), 00051 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 00052 is_exclusive_ownership_ (false), 00053 #endif 00054 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 00055 owner_manager_ (0), 00056 #endif 00057 subqos_ (TheServiceParticipant->initial_SubscriberQos()), 00058 topic_desc_(0), 00059 listener_mask_(DEFAULT_STATUS_MASK), 00060 domain_id_(0), 00061 remove_association_sweeper_( 00062 new RemoveAssociationSweeper<RecorderImpl>(TheServiceParticipant->reactor(), 00063 TheServiceParticipant->reactor_owner(), 00064 this)), 00065 is_bit_(false) 00066 { 00067 00068 requested_incompatible_qos_status_.total_count = 0; 00069 requested_incompatible_qos_status_.total_count_change = 0; 00070 requested_incompatible_qos_status_.last_policy_id = 0; 00071 requested_incompatible_qos_status_.policies.length(0); 00072 00073 subscription_match_status_.total_count = 0; 00074 subscription_match_status_.total_count_change = 0; 00075 subscription_match_status_.current_count = 0; 00076 subscription_match_status_.current_count_change = 0; 00077 subscription_match_status_.last_publication_handle = 00078 DDS::HANDLE_NIL; 00079 00080 }
OpenDDS::DCPS::RecorderImpl::~RecorderImpl | ( | ) | [virtual] |
Definition at line 84 of file RecorderImpl.cpp.
References DBG_ENTRY_LVL, remove_association_sweeper_, and writers_.
00085 { 00086 DBG_ENTRY_LVL("RecorderImpl","~RecorderImpl",6); 00087 { 00088 ACE_READ_GUARD(ACE_RW_Thread_Mutex, 00089 read_guard, 00090 this->writers_lock_); 00091 // Cancel any uncancelled sweeper timers to decrement reference count. 00092 WriterMapType::iterator writer; 00093 for (writer = writers_.begin(); writer != writers_.end(); ++writer) { 00094 remove_association_sweeper_->cancel_timer(writer->second); 00095 } 00096 } 00097 00098 remove_association_sweeper_->wait(); 00099 remove_association_sweeper_->destroy(); 00100 }
void OpenDDS::DCPS::RecorderImpl::_add_ref | ( | ) | [inline, private, virtual] |
void OpenDDS::DCPS::RecorderImpl::_remove_ref | ( | ) | [inline, private, virtual] |
void OpenDDS::DCPS::RecorderImpl::add_association | ( | const RepoId & | yourId, | |
const WriterAssociation & | writer, | |||
bool | active | |||
) | [virtual] |
Implements OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 279 of file RecorderImpl.cpp.
References DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, OpenDDS::DCPS::DCPS_debug_level, DDS::DataWriterQos::durability, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), id_to_handle_map_, OpenDDS::DCPS::RcHandle< T >::in(), is_bit_, DDS::SubscriptionMatchedStatus::last_publication_handle, listener_, OPENDDS_STRING, participant_servant_, OpenDDS::DCPS::AssociationData::publication_transport_priority_, DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, subscription_match_status_, TheServiceParticipant, DDS::SubscriptionMatchedStatus::total_count, DDS::SubscriptionMatchedStatus::total_count_change, DDS::DataWriterQos::transport_priority, DDS::VOLATILE_DURABILITY_QOS, OpenDDS::DCPS::WriterAssociation::writerId, OpenDDS::DCPS::WriterAssociation::writerQos, writers_, and OpenDDS::DCPS::WriterAssociation::writerTransInfo.
00282 { 00283 ACE_DEBUG((LM_DEBUG, "RecorderImpl::add_association\n")); 00284 // 00285 // The following block is for diagnostic purposes only. 00286 // 00287 if (DCPS_debug_level >= 1) { 00288 GuidConverter reader_converter(yourId); 00289 GuidConverter writer_converter(writer.writerId); 00290 ACE_DEBUG((LM_DEBUG, 00291 ACE_TEXT("(%P|%t) RecorderImpl::add_association - ") 00292 ACE_TEXT("bit %d local %C remote %C\n"), 00293 is_bit_, 00294 OPENDDS_STRING(reader_converter).c_str(), 00295 OPENDDS_STRING(writer_converter).c_str())); 00296 } 00297 00298 // 00299 // This block prevents adding associations to deleted readers. 00300 // Presumably this is a "good thing(tm)". 00301 // 00302 // if (entity_deleted_ == true) { 00303 // if (DCPS_debug_level >= 1) 00304 // ACE_DEBUG((LM_DEBUG, 00305 // ACE_TEXT("(%P|%t) RecorderImpl::add_association") 00306 // ACE_TEXT(" This is a deleted datareader, ignoring add.\n"))); 00307 // 00308 // return; 00309 // } 00310 00311 // 00312 // We are being called back from the repository before we are done 00313 // processing after our call to the repository that caused this call 00314 // (from the repository) to be made. 00315 // 00316 if (GUID_UNKNOWN == subscription_id_) { 00317 // add_associations was invoked before DCSPInfoRepo::add_subscription() returned. 00318 subscription_id_ = yourId; 00319 } 00320 00321 // 00322 // We do the following while holding the publication_handle_lock_. 00323 // 00324 { 00325 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_); 00326 00327 // 00328 // For each writer in the list of writers to associate with, we 00329 // create a WriterInfo and a WriterStats object and store them in 00330 // our internal maps. 
00331 // 00332 { 00333 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_); 00334 00335 const PublicationId& writer_id = writer.writerId; 00336 RcHandle<WriterInfo> info = new WriterInfo(this, writer_id, writer.writerQos); 00337 /*std::pair<WriterMapType::iterator, bool> bpair =*/ 00338 this->writers_.insert( 00339 // This insertion is idempotent. 00340 WriterMapType::value_type( 00341 writer_id, 00342 info)); 00343 // this->statistics_.insert( 00344 // StatsMapType::value_type( 00345 // writer_id, 00346 // WriterStats( 00347 // this->raw_latency_buffer_size_, 00348 // this->raw_latency_buffer_type_))); 00349 00350 // if (DCPS_debug_level > 4) { 00351 // GuidConverter converter(writer_id); 00352 // ACE_DEBUG((LM_DEBUG, 00353 // "(%P|%t) RecorderImpl::add_association: " 00354 // "inserted writer %C.return %d \n", 00355 // OPENDDS_STRING(converter).c_str(), bpair.second)); 00356 // 00357 // WriterMapType::iterator iter = writers_.find(writer_id); 00358 // if (iter != writers_.end()) { 00359 // // This may not be an error since it could happen that the sample 00360 // // is delivered to the datareader after the write is dis-associated 00361 // // with this datareader. 00362 // GuidConverter reader_converter(subscription_id_); 00363 // GuidConverter writer_converter(writer_id); 00364 // ACE_DEBUG((LM_DEBUG, 00365 // ACE_TEXT("(%P|%t) RecorderImpl::add_association: ") 00366 // ACE_TEXT("reader %C is associated with writer %C.\n"), 00367 // OPENDDS_STRING(reader_converter).c_str(), 00368 // OPENDDS_STRING(writer_converter).c_str())); 00369 // } 00370 // } 00371 } 00372 00373 // 00374 // Propagate the add_associations processing down into the Transport 00375 // layer here. This will establish the transport support and reserve 00376 // usage of an existing connection or initiate creation of a new 00377 // connection if no suitable connection is available. 
00378 // 00379 AssociationData data; 00380 data.remote_id_ = writer.writerId; 00381 data.remote_data_ = writer.writerTransInfo; 00382 data.publication_transport_priority_ = 00383 writer.writerQos.transport_priority.value; 00384 data.remote_reliable_ = 00385 (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS); 00386 data.remote_durable_ = 00387 (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS); 00388 00389 if (!this->associate(data, active)) { 00390 if (DCPS_debug_level) { 00391 ACE_DEBUG((LM_ERROR, 00392 ACE_TEXT("(%P|%t) RecorderImpl::add_association: ") 00393 ACE_TEXT("ERROR: transport layer failed to associate.\n"))); 00394 } 00395 return; 00396 } 00397 00398 // Check if any publications have already sent a REQUEST_ACK message. 00399 // { 00400 // ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 00401 // 00402 // WriterMapType::iterator where = this->writers_.find(writer.writerId); 00403 // 00404 // if (where != this->writers_.end()) { 00405 // const ACE_Time_Value now = ACE_OS::gettimeofday(); 00406 // 00407 // ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_); 00408 // 00409 // if (where->second->should_ack(now)) { 00410 // const SequenceNumber sequence = where->second->ack_sequence(); 00411 // const DDS::Time_t timenow = time_value_to_time(now); 00412 // if (this->send_sample_ack(writer.writerId, sequence, timenow)) { 00413 // where->second->clear_acks(sequence); 00414 // } 00415 // } 00416 // } 00417 // } 00418 00419 // 00420 // LIVELINESS policy timers are managed here. 
00421 // 00422 // if (liveliness_lease_duration_ != ACE_Time_Value::zero) { 00423 // // this call will start the timer if it is not already set 00424 // const ACE_Time_Value now = ACE_OS::gettimeofday(); 00425 // 00426 // if (DCPS_debug_level >= 5) { 00427 // GuidConverter converter(subscription_id_); 00428 // ACE_DEBUG((LM_DEBUG, 00429 // ACE_TEXT("(%P|%t) RecorderImpl::add_association: ") 00430 // ACE_TEXT("starting/resetting liveliness timer for reader %C\n"), 00431 // OPENDDS_STRING(converter).c_str())); 00432 // } 00433 // 00434 // this->handle_timeout(now, this); 00435 // } 00436 00437 // else - no timer needed when LIVELINESS.lease_duration is INFINITE 00438 00439 } 00440 // 00441 // We no longer hold the publication_handle_lock_. 00442 // 00443 00444 // 00445 // We only do the following processing for readers that are *not* 00446 // readers of Builtin Topics. 00447 // 00448 if (!is_bit_) { 00449 00450 DDS::InstanceHandle_t handle = 00451 this->participant_servant_->id_to_handle(writer.writerId); 00452 00453 // 00454 // We acquire the publication_handle_lock_ for the remainder of our 00455 // processing. 00456 // 00457 { 00458 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_); 00459 00460 // This insertion is idempotent. 00461 this->id_to_handle_map_.insert( 00462 RepoIdToHandleMap::value_type(writer.writerId, handle)); 00463 00464 if (DCPS_debug_level > 4) { 00465 GuidConverter converter(writer.writerId); 00466 ACE_DEBUG((LM_DEBUG, 00467 ACE_TEXT("(%P|%t) RecorderImpl::add_association: ") 00468 ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"), 00469 OPENDDS_STRING(converter).c_str(), 00470 handle)); 00471 } 00472 00473 // We need to adjust these after the insertions have all completed 00474 // since insertions are not guaranteed to increase the number of 00475 // currently matched publications. 
00476 int matchedPublications = static_cast<int>(this->id_to_handle_map_.size()); 00477 this->subscription_match_status_.current_count_change 00478 = matchedPublications - this->subscription_match_status_.current_count; 00479 this->subscription_match_status_.current_count = matchedPublications; 00480 00481 ++this->subscription_match_status_.total_count; 00482 ++this->subscription_match_status_.total_count_change; 00483 00484 this->subscription_match_status_.last_publication_handle = handle; 00485 00486 // set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true); 00487 00488 00489 if (listener_.in()) { 00490 listener_->on_recorder_matched( 00491 this, 00492 this->subscription_match_status_); 00493 00494 // TBD - why does the spec say to change this but not change 00495 // the ChangeFlagStatus after a listener call? 00496 00497 // Client will look at it so next time it looks the change should be 0 00498 this->subscription_match_status_.total_count_change = 0; 00499 this->subscription_match_status_.current_count_change = 0; 00500 } 00501 00502 // notify_status_condition(); 00503 } 00504 00505 { 00506 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_); 00507 00508 this->writers_[writer.writerId]->handle_ = handle; 00509 } 00510 } 00511 00512 if (!active) { 00513 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_); 00514 disco->association_complete(this->domain_id_, 00515 this->participant_servant_->get_id(), 00516 this->subscription_id_, writer.writerId); 00517 } 00518 00519 // if (this->monitor_) { 00520 // this->monitor_->report(); 00521 // } 00522 }
void OpenDDS::DCPS::RecorderImpl::association_complete | ( | const RepoId & | remote_id | ) | [virtual] |
Implements OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 525 of file RecorderImpl.cpp.
00526 { 00527 // For the current DCPSInfoRepo implementation, the DataReader side will 00528 // always be passive, so association_complete() will not be called. 00529 }
bool OpenDDS::DCPS::RecorderImpl::check_transport_qos | ( | const TransportInst & | inst | ) | [virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 199 of file RecorderImpl.cpp.
References OpenDDS::DCPS::TransportInst::is_reliable(), and DDS::RELIABLE_RELIABILITY_QOS.
00200 { 00201 if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) { 00202 return ti.is_reliable(); 00203 } 00204 return true; 00205 }
DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::cleanup | ( | ) |
Clean up the Recorder.
Definition at line 104 of file RecorderImpl.cpp.
References OpenDDS::DCPS::DomainParticipantImpl::get_id(), participant_servant_, remove_all_associations(), remove_association_sweeper_, OpenDDS::DCPS::TopicImpl::remove_entity_ref(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, topic_servant_, and writers_.
Referenced by OpenDDS::DCPS::Service_Participant::delete_recorder().
00105 { 00106 00107 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_); 00108 if (!disco->remove_subscription(this->domain_id_, 00109 participant_servant_->get_id(), 00110 this->subscription_id_)) { 00111 ACE_ERROR_RETURN((LM_ERROR, 00112 ACE_TEXT("(%P|%t) ERROR: ") 00113 ACE_TEXT("RecorderImpl::cleanup: ") 00114 ACE_TEXT(" could not remove subscription from discovery.\n")), 00115 DDS::RETCODE_ERROR); 00116 } 00117 00118 // Call remove association before unregistering the datareader from the transport, 00119 // otherwise some callbacks resulted from remove_association may lost. 00120 00121 this->remove_all_associations(); 00122 00123 if (topic_servant_) { 00124 topic_servant_->remove_entity_ref(); 00125 topic_servant_->_remove_ref(); 00126 } 00127 { 00128 ACE_READ_GUARD_RETURN(ACE_RW_Thread_Mutex, 00129 read_guard, 00130 this->writers_lock_, 00131 0); 00132 // Cancel any uncancelled sweeper timers 00133 WriterMapType::iterator writer; 00134 for (writer = writers_.begin(); writer != writers_.end(); ++writer) { 00135 remove_association_sweeper_->cancel_timer(writer->second); 00136 } 00137 } 00138 00139 remove_association_sweeper_->wait(); 00140 return DDS::RETCODE_OK; 00141 }
void OpenDDS::DCPS::RecorderImpl::data_received | ( | const ReceivedDataSample & | sample | ) | [virtual] |
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 218 of file RecorderImpl.cpp.
References OpenDDS::DCPS::DataSampleHeader::byte_order_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::RcHandle< T >::in(), listener_, OpenDDS::DCPS::DataSampleHeader::message_id_, OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::to_string().
00219 { 00220 DBG_ENTRY_LVL("RecorderImpl","data_received",6); 00221 00222 // ensure some other thread is not changing the sample container 00223 // or statuses related to samples. 00224 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_); 00225 00226 if (DCPS_debug_level > 9) { 00227 GuidConverter converter(subscription_id_); 00228 ACE_DEBUG((LM_DEBUG, 00229 ACE_TEXT("(%P|%t) RecorderImpl::data_received: ") 00230 ACE_TEXT("%C received sample: %C.\n"), 00231 OPENDDS_STRING(converter).c_str(), 00232 to_string(sample.header_).c_str())); 00233 } 00234 00235 // we only support SAMPLE_DATA messages 00236 if (sample.header_.message_id_ != SAMPLE_DATA) 00237 return; 00238 00239 RawDataSample rawSample(static_cast<MessageId> (sample.header_.message_id_), 00240 sample.header_.source_timestamp_sec_, 00241 sample.header_.source_timestamp_nanosec_, 00242 sample.header_.publication_id_, 00243 sample.header_.byte_order_, 00244 sample.sample_); 00245 00246 if (listener_.in()) { 00247 listener_->on_sample_data_received(this, rawSample); 00248 } 00249 00250 }
DDS::DomainId_t OpenDDS::DCPS::RecorderImpl::domain_id | ( | ) | const [inline, virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 73 of file RecorderImpl.h.
00073 { return this->domain_id_; }
DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::enable | ( | ) |
Implements DDS::Entity.
Definition at line 968 of file RecorderImpl.cpp.
References OpenDDS::DCPS::TransportClient::connection_info(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportClient::enable_transport(), OpenDDS::DCPS::TopicImpl::get_id(), OpenDDS::DCPS::TopicDescriptionImpl::get_name(), OpenDDS::DCPS::GUID_UNKNOWN, DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_enabled(), subqos_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, topic_servant_, and DDS::VOLATILE_DURABILITY_QOS.
Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_recorder().
00969 { 00970 if (DCPS_debug_level >= 1) { 00971 00972 ACE_DEBUG((LM_DEBUG, 00973 ACE_TEXT("(%P|%t) RecorderImpl::enable\n"))); 00974 } 00975 //According spec: 00976 // - Calling enable on an already enabled Entity returns OK and has no 00977 // effect. 00978 // - Calling enable on an Entity whose factory is not enabled will fail 00979 // and return PRECONDITION_NOT_MET. 00980 00981 if (this->is_enabled()) { 00982 return DDS::RETCODE_OK; 00983 } 00984 00985 this->set_enabled(); 00986 00987 // if (topic_servant_ && !transport_disabled_) { 00988 if (topic_servant_) { 00989 00990 ACE_DEBUG((LM_DEBUG, 00991 ACE_TEXT("(%P|%t) RecorderImpl::enable_transport\n"))); 00992 00993 try { 00994 this->enable_transport(this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS, 00995 this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS); 00996 } catch (const Transport::Exception&) { 00997 ACE_ERROR((LM_ERROR, 00998 ACE_TEXT("(%P|%t) ERROR: RecorderImpl::enable, ") 00999 ACE_TEXT("Transport Exception.\n"))); 01000 return DDS::RETCODE_ERROR; 01001 01002 } 01003 01004 const TransportLocatorSeq& trans_conf_info = this->connection_info(); 01005 01006 CORBA::String_var filterClassName = ""; 01007 CORBA::String_var filterExpression = ""; 01008 DDS::StringSeq exprParams; 01009 01010 01011 Discovery_rch disco = 01012 TheServiceParticipant->get_discovery(this->domain_id_); 01013 01014 ACE_DEBUG((LM_DEBUG, 01015 ACE_TEXT("(%P|%t) RecorderImpl::add_subscription\n"))); 01016 01017 this->subscription_id_ = 01018 disco->add_subscription(this->domain_id_, 01019 this->participant_servant_->get_id(), 01020 this->topic_servant_->get_id(), 01021 this, 01022 this->qos_, 01023 trans_conf_info, 01024 this->subqos_, 01025 filterClassName, 01026 filterExpression, 01027 exprParams); 01028 01029 if (this->subscription_id_ == OpenDDS::DCPS::GUID_UNKNOWN) { 01030 ACE_ERROR((LM_ERROR, 01031 ACE_TEXT("(%P|%t) ERROR: RecorderImpl::enable, ") 01032 ACE_TEXT("add_subscription returned invalid id.\n"))); 
01033 return DDS::RETCODE_ERROR; 01034 } 01035 } 01036 01037 if (topic_servant_) { 01038 const CORBA::String_var name = topic_servant_->get_name(); 01039 DDS::ReturnCode_t return_value = DDS::RETCODE_OK; 01040 // this->participant_servant_->recorder_enabled(name.in(), this); 01041 01042 return return_value; 01043 } else { 01044 return DDS::RETCODE_OK; 01045 } 01046 }
DDS::InstanceHandle_t OpenDDS::DCPS::RecorderImpl::get_instance_handle | ( | ) | [virtual] |
Implements OpenDDS::DCPS::EntityImpl.
Definition at line 1049 of file RecorderImpl.cpp.
References OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), participant_servant_, and OpenDDS::DCPS::WriterInfoListener::subscription_id_.
01050 { 01051 return this->participant_servant_->id_to_handle(subscription_id_); 01052 }
RecorderListener_rch OpenDDS::DCPS::RecorderImpl::get_listener | ( | ) | [virtual] |
Get the listener for this Recorder.
Implements OpenDDS::DCPS::Recorder.
Definition at line 932 of file RecorderImpl.cpp.
References listener_.
00933 { 00934 return listener_; 00935 }
CORBA::Long OpenDDS::DCPS::RecorderImpl::get_priority_value | ( | const AssociationData & | data | ) | const [virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 212 of file RecorderImpl.cpp.
References OpenDDS::DCPS::AssociationData::publication_transport_priority_.
DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::get_qos | ( | DDS::SubscriberQos & | subscriber_qos, | |
DDS::DataReaderQos & | datareader_qos | |||
) | [virtual] |
Get the Quality of Service settings for the Recorder.
Implements OpenDDS::DCPS::Recorder.
Definition at line 912 of file RecorderImpl.cpp.
References DDS::RETCODE_OK, and subqos_.
00915 { 00916 qos = qos_; 00917 subscriber_qos = subqos_; 00918 return DDS::RETCODE_OK; 00919 }
const RepoId & OpenDDS::DCPS::RecorderImpl::get_repo_id | ( | ) | const [virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 207 of file RecorderImpl.cpp.
References OpenDDS::DCPS::WriterInfoListener::subscription_id_.
00208 { 00209 return this->subscription_id_; 00210 }
void OpenDDS::DCPS::RecorderImpl::inconsistent_topic | ( | ) | [virtual] |
Implements OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 817 of file RecorderImpl.cpp.
References OpenDDS::DCPS::TopicImpl::inconsistent_topic(), and topic_servant_.
00818 { 00819 topic_servant_->inconsistent_topic(); 00820 }
void OpenDDS::DCPS::RecorderImpl::init | ( | TopicDescriptionImpl * | a_topic_desc, | |
const DDS::DataReaderQos & | qos, | |||
RecorderListener_rch | a_listener, | |||
const DDS::StatusMask & | mask, | |||
DomainParticipantImpl * | participant, | |||
DDS::SubscriberQos | subqos | |||
) |
Definition at line 143 of file RecorderImpl.cpp.
References OpenDDS::DCPS::TopicImpl::add_entity_ref(), OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC, OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, OpenDDS::DCPS::BUILT_IN_TOPIC_TOPIC, OpenDDS::DCPS::DCPS_debug_level, domain_id_, DDS::EXCLUSIVE_OWNERSHIP_QOS, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), is_bit_, is_exclusive_ownership_, listener_, listener_mask_, owner_manager_, OpenDDS::DCPS::DomainParticipantImpl::ownership_manager(), participant(), participant_servant_, qos_, subqos_, topic_desc_, and topic_servant_.
Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_recorder().
00150 { 00151 // 00152 if (DCPS_debug_level >= 1) { 00153 00154 ACE_DEBUG((LM_DEBUG, 00155 ACE_TEXT("(%P|%t) RecorderImpl::init \n"))); 00156 } 00157 00158 00159 topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc); 00160 if (TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) { 00161 topic_servant_ = a_topic; 00162 topic_servant_->_add_ref(); 00163 00164 topic_servant_->add_entity_ref(); 00165 } 00166 00167 CORBA::String_var topic_name = a_topic_desc->get_name(); 00168 00169 #if !defined (DDS_HAS_MINIMUM_BIT) 00170 is_bit_ = ACE_OS::strcmp(topic_name.in(), BUILT_IN_PARTICIPANT_TOPIC) == 0 00171 || ACE_OS::strcmp(topic_name.in(), BUILT_IN_TOPIC_TOPIC) == 0 00172 || ACE_OS::strcmp(topic_name.in(), BUILT_IN_SUBSCRIPTION_TOPIC) == 0 00173 || ACE_OS::strcmp(topic_name.in(), BUILT_IN_PUBLICATION_TOPIC) == 0; 00174 #endif // !defined (DDS_HAS_MINIMUM_BIT) 00175 00176 qos_ = qos; 00177 00178 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 00179 is_exclusive_ownership_ = this->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS; 00180 #endif 00181 00182 listener_ = a_listener; 00183 listener_mask_ = mask; 00184 00185 // Only store the participant pointer, since it is our "grand" 00186 // parent, we will exist as long as it does 00187 participant_servant_ = participant; 00188 00189 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 00190 if (is_exclusive_ownership_) { 00191 owner_manager_ = participant_servant_->ownership_manager (); 00192 } 00193 #endif 00194 00195 domain_id_ = participant_servant_->get_domain_id(); 00196 subqos_ = subqos; 00197 }
void OpenDDS::DCPS::RecorderImpl::listener_add_ref | ( | ) | [inline, private, virtual] |
void OpenDDS::DCPS::RecorderImpl::listener_remove_ref | ( | ) | [inline, private, virtual] |
bool OpenDDS::DCPS::RecorderImpl::lookup_instance_handles | ( | const WriterIdSeq & | ids, | |
DDS::InstanceHandleSeq & | hdls | |||
) | [private] |
Look up the instance handles by the publication repo IDs.
Definition at line 938 of file RecorderImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level, and OPENDDS_STRING.
00940 { 00941 if (DCPS_debug_level > 9) { 00942 CORBA::ULong const size = ids.length(); 00943 OPENDDS_STRING separator = ""; 00944 OPENDDS_STRING buffer; 00945 00946 for (unsigned long i = 0; i < size; ++i) { 00947 buffer += separator + OPENDDS_STRING(GuidConverter(ids[i])); 00948 separator = ", "; 00949 } 00950 00951 ACE_DEBUG((LM_DEBUG, 00952 ACE_TEXT("(%P|%t) RecorderImpl::lookup_instance_handles: ") 00953 ACE_TEXT("searching for handles for writer Ids: %C.\n"), 00954 buffer.c_str())); 00955 } 00956 00957 CORBA::ULong const num_wrts = ids.length(); 00958 hdls.length(num_wrts); 00959 00960 for (CORBA::ULong i = 0; i < num_wrts; ++i) { 00961 hdls[i] = this->participant_servant_->id_to_handle(ids[i]); 00962 } 00963 00964 return true; 00965 }
void OpenDDS::DCPS::RecorderImpl::notify_connection_deleted | ( | const RepoId & | ) | [virtual] |
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 271 of file RecorderImpl.cpp.
void OpenDDS::DCPS::RecorderImpl::notify_subscription_disconnected | ( | const WriterIdSeq & | pubids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 252 of file RecorderImpl.cpp.
void OpenDDS::DCPS::RecorderImpl::notify_subscription_lost | ( | const DDS::InstanceHandleSeq & | handles | ) | [private] |
void OpenDDS::DCPS::RecorderImpl::notify_subscription_lost | ( | const WriterIdSeq & | pubids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 266 of file RecorderImpl.cpp.
Referenced by remove_associations_i().
void OpenDDS::DCPS::RecorderImpl::notify_subscription_reconnected | ( | const WriterIdSeq & | pubids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 256 of file RecorderImpl.cpp.
typedef OpenDDS::DCPS::RecorderImpl::OPENDDS_MAP_CMP | ( | PublicationId | , | |
RcHandle< WriterInfo > | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
Publications writing to this reader.
typedef OpenDDS::DCPS::RecorderImpl::OPENDDS_MAP_CMP | ( | RepoId | , | |
DDS::InstanceHandle_t | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
DomainParticipantImpl* OpenDDS::DCPS::RecorderImpl::participant | ( | ) | [inline] |
Definition at line 125 of file RecorderImpl.h.
Referenced by OpenDDS::DCPS::Service_Participant::delete_recorder(), init(), register_for_writer(), and unregister_for_writer().
00125 { 00126 return participant_servant_; 00127 }
void OpenDDS::DCPS::RecorderImpl::register_for_writer | ( | const RepoId & | , | |
const RepoId & | , | |||
const RepoId & | , | |||
const TransportLocatorSeq & | , | |||
DiscoveryListener * | ||||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 1055 of file RecorderImpl.cpp.
References participant(), and OpenDDS::DCPS::TransportClient::register_for_writer().
01060 { 01061 TransportClient::register_for_writer(participant, readerid, writerid, locators, listener); 01062 }
void OpenDDS::DCPS::RecorderImpl::remove_all_associations | ( | ) |
Definition at line 739 of file RecorderImpl.cpp.
References DBG_ENTRY_LVL, remove_associations(), and writers_.
Referenced by cleanup().
00740 { 00741 DBG_ENTRY_LVL("RecorderImpl","remove_all_associations",6); 00742 00743 OpenDDS::DCPS::WriterIdSeq writers; 00744 int size; 00745 00746 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_); 00747 00748 { 00749 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 00750 00751 size = static_cast<int>(writers_.size()); 00752 writers.length(size); 00753 00754 WriterMapType::iterator curr_writer = writers_.begin(); 00755 WriterMapType::iterator end_writer = writers_.end(); 00756 00757 int i = 0; 00758 00759 while (curr_writer != end_writer) { 00760 writers[i++] = curr_writer->first; 00761 ++curr_writer; 00762 } 00763 } 00764 00765 try { 00766 CORBA::Boolean dont_notify_lost = 0; 00767 00768 if (0 < size) { 00769 remove_associations(writers, dont_notify_lost); 00770 } 00771 00772 } catch (const CORBA::Exception&) { 00773 } 00774 }
virtual void OpenDDS::DCPS::RecorderImpl::remove_associations | ( | const WriterIdSeq & | writers, | |
CORBA::Boolean | callback | |||
) | [virtual] |
Implements OpenDDS::DCPS::DataReaderCallbacks.
Referenced by remove_all_associations(), and remove_or_reschedule().
void OpenDDS::DCPS::RecorderImpl::remove_associations_i | ( | const WriterIdSeq & | writers, | |
bool | callback | |||
) | [protected, virtual] |
Definition at line 601 of file RecorderImpl.cpp.
References DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, id_to_handle_map_, OpenDDS::DCPS::RcHandle< T >::in(), is_bit_, DDS::SubscriptionMatchedStatus::last_publication_handle, listener_, notify_subscription_lost(), OPENDDS_STRING, OpenDDS::DCPS::push_back(), remove_association_sweeper_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, subscription_match_status_, DDS::SubscriptionMatchedStatus::total_count_change, and writers_.
Referenced by remove_or_reschedule().
00603 { 00604 DBG_ENTRY_LVL("RecorderImpl", "remove_associations_i", 6); 00605 00606 if (writers.length() == 0) { 00607 return; 00608 } 00609 00610 if (DCPS_debug_level >= 1) { 00611 GuidConverter reader_converter(subscription_id_); 00612 GuidConverter writer_converter(writers[0]); 00613 ACE_DEBUG((LM_DEBUG, 00614 ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ") 00615 ACE_TEXT("bit %d local %C remote %C num remotes %d \n"), 00616 is_bit_, 00617 OPENDDS_STRING(reader_converter).c_str(), 00618 OPENDDS_STRING(writer_converter).c_str(), 00619 writers.length())); 00620 } 00621 DDS::InstanceHandleSeq handles; 00622 00623 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_); 00624 00625 // This is used to hold the list of writers which were actually 00626 // removed, which is a proper subset of the writers which were 00627 // requested to be removed. 00628 WriterIdSeq updated_writers; 00629 00630 CORBA::ULong wr_len; 00631 00632 //Remove the writers from writer list. If the supplied writer 00633 //is not in the cached writers list then it is already removed. 00634 //We just need remove the writers in the list that have not been 00635 //removed. 
00636 { 00637 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_); 00638 00639 wr_len = writers.length(); 00640 00641 for (CORBA::ULong i = 0; i < wr_len; i++) { 00642 PublicationId writer_id = writers[i]; 00643 00644 WriterMapType::iterator it = this->writers_.find(writer_id); 00645 00646 if (it != this->writers_.end()) { 00647 it->second->removed(); 00648 remove_association_sweeper_->cancel_timer(it->second); 00649 } 00650 00651 if (this->writers_.erase(writer_id) == 0) { 00652 if (DCPS_debug_level >= 1) { 00653 GuidConverter converter(writer_id); 00654 ACE_DEBUG((LM_DEBUG, 00655 ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ") 00656 ACE_TEXT("the writer local %C was already removed.\n"), 00657 OPENDDS_STRING(converter).c_str())); 00658 } 00659 00660 } else { 00661 push_back(updated_writers, writer_id); 00662 } 00663 } 00664 } 00665 00666 wr_len = updated_writers.length(); 00667 00668 // Return now if the supplied writers have been removed already. 00669 if (wr_len == 0) { 00670 return; 00671 } 00672 00673 if (!is_bit_) { 00674 // The writer should be in the id_to_handle map at this time. Note 00675 // it if it not there. 00676 if (this->lookup_instance_handles(updated_writers, handles) == false) { 00677 if (DCPS_debug_level > 4) { 00678 ACE_DEBUG((LM_DEBUG, 00679 ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ") 00680 ACE_TEXT("lookup_instance_handles failed.\n"))); 00681 } 00682 } 00683 for (CORBA::ULong i = 0; i < wr_len; ++i) { 00684 id_to_handle_map_.erase(updated_writers[i]); 00685 } 00686 } 00687 for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) { 00688 this->disassociate(updated_writers[i]); 00689 } 00690 00691 // Mirror the add_associations SUBSCRIPTION_MATCHED_STATUS processing. 00692 if (!this->is_bit_) { 00693 // Derive the change in the number of publications writing to this reader. 
00694 int matchedPublications = static_cast<int>(this->id_to_handle_map_.size()); 00695 this->subscription_match_status_.current_count_change 00696 = matchedPublications - this->subscription_match_status_.current_count; 00697 00698 // Only process status if the number of publications has changed. 00699 if (this->subscription_match_status_.current_count_change != 0) { 00700 this->subscription_match_status_.current_count = matchedPublications; 00701 /// Section 7.1.4.1: total_count will not decrement. 00702 00703 /// @TODO: Reconcile this with the verbiage in section 7.1.4.1 00704 this->subscription_match_status_.last_publication_handle 00705 = handles[ wr_len - 1]; 00706 00707 // set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true); 00708 00709 // DDS::DataReaderListener_var listener 00710 // = listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS); 00711 00712 if (listener_.in()) { 00713 listener_->on_recorder_matched( 00714 this, 00715 this->subscription_match_status_); 00716 00717 // Client will look at it so next time it looks the change should be 0 00718 this->subscription_match_status_.total_count_change = 0; 00719 this->subscription_match_status_.current_count_change = 0; 00720 } 00721 00722 // notify_status_condition(); 00723 } 00724 } 00725 00726 // If this remove_association is invoked when the InfoRepo 00727 // detects a lost writer then make a callback to notify 00728 // subscription lost. 00729 if (notify_lost) { 00730 this->notify_subscription_lost(handles); 00731 } 00732 00733 // if (this->monitor_) { 00734 // this->monitor_->report(); 00735 // } 00736 }
void OpenDDS::DCPS::RecorderImpl::remove_or_reschedule | ( | const PublicationId & | pub_id | ) | [protected] |
Definition at line 581 of file RecorderImpl.cpp.
References OpenDDS::DCPS::push_back(), remove_associations(), remove_associations_i(), and writers_.
00582 { 00583 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_); 00584 WriterMapType::iterator where = writers_.find(pub_id); 00585 if (writers_.end() != where) { 00586 WriterInfo& info = *where->second; 00587 WriterIdSeq writers; 00588 push_back(writers, pub_id); 00589 bool notify = info.notify_lost_; 00590 if (info.removal_deadline_ < ACE_OS::gettimeofday()) { 00591 write_guard.release(); 00592 remove_associations_i(writers, notify); 00593 } else { 00594 write_guard.release(); 00595 remove_associations(writers, notify); 00596 } 00597 } 00598 }
DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::repoid_to_bit_key | ( | const DCPS::RepoId & | id, | |
DDS::BuiltinTopicKey_t & | key | |||
) | [virtual] |
Find the built-in topic (BIT) key for a given repo id.
Implements OpenDDS::DCPS::Recorder.
Definition at line 1074 of file RecorderImpl.cpp.
References OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), participant_servant_, DDS::RETCODE_ERROR, and DDS::RETCODE_OK.
01076 { 01077 DDS::InstanceHandle_t publication_handle = this->participant_servant_->id_to_handle(id); 01078 01079 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 01080 guard, 01081 this->publication_handle_lock_, 01082 DDS::RETCODE_ERROR); 01083 01084 BIT_Helper_1 < DDS::PublicationBuiltinTopicDataDataReader, 01085 DDS::PublicationBuiltinTopicDataDataReader_var, 01086 DDS::PublicationBuiltinTopicDataSeq > hh; 01087 01088 DDS::PublicationBuiltinTopicDataSeq data; 01089 01090 DDS::ReturnCode_t ret 01091 = hh.instance_handle_to_bit_data(participant_servant_, 01092 BUILT_IN_PUBLICATION_TOPIC, 01093 publication_handle, 01094 data); 01095 01096 if (ret == DDS::RETCODE_OK) { 01097 key = data[0].key; 01098 } 01099 01100 return ret; 01101 }
DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::set_listener | ( | const RecorderListener_rch & | a_listener, | |
DDS::StatusMask | mask | |||
) | [virtual] |
Change the listener for this Recorder.
Implements OpenDDS::DCPS::Recorder.
Definition at line 922 of file RecorderImpl.cpp.
References listener_, listener_mask_, and DDS::RETCODE_OK.
00924 { 00925 listener_mask_ = mask; 00926 //note: OK to duplicate a nil object ref 00927 listener_ = a_listener; 00928 return DDS::RETCODE_OK; 00929 }
DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::set_qos | ( | const ::DDS::SubscriberQos & | subscriber_qos, | |
const DDS::DataReaderQos & | datareader_qos | |||
) | [virtual] |
Set the Quality of Service settings for the Recorder.
Implements OpenDDS::DCPS::Recorder.
void OpenDDS::DCPS::RecorderImpl::signal_liveliness | ( | const RepoId & | remote_participant | ) | [virtual] |
Implements OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 823 of file RecorderImpl.cpp.
References OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::TransportClient::OPENDDS_VECTOR(), and writers_.
00824 { 00825 RepoId prefix = remote_participant; 00826 prefix.entityId = EntityId_t(); 00827 00828 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_); 00829 00830 typedef std::pair<RepoId, RcHandle<WriterInfo> > WriterSetElement; 00831 typedef OPENDDS_VECTOR(WriterSetElement) WriterSet; 00832 WriterSet writers; 00833 00834 { 00835 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 00836 for (WriterMapType::iterator pos = writers_.lower_bound(prefix), 00837 limit = writers_.end(); 00838 pos != limit && GuidPrefixEqual() (pos->first.guidPrefix, prefix.guidPrefix); 00839 ++pos) { 00840 writers.push_back(std::make_pair(pos->first, pos->second)); 00841 } 00842 } 00843 00844 ACE_Time_Value when = ACE_OS::gettimeofday(); 00845 for (WriterSet::iterator pos = writers.begin(), limit = writers.end(); 00846 pos != limit; 00847 ++pos) { 00848 pos->second->received_activity(when); 00849 } 00850 }
void OpenDDS::DCPS::RecorderImpl::unregister_for_writer | ( | const RepoId & | , | |
const RepoId & | , | |||
const RepoId & | ||||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 1065 of file RecorderImpl.cpp.
References participant(), and OpenDDS::DCPS::TransportClient::unregister_for_writer().
01068 { 01069 TransportClient::unregister_for_writer(participant, readerid, writerid); 01070 }
void OpenDDS::DCPS::RecorderImpl::update_incompatible_qos | ( | const IncompatibleQosStatus & | status | ) | [virtual] |
Implements OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 777 of file RecorderImpl.cpp.
References OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, OpenDDS::DCPS::IncompatibleQosStatus::last_policy_id, DDS::RequestedIncompatibleQosStatus::last_policy_id, OpenDDS::DCPS::IncompatibleQosStatus::policies, DDS::RequestedIncompatibleQosStatus::policies, requested_incompatible_qos_status_, DDS::RequestedIncompatibleQosStatus::total_count, OpenDDS::DCPS::IncompatibleQosStatus::total_count, and DDS::RequestedIncompatibleQosStatus::total_count_change.
00778 { 00779 00780 00781 ACE_GUARD(ACE_Recursive_Thread_Mutex, 00782 guard, 00783 this->publication_handle_lock_); 00784 00785 if (this->requested_incompatible_qos_status_.total_count == status.total_count) { 00786 // This test should make the method idempotent. 00787 return; 00788 } 00789 00790 // set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS, 00791 // true); 00792 00793 // copy status and increment change 00794 requested_incompatible_qos_status_.total_count = status.total_count; 00795 requested_incompatible_qos_status_.total_count_change += 00796 status.count_since_last_send; 00797 requested_incompatible_qos_status_.last_policy_id = 00798 status.last_policy_id; 00799 requested_incompatible_qos_status_.policies = status.policies; 00800 00801 // if (!CORBA::is_nil(listener.in())) { 00802 // listener->on_requested_incompatible_qos(dr_local_objref_.in(), 00803 // requested_incompatible_qos_status_); 00804 // 00805 // // TBD - why does the spec say to change total_count_change but not 00806 // // change the ChangeFlagStatus after a listener call? 00807 // 00808 // // client just looked at it so next time it looks the 00809 // // change should be 0 00810 // requested_incompatible_qos_status_.total_count_change = 0; 00811 // } 00812 // 00813 // notify_status_condition(); 00814 }
friend class ::DDS_TEST [friend] |
friend class RemoveAssociationSweeper< RecorderImpl > [friend] |
Definition at line 174 of file RecorderImpl.h.
RepoIdToHandleMap OpenDDS::DCPS::RecorderImpl::id_to_handle_map_ [private] |
Definition at line 187 of file RecorderImpl.h.
Referenced by add_association(), and remove_associations_i().
bool OpenDDS::DCPS::RecorderImpl::is_bit_ [private] |
Flag indicating that this data reader is a built-in topic data reader.
Definition at line 194 of file RecorderImpl.h.
Referenced by add_association(), init(), and remove_associations_i().
bool OpenDDS::DCPS::RecorderImpl::is_exclusive_ownership_ [private] |
Definition at line 180 of file RecorderImpl.h.
Referenced by add_association(), data_received(), get_listener(), init(), remove_associations_i(), and set_listener().
Definition at line 163 of file RecorderImpl.h.
Referenced by add_association(), cleanup(), get_instance_handle(), init(), and repoid_to_bit_key().
ACE_Recursive_Thread_Mutex OpenDDS::DCPS::RecorderImpl::publication_handle_lock_ [private] |
Definition at line 184 of file RecorderImpl.h.
RemoveAssociationSweeper<RecorderImpl>* OpenDDS::DCPS::RecorderImpl::remove_association_sweeper_ [private] |
Definition at line 182 of file RecorderImpl.h.
Referenced by cleanup(), remove_associations_i(), and ~RecorderImpl().
DDS::RequestedIncompatibleQosStatus OpenDDS::DCPS::RecorderImpl::requested_incompatible_qos_status_ [private] |
Definition at line 189 of file RecorderImpl.h.
Referenced by RecorderImpl(), and update_incompatible_qos().
ACE_Recursive_Thread_Mutex OpenDDS::DCPS::RecorderImpl::sample_lock_ [private] |
Lock protecting the sample container as well as the statuses.
Definition at line 161 of file RecorderImpl.h.
Definition at line 190 of file RecorderImpl.h.
Referenced by add_association(), RecorderImpl(), and remove_associations_i().
DDS::TopicDescription_var OpenDDS::DCPS::RecorderImpl::topic_desc_ [private] |
Definition at line 164 of file RecorderImpl.h.
Referenced by cleanup(), enable(), inconsistent_topic(), and init().
WriterMapType OpenDDS::DCPS::RecorderImpl::writers_ [private] |
Definition at line 199 of file RecorderImpl.h.
Referenced by add_association(), cleanup(), remove_all_associations(), remove_associations_i(), remove_or_reschedule(), signal_liveliness(), and ~RecorderImpl().
ACE_RW_Thread_Mutex OpenDDS::DCPS::RecorderImpl::writers_lock_ [private] |