Implementation of Recorder functionality. More...
#include <RecorderImpl.h>
Implementation of Recorder functionality.
This class is the implementation of the Recorder. Inheritance is used to limit the application's access to underlying system methods.
Definition at line 40 of file RecorderImpl.h.
OpenDDS::DCPS::RecorderImpl::RecorderImpl | ( | ) |
Definition at line 49 of file RecorderImpl.cpp.
References DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, DDS::HANDLE_NIL, DDS::RequestedIncompatibleQosStatus::last_policy_id, DDS::SubscriptionMatchedStatus::last_publication_handle, DDS::RequestedIncompatibleQosStatus::policies, requested_incompatible_qos_status_, subscription_match_status_, DDS::SubscriptionMatchedStatus::total_count, DDS::RequestedIncompatibleQosStatus::total_count, DDS::SubscriptionMatchedStatus::total_count_change, and DDS::RequestedIncompatibleQosStatus::total_count_change.
00050 : qos_(TheServiceParticipant->initial_DataReaderQos()), 00051 participant_servant_(0), 00052 topic_servant_(0), 00053 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 00054 is_exclusive_ownership_ (false), 00055 #endif 00056 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 00057 owner_manager_ (0), 00058 #endif 00059 subqos_ (TheServiceParticipant->initial_SubscriberQos()), 00060 topic_desc_(0), 00061 listener_mask_(DEFAULT_STATUS_MASK), 00062 domain_id_(0), 00063 remove_association_sweeper_( 00064 make_rch<RemoveAssociationSweeper<RecorderImpl> >(TheServiceParticipant->reactor(), 00065 TheServiceParticipant->reactor_owner(), 00066 this)), 00067 is_bit_(false) 00068 { 00069 00070 requested_incompatible_qos_status_.total_count = 0; 00071 requested_incompatible_qos_status_.total_count_change = 0; 00072 requested_incompatible_qos_status_.last_policy_id = 0; 00073 requested_incompatible_qos_status_.policies.length(0); 00074 00075 subscription_match_status_.total_count = 0; 00076 subscription_match_status_.total_count_change = 0; 00077 subscription_match_status_.current_count = 0; 00078 subscription_match_status_.current_count_change = 0; 00079 subscription_match_status_.last_publication_handle = 00080 DDS::HANDLE_NIL; 00081 00082 }
OpenDDS::DCPS::RecorderImpl::~RecorderImpl | ( | ) | [virtual] |
Definition at line 86 of file RecorderImpl.cpp.
References DBG_ENTRY_LVL, remove_association_sweeper_, writers_, and writers_lock_.
00087 { 00088 DBG_ENTRY_LVL("RecorderImpl","~RecorderImpl",6); 00089 { 00090 ACE_READ_GUARD(ACE_RW_Thread_Mutex, 00091 read_guard, 00092 this->writers_lock_); 00093 // Cancel any uncancelled sweeper timers to decrement reference count. 00094 WriterMapType::iterator writer; 00095 for (writer = writers_.begin(); writer != writers_.end(); ++writer) { 00096 remove_association_sweeper_->cancel_timer(writer->second); 00097 } 00098 } 00099 00100 remove_association_sweeper_->wait(); 00101 }
void OpenDDS::DCPS::RecorderImpl::add_association(const RepoId& yourId, const WriterAssociation& writer, bool active) [virtual]
Implements OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 264 of file RecorderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::TransportClient::associate(), DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, OpenDDS::DCPS::DCPS_debug_level, domain_id_, DDS::DataWriterQos::durability, OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), id_to_handle_map_, OpenDDS::DCPS::RcHandle< T >::in(), is_bit_, DDS::SubscriptionMatchedStatus::last_publication_handle, listener_, LM_DEBUG, LM_ERROR, OPENDDS_STRING, participant_servant_, publication_handle_lock_, OpenDDS::DCPS::AssociationData::publication_transport_priority_, DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, sample_lock_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, subscription_match_status_, TheServiceParticipant, DDS::SubscriptionMatchedStatus::total_count, DDS::SubscriptionMatchedStatus::total_count_change, DDS::DataWriterQos::transport_priority, DDS::VOLATILE_DURABILITY_QOS, OpenDDS::DCPS::WriterAssociation::writerId, OpenDDS::DCPS::WriterAssociation::writerQos, writers_, writers_lock_, and OpenDDS::DCPS::WriterAssociation::writerTransInfo.
00267 { 00268 ACE_DEBUG((LM_DEBUG, "RecorderImpl::add_association\n")); 00269 // 00270 // The following block is for diagnostic purposes only. 00271 // 00272 if (DCPS_debug_level >= 1) { 00273 GuidConverter reader_converter(yourId); 00274 GuidConverter writer_converter(writer.writerId); 00275 ACE_DEBUG((LM_DEBUG, 00276 ACE_TEXT("(%P|%t) RecorderImpl::add_association - ") 00277 ACE_TEXT("bit %d local %C remote %C\n"), 00278 is_bit_, 00279 OPENDDS_STRING(reader_converter).c_str(), 00280 OPENDDS_STRING(writer_converter).c_str())); 00281 } 00282 00283 // 00284 // This block prevents adding associations to deleted readers. 00285 // Presumably this is a "good thing(tm)". 00286 // 00287 // if (entity_deleted_ == true) { 00288 // if (DCPS_debug_level >= 1) 00289 // ACE_DEBUG((LM_DEBUG, 00290 // ACE_TEXT("(%P|%t) RecorderImpl::add_association") 00291 // ACE_TEXT(" This is a deleted datareader, ignoring add.\n"))); 00292 // 00293 // return; 00294 // } 00295 00296 // 00297 // We are being called back from the repository before we are done 00298 // processing after our call to the repository that caused this call 00299 // (from the repository) to be made. 00300 // 00301 if (GUID_UNKNOWN == subscription_id_) { 00302 // add_associations was invoked before DCSPInfoRepo::add_subscription() returned. 00303 subscription_id_ = yourId; 00304 } 00305 00306 // 00307 // We do the following while holding the publication_handle_lock_. 00308 // 00309 { 00310 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_); 00311 00312 // 00313 // For each writer in the list of writers to associate with, we 00314 // create a WriterInfo and a WriterStats object and store them in 00315 // our internal maps. 
00316 // 00317 { 00318 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_); 00319 00320 const PublicationId& writer_id = writer.writerId; 00321 RcHandle<WriterInfo> info ( make_rch<WriterInfo>(static_cast<WriterInfoListener*>(this), writer_id, writer.writerQos)); 00322 /*std::pair<WriterMapType::iterator, bool> bpair =*/ 00323 this->writers_.insert( 00324 // This insertion is idempotent. 00325 WriterMapType::value_type( 00326 writer_id, 00327 info)); 00328 // this->statistics_.insert( 00329 // StatsMapType::value_type( 00330 // writer_id, 00331 // WriterStats( 00332 // this->raw_latency_buffer_size_, 00333 // this->raw_latency_buffer_type_))); 00334 00335 // if (DCPS_debug_level > 4) { 00336 // GuidConverter converter(writer_id); 00337 // ACE_DEBUG((LM_DEBUG, 00338 // "(%P|%t) RecorderImpl::add_association: " 00339 // "inserted writer %C.return %d \n", 00340 // OPENDDS_STRING(converter).c_str(), bpair.second)); 00341 // 00342 // WriterMapType::iterator iter = writers_.find(writer_id); 00343 // if (iter != writers_.end()) { 00344 // // This may not be an error since it could happen that the sample 00345 // // is delivered to the datareader after the write is dis-associated 00346 // // with this datareader. 00347 // GuidConverter reader_converter(subscription_id_); 00348 // GuidConverter writer_converter(writer_id); 00349 // ACE_DEBUG((LM_DEBUG, 00350 // ACE_TEXT("(%P|%t) RecorderImpl::add_association: ") 00351 // ACE_TEXT("reader %C is associated with writer %C.\n"), 00352 // OPENDDS_STRING(reader_converter).c_str(), 00353 // OPENDDS_STRING(writer_converter).c_str())); 00354 // } 00355 // } 00356 } 00357 00358 // 00359 // Propagate the add_associations processing down into the Transport 00360 // layer here. This will establish the transport support and reserve 00361 // usage of an existing connection or initiate creation of a new 00362 // connection if no suitable connection is available. 
00363 // 00364 AssociationData data; 00365 data.remote_id_ = writer.writerId; 00366 data.remote_data_ = writer.writerTransInfo; 00367 data.publication_transport_priority_ = 00368 writer.writerQos.transport_priority.value; 00369 data.remote_reliable_ = 00370 (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS); 00371 data.remote_durable_ = 00372 (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS); 00373 00374 if (!this->associate(data, active)) { 00375 if (DCPS_debug_level) { 00376 ACE_ERROR((LM_ERROR, 00377 ACE_TEXT("(%P|%t) RecorderImpl::add_association: ") 00378 ACE_TEXT("ERROR: transport layer failed to associate.\n"))); 00379 } 00380 return; 00381 } 00382 00383 // Check if any publications have already sent a REQUEST_ACK message. 00384 // { 00385 // ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 00386 // 00387 // WriterMapType::iterator where = this->writers_.find(writer.writerId); 00388 // 00389 // if (where != this->writers_.end()) { 00390 // const ACE_Time_Value now = ACE_OS::gettimeofday(); 00391 // 00392 // ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_); 00393 // 00394 // if (where->second->should_ack(now)) { 00395 // const SequenceNumber sequence = where->second->ack_sequence(); 00396 // const DDS::Time_t timenow = time_value_to_time(now); 00397 // if (this->send_sample_ack(writer.writerId, sequence, timenow)) { 00398 // where->second->clear_acks(sequence); 00399 // } 00400 // } 00401 // } 00402 // } 00403 00404 // 00405 // LIVELINESS policy timers are managed here. 
00406 // 00407 // if (liveliness_lease_duration_ != ACE_Time_Value::zero) { 00408 // // this call will start the timer if it is not already set 00409 // const ACE_Time_Value now = ACE_OS::gettimeofday(); 00410 // 00411 // if (DCPS_debug_level >= 5) { 00412 // GuidConverter converter(subscription_id_); 00413 // ACE_DEBUG((LM_DEBUG, 00414 // ACE_TEXT("(%P|%t) RecorderImpl::add_association: ") 00415 // ACE_TEXT("starting/resetting liveliness timer for reader %C\n"), 00416 // OPENDDS_STRING(converter).c_str())); 00417 // } 00418 // 00419 // this->handle_timeout(now, this); 00420 // } 00421 00422 // else - no timer needed when LIVELINESS.lease_duration is INFINITE 00423 00424 } 00425 // 00426 // We no longer hold the publication_handle_lock_. 00427 // 00428 00429 // 00430 // We only do the following processing for readers that are *not* 00431 // readers of Builtin Topics. 00432 // 00433 if (!is_bit_) { 00434 00435 DDS::InstanceHandle_t handle = 00436 this->participant_servant_->id_to_handle(writer.writerId); 00437 00438 // 00439 // We acquire the publication_handle_lock_ for the remainder of our 00440 // processing. 00441 // 00442 { 00443 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_); 00444 00445 // This insertion is idempotent. 00446 this->id_to_handle_map_.insert( 00447 RepoIdToHandleMap::value_type(writer.writerId, handle)); 00448 00449 if (DCPS_debug_level > 4) { 00450 GuidConverter converter(writer.writerId); 00451 ACE_DEBUG((LM_DEBUG, 00452 ACE_TEXT("(%P|%t) RecorderImpl::add_association: ") 00453 ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"), 00454 OPENDDS_STRING(converter).c_str(), 00455 handle)); 00456 } 00457 00458 // We need to adjust these after the insertions have all completed 00459 // since insertions are not guaranteed to increase the number of 00460 // currently matched publications. 
00461 int matchedPublications = static_cast<int>(this->id_to_handle_map_.size()); 00462 this->subscription_match_status_.current_count_change 00463 = matchedPublications - this->subscription_match_status_.current_count; 00464 this->subscription_match_status_.current_count = matchedPublications; 00465 00466 ++this->subscription_match_status_.total_count; 00467 ++this->subscription_match_status_.total_count_change; 00468 00469 this->subscription_match_status_.last_publication_handle = handle; 00470 00471 // set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true); 00472 00473 00474 if (listener_.in()) { 00475 listener_->on_recorder_matched( 00476 this, 00477 this->subscription_match_status_); 00478 00479 // TBD - why does the spec say to change this but not change 00480 // the ChangeFlagStatus after a listener call? 00481 00482 // Client will look at it so next time it looks the change should be 0 00483 this->subscription_match_status_.total_count_change = 0; 00484 this->subscription_match_status_.current_count_change = 0; 00485 } 00486 00487 // notify_status_condition(); 00488 } 00489 00490 { 00491 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_); 00492 00493 this->writers_[writer.writerId]->handle_ = handle; 00494 } 00495 } 00496 00497 if (!active) { 00498 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_); 00499 disco->association_complete(this->domain_id_, 00500 this->participant_servant_->get_id(), 00501 this->subscription_id_, writer.writerId); 00502 } 00503 00504 // if (this->monitor_) { 00505 // this->monitor_->report(); 00506 // } 00507 }
void OpenDDS::DCPS::RecorderImpl::association_complete | ( | const RepoId & | remote_id | ) | [virtual] |
Implements OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 510 of file RecorderImpl.cpp.
bool OpenDDS::DCPS::RecorderImpl::check_transport_qos | ( | const TransportInst & | inst | ) | [virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 190 of file RecorderImpl.cpp.
References OpenDDS::DCPS::TransportInst::is_reliable(), qos_, DDS::DataReaderQos::reliability, and DDS::RELIABLE_RELIABILITY_QOS.
00191 { 00192 if (this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) { 00193 return ti.is_reliable(); 00194 } 00195 return true; 00196 }
DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::cleanup | ( | void | ) |
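Clean up the Recorder: remove its subscription from discovery, remove all writer associations, and cancel any outstanding association-sweeper timers.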
Definition at line 105 of file RecorderImpl.cpp.
References ACE_TEXT(), domain_id_, OpenDDS::DCPS::DomainParticipantImpl::get_id(), LM_ERROR, participant_servant_, remove_all_associations(), remove_association_sweeper_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, writers_, and writers_lock_.
Referenced by OpenDDS::DCPS::Service_Participant::delete_recorder(), and OpenDDS::DCPS::DomainParticipantImpl::handle_exception().
00106 { 00107 00108 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_); 00109 if (!disco->remove_subscription(this->domain_id_, 00110 participant_servant_->get_id(), 00111 this->subscription_id_)) { 00112 ACE_ERROR_RETURN((LM_ERROR, 00113 ACE_TEXT("(%P|%t) ERROR: ") 00114 ACE_TEXT("RecorderImpl::cleanup: ") 00115 ACE_TEXT(" could not remove subscription from discovery.\n")), 00116 DDS::RETCODE_ERROR); 00117 } 00118 00119 // Call remove association before unregistering the datareader from the transport, 00120 // otherwise some callbacks resulted from remove_association may lost. 00121 00122 this->remove_all_associations(); 00123 00124 { 00125 ACE_READ_GUARD_RETURN(ACE_RW_Thread_Mutex, 00126 read_guard, 00127 this->writers_lock_, 00128 0); 00129 // Cancel any uncancelled sweeper timers 00130 WriterMapType::iterator writer; 00131 for (writer = writers_.begin(); writer != writers_.end(); ++writer) { 00132 remove_association_sweeper_->cancel_timer(writer->second); 00133 } 00134 } 00135 00136 remove_association_sweeper_->wait(); 00137 return DDS::RETCODE_OK; 00138 }
void OpenDDS::DCPS::RecorderImpl::data_received | ( | const ReceivedDataSample & | sample | ) | [virtual] |
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 209 of file RecorderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::byte_order_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::RcHandle< T >::in(), listener_, LM_DEBUG, OpenDDS::DCPS::DataSampleHeader::message_id_, OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::SAMPLE_DATA, sample_lock_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, and OpenDDS::DCPS::to_string().
00210 { 00211 DBG_ENTRY_LVL("RecorderImpl","data_received",6); 00212 00213 // ensure some other thread is not changing the sample container 00214 // or statuses related to samples. 00215 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_); 00216 00217 if (DCPS_debug_level > 9) { 00218 GuidConverter converter(subscription_id_); 00219 ACE_DEBUG((LM_DEBUG, 00220 ACE_TEXT("(%P|%t) RecorderImpl::data_received: ") 00221 ACE_TEXT("%C received sample: %C.\n"), 00222 OPENDDS_STRING(converter).c_str(), 00223 to_string(sample.header_).c_str())); 00224 } 00225 00226 // we only support SAMPLE_DATA messages 00227 if (sample.header_.message_id_ != SAMPLE_DATA) 00228 return; 00229 00230 RawDataSample rawSample(static_cast<MessageId> (sample.header_.message_id_), 00231 sample.header_.source_timestamp_sec_, 00232 sample.header_.source_timestamp_nanosec_, 00233 sample.header_.publication_id_, 00234 sample.header_.byte_order_, 00235 sample.sample_.get()); 00236 00237 if (listener_.in()) { 00238 listener_->on_sample_data_received(this, rawSample); 00239 } 00240 00241 }
DDS::DomainId_t OpenDDS::DCPS::RecorderImpl::domain_id | ( | ) | const [inline, virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 71 of file RecorderImpl.h.
00071 { return this->domain_id_; }
DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::enable | ( | ) |
Implements DDS::Entity.
Definition at line 941 of file RecorderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::TransportClient::connection_info(), OpenDDS::DCPS::DCPS_debug_level, domain_id_, OpenDDS::DCPS::TransportClient::enable_transport(), OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::EntityImpl::is_enabled(), LM_DEBUG, LM_ERROR, participant_servant_, qos_, DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_enabled(), subqos_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, topic_servant_, and DDS::VOLATILE_DURABILITY_QOS.
Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_recorder().
00942 { 00943 if (DCPS_debug_level >= 1) { 00944 00945 ACE_DEBUG((LM_DEBUG, 00946 ACE_TEXT("(%P|%t) RecorderImpl::enable\n"))); 00947 } 00948 //According spec: 00949 // - Calling enable on an already enabled Entity returns OK and has no 00950 // effect. 00951 // - Calling enable on an Entity whose factory is not enabled will fail 00952 // and return PRECONDITION_NOT_MET. 00953 00954 if (this->is_enabled()) { 00955 return DDS::RETCODE_OK; 00956 } 00957 00958 this->set_enabled(); 00959 00960 // if (topic_servant_ && !transport_disabled_) { 00961 if (topic_servant_) { 00962 00963 ACE_DEBUG((LM_DEBUG, 00964 ACE_TEXT("(%P|%t) RecorderImpl::enable_transport\n"))); 00965 00966 try { 00967 this->enable_transport(this->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS, 00968 this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS); 00969 } catch (const Transport::Exception&) { 00970 ACE_ERROR((LM_ERROR, 00971 ACE_TEXT("(%P|%t) ERROR: RecorderImpl::enable, ") 00972 ACE_TEXT("Transport Exception.\n"))); 00973 return DDS::RETCODE_ERROR; 00974 00975 } 00976 00977 const TransportLocatorSeq& trans_conf_info = this->connection_info(); 00978 00979 CORBA::String_var filterClassName = ""; 00980 CORBA::String_var filterExpression = ""; 00981 DDS::StringSeq exprParams; 00982 00983 00984 Discovery_rch disco = 00985 TheServiceParticipant->get_discovery(this->domain_id_); 00986 00987 ACE_DEBUG((LM_DEBUG, 00988 ACE_TEXT("(%P|%t) RecorderImpl::add_subscription\n"))); 00989 00990 this->subscription_id_ = 00991 disco->add_subscription(this->domain_id_, 00992 this->participant_servant_->get_id(), 00993 this->topic_servant_->get_id(), 00994 this, 00995 this->qos_, 00996 trans_conf_info, 00997 this->subqos_, 00998 filterClassName, 00999 filterExpression, 01000 exprParams); 01001 01002 if (this->subscription_id_ == OpenDDS::DCPS::GUID_UNKNOWN) { 01003 ACE_ERROR((LM_ERROR, 01004 ACE_TEXT("(%P|%t) ERROR: RecorderImpl::enable, ") 01005 ACE_TEXT("add_subscription returned invalid id.\n"))); 
01006 return DDS::RETCODE_ERROR; 01007 } 01008 } 01009 01010 if (topic_servant_) { 01011 const CORBA::String_var name = topic_servant_->get_name(); 01012 DDS::ReturnCode_t return_value = DDS::RETCODE_OK; 01013 // this->participant_servant_->recorder_enabled(name.in(), this); 01014 01015 return return_value; 01016 } else { 01017 return DDS::RETCODE_OK; 01018 } 01019 }
DDS::InstanceHandle_t OpenDDS::DCPS::RecorderImpl::get_instance_handle | ( | ) | [virtual] |
Implements OpenDDS::DCPS::EntityImpl.
Definition at line 1022 of file RecorderImpl.cpp.
References OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), participant_servant_, and OpenDDS::DCPS::WriterInfoListener::subscription_id_.
01023 { 01024 return this->participant_servant_->id_to_handle(subscription_id_); 01025 }
RecorderListener_rch OpenDDS::DCPS::RecorderImpl::get_listener | ( | ) | [virtual] |
Get the listener for this Recorder.
Implements OpenDDS::DCPS::Recorder.
Definition at line 907 of file RecorderImpl.cpp.
References listener_.
00908 { 00909 return listener_; 00910 }
CORBA::Long OpenDDS::DCPS::RecorderImpl::get_priority_value | ( | const AssociationData & | data | ) | const [virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 203 of file RecorderImpl.cpp.
References OpenDDS::DCPS::AssociationData::publication_transport_priority_.
DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::get_qos(DDS::SubscriberQos& subscriber_qos, DDS::DataReaderQos& datareader_qos) [virtual]
Get the Quality of Service settings for the Recorder.
Implements OpenDDS::DCPS::Recorder.
Definition at line 887 of file RecorderImpl.cpp.
References qos_, DDS::RETCODE_OK, and subqos_.
00890 { 00891 qos = qos_; 00892 subscriber_qos = subqos_; 00893 return DDS::RETCODE_OK; 00894 }
const RepoId & OpenDDS::DCPS::RecorderImpl::get_repo_id | ( | ) | const [virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 198 of file RecorderImpl.cpp.
References OpenDDS::DCPS::WriterInfoListener::subscription_id_.
00199 { 00200 return this->subscription_id_; 00201 }
void OpenDDS::DCPS::RecorderImpl::inconsistent_topic | ( | ) | [virtual] |
Implements OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 792 of file RecorderImpl.cpp.
References topic_servant_.
00793 { 00794 topic_servant_->inconsistent_topic(); 00795 }
void OpenDDS::DCPS::RecorderImpl::init(TopicDescriptionImpl* a_topic_desc, const DDS::DataReaderQos& qos, RecorderListener_rch a_listener, const DDS::StatusMask& mask, DomainParticipantImpl* participant, DDS::SubscriberQos subqos)
Definition at line 140 of file RecorderImpl.cpp.
References OpenDDS::DCPS::Recorder::_duplicate(), ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domain_id_, DDS::EXCLUSIVE_OWNERSHIP_QOS, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), OpenDDS::DCPS::TopicDescriptionImpl::get_name(), OpenDDS::DCPS::TopicDescriptionImpl::get_type_name(), is_bit_, is_exclusive_ownership_, listener_, listener_mask_, LM_DEBUG, owner_manager_, OpenDDS::DCPS::DomainParticipantImpl::ownership_manager(), participant_servant_, qos_, subqos_, topic_desc_, topic_servant_, and OpenDDS::DCPS::topicIsBIT().
Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_recorder().
00147 { 00148 // 00149 if (DCPS_debug_level >= 1) { 00150 00151 ACE_DEBUG((LM_DEBUG, 00152 ACE_TEXT("(%P|%t) RecorderImpl::init \n"))); 00153 } 00154 00155 00156 topic_desc_ = DDS::TopicDescription::_duplicate(a_topic_desc); 00157 if (TopicImpl* a_topic = dynamic_cast<TopicImpl*>(a_topic_desc)) { 00158 topic_servant_ = a_topic; 00159 } 00160 00161 CORBA::String_var topic_name = a_topic_desc->get_name(); 00162 00163 #if !defined (DDS_HAS_MINIMUM_BIT) 00164 is_bit_ = topicIsBIT(topic_name.in(), a_topic_desc->get_type_name()); 00165 #endif // !defined (DDS_HAS_MINIMUM_BIT) 00166 00167 qos_ = qos; 00168 00169 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 00170 is_exclusive_ownership_ = this->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS; 00171 #endif 00172 00173 listener_ = a_listener; 00174 listener_mask_ = mask; 00175 00176 // Only store the participant pointer, since it is our "grand" 00177 // parent, we will exist as long as it does 00178 participant_servant_ = participant; 00179 00180 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 00181 if (is_exclusive_ownership_) { 00182 owner_manager_ = participant_servant_->ownership_manager (); 00183 } 00184 #endif 00185 00186 domain_id_ = participant_servant_->get_domain_id(); 00187 subqos_ = subqos; 00188 }
void OpenDDS::DCPS::RecorderImpl::lookup_instance_handles(const WriterIdSeq& ids, DDS::InstanceHandleSeq& hdls) [private]
Lookup the instance handles by the publication repo ids.
Definition at line 913 of file RecorderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), LM_DEBUG, OPENDDS_STRING, and participant_servant_.
Referenced by remove_associations_i().
00915 { 00916 CORBA::ULong const num_wrts = ids.length(); 00917 00918 if (DCPS_debug_level > 9) { 00919 OPENDDS_STRING separator = ""; 00920 OPENDDS_STRING buffer; 00921 00922 for (CORBA::ULong i = 0; i < num_wrts; ++i) { 00923 buffer += separator + OPENDDS_STRING(GuidConverter(ids[i])); 00924 separator = ", "; 00925 } 00926 00927 ACE_DEBUG((LM_DEBUG, 00928 ACE_TEXT("(%P|%t) RecorderImpl::lookup_instance_handles: ") 00929 ACE_TEXT("searching for handles for writer Ids: %C.\n"), 00930 buffer.c_str())); 00931 } 00932 00933 hdls.length(num_wrts); 00934 00935 for (CORBA::ULong i = 0; i < num_wrts; ++i) { 00936 hdls[i] = this->participant_servant_->id_to_handle(ids[i]); 00937 } 00938 }
void OpenDDS::DCPS::RecorderImpl::notify_subscription_disconnected | ( | const WriterIdSeq & | pubids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 243 of file RecorderImpl.cpp.
void OpenDDS::DCPS::RecorderImpl::notify_subscription_lost | ( | const DDS::InstanceHandleSeq & | handles | ) | [private] |
Definition at line 253 of file RecorderImpl.cpp.
void OpenDDS::DCPS::RecorderImpl::notify_subscription_lost | ( | const WriterIdSeq & | pubids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 257 of file RecorderImpl.cpp.
Referenced by remove_associations_i().
void OpenDDS::DCPS::RecorderImpl::notify_subscription_reconnected | ( | const WriterIdSeq & | pubids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 247 of file RecorderImpl.cpp.
typedef OpenDDS::DCPS::RecorderImpl::OPENDDS_MAP_CMP(PublicationId, RcHandle< WriterInfo >, GUID_tKeyLessThan) [private]
Publications writing to this reader (map from PublicationId to the writer's WriterInfo).
typedef OpenDDS::DCPS::RecorderImpl::OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) [private]
DomainParticipantImpl* OpenDDS::DCPS::RecorderImpl::participant | ( | ) | [inline] |
Definition at line 122 of file RecorderImpl.h.
Referenced by OpenDDS::DCPS::Service_Participant::delete_recorder().
00122 { 00123 return participant_servant_; 00124 }
void OpenDDS::DCPS::RecorderImpl::register_for_writer(const RepoId& participant, const RepoId& readerid, const RepoId& writerid, const TransportLocatorSeq& locators, DiscoveryListener* listener) [virtual]
Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 1028 of file RecorderImpl.cpp.
01033 { 01034 TransportClient::register_for_writer(participant, readerid, writerid, locators, listener); 01035 }
void OpenDDS::DCPS::RecorderImpl::remove_all_associations | ( | ) |
Definition at line 714 of file RecorderImpl.cpp.
References DBG_ENTRY_LVL, publication_handle_lock_, remove_associations(), size, writers_, and writers_lock_.
Referenced by cleanup().
00715 { 00716 DBG_ENTRY_LVL("RecorderImpl","remove_all_associations",6); 00717 00718 OpenDDS::DCPS::WriterIdSeq writers; 00719 int size; 00720 00721 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_); 00722 00723 { 00724 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 00725 00726 size = static_cast<int>(writers_.size()); 00727 writers.length(size); 00728 00729 WriterMapType::iterator curr_writer = writers_.begin(); 00730 WriterMapType::iterator end_writer = writers_.end(); 00731 00732 int i = 0; 00733 00734 while (curr_writer != end_writer) { 00735 writers[i++] = curr_writer->first; 00736 ++curr_writer; 00737 } 00738 } 00739 00740 try { 00741 CORBA::Boolean dont_notify_lost = 0; 00742 00743 if (0 < size) { 00744 remove_associations(writers, dont_notify_lost); 00745 } 00746 00747 } catch (const CORBA::Exception&) { 00748 } 00749 }
virtual void OpenDDS::DCPS::RecorderImpl::remove_associations(const WriterIdSeq& writers, CORBA::Boolean callback) [virtual]
Implements OpenDDS::DCPS::DataReaderCallbacks.
Referenced by remove_all_associations().
void OpenDDS::DCPS::RecorderImpl::remove_associations_i(const WriterIdSeq& writers, bool callback) [protected, virtual]
Section 7.1.4.1: total_count will not decrement.
Todo: Reconcile this with the verbiage in section 7.1.4.1
Definition at line 581 of file RecorderImpl.cpp.
References ACE_TEXT(), DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportClient::disassociate(), id_to_handle_map_, OpenDDS::DCPS::RcHandle< T >::in(), is_bit_, DDS::SubscriptionMatchedStatus::last_publication_handle, listener_, LM_DEBUG, lookup_instance_handles(), notify_subscription_lost(), OPENDDS_STRING, publication_handle_lock_, OpenDDS::DCPS::push_back(), remove_association_sweeper_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, subscription_match_status_, DDS::SubscriptionMatchedStatus::total_count_change, writers_, and writers_lock_.
Referenced by remove_publication().
00583 { 00584 DBG_ENTRY_LVL("RecorderImpl", "remove_associations_i", 6); 00585 00586 if (writers.length() == 0) { 00587 return; 00588 } 00589 00590 if (DCPS_debug_level >= 1) { 00591 GuidConverter reader_converter(subscription_id_); 00592 GuidConverter writer_converter(writers[0]); 00593 ACE_DEBUG((LM_DEBUG, 00594 ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ") 00595 ACE_TEXT("bit %d local %C remote %C num remotes %d \n"), 00596 is_bit_, 00597 OPENDDS_STRING(reader_converter).c_str(), 00598 OPENDDS_STRING(writer_converter).c_str(), 00599 writers.length())); 00600 } 00601 DDS::InstanceHandleSeq handles; 00602 00603 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->publication_handle_lock_); 00604 00605 // This is used to hold the list of writers which were actually 00606 // removed, which is a proper subset of the writers which were 00607 // requested to be removed. 00608 WriterIdSeq updated_writers; 00609 00610 CORBA::ULong wr_len; 00611 00612 //Remove the writers from writer list. If the supplied writer 00613 //is not in the cached writers list then it is already removed. 00614 //We just need remove the writers in the list that have not been 00615 //removed. 
00616 { 00617 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_); 00618 00619 wr_len = writers.length(); 00620 00621 for (CORBA::ULong i = 0; i < wr_len; i++) { 00622 PublicationId writer_id = writers[i]; 00623 00624 WriterMapType::iterator it = this->writers_.find(writer_id); 00625 00626 if (it != this->writers_.end()) { 00627 it->second->removed(); 00628 remove_association_sweeper_->cancel_timer(it->second); 00629 } 00630 00631 if (this->writers_.erase(writer_id) == 0) { 00632 if (DCPS_debug_level >= 1) { 00633 GuidConverter converter(writer_id); 00634 ACE_DEBUG((LM_DEBUG, 00635 ACE_TEXT("(%P|%t) RecorderImpl::remove_associations_i: ") 00636 ACE_TEXT("the writer local %C was already removed.\n"), 00637 OPENDDS_STRING(converter).c_str())); 00638 } 00639 00640 } else { 00641 push_back(updated_writers, writer_id); 00642 } 00643 } 00644 } 00645 00646 wr_len = updated_writers.length(); 00647 00648 // Return now if the supplied writers have been removed already. 00649 if (wr_len == 0) { 00650 return; 00651 } 00652 00653 if (!is_bit_) { 00654 // The writer should be in the id_to_handle map at this time. Note 00655 // it if it not there. 00656 this->lookup_instance_handles(updated_writers, handles); 00657 00658 for (CORBA::ULong i = 0; i < wr_len; ++i) { 00659 id_to_handle_map_.erase(updated_writers[i]); 00660 } 00661 } 00662 for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) { 00663 this->disassociate(updated_writers[i]); 00664 } 00665 00666 // Mirror the add_associations SUBSCRIPTION_MATCHED_STATUS processing. 00667 if (!this->is_bit_) { 00668 // Derive the change in the number of publications writing to this reader. 00669 int matchedPublications = static_cast<int>(this->id_to_handle_map_.size()); 00670 this->subscription_match_status_.current_count_change 00671 = matchedPublications - this->subscription_match_status_.current_count; 00672 00673 // Only process status if the number of publications has changed. 
00674 if (this->subscription_match_status_.current_count_change != 0) { 00675 this->subscription_match_status_.current_count = matchedPublications; 00676 /// Section 7.1.4.1: total_count will not decrement. 00677 00678 /// @TODO: Reconcile this with the verbiage in section 7.1.4.1 00679 this->subscription_match_status_.last_publication_handle 00680 = handles[ wr_len - 1]; 00681 00682 // set_status_changed_flag(DDS::SUBSCRIPTION_MATCHED_STATUS, true); 00683 00684 // DDS::DataReaderListener_var listener 00685 // = listener_for(DDS::SUBSCRIPTION_MATCHED_STATUS); 00686 00687 if (listener_.in()) { 00688 listener_->on_recorder_matched( 00689 this, 00690 this->subscription_match_status_); 00691 00692 // Client will look at it so next time it looks the change should be 0 00693 this->subscription_match_status_.total_count_change = 0; 00694 this->subscription_match_status_.current_count_change = 0; 00695 } 00696 00697 // notify_status_condition(); 00698 } 00699 } 00700 00701 // If this remove_association is invoked when the InfoRepo 00702 // detects a lost writer then make a callback to notify 00703 // subscription lost. 00704 if (notify_lost) { 00705 this->notify_subscription_lost(handles); 00706 } 00707 00708 // if (this->monitor_) { 00709 // this->monitor_->report(); 00710 // } 00711 }
void OpenDDS::DCPS::RecorderImpl::remove_publication | ( | const PublicationId & | pub_id | ) | [protected] |
Definition at line 566 of file RecorderImpl.cpp.
References OpenDDS::DCPS::WriterInfo::notify_lost_, OpenDDS::DCPS::push_back(), remove_associations_i(), writers_, and writers_lock_.
00567 { 00568 ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_); 00569 WriterMapType::iterator where = writers_.find(pub_id); 00570 if (writers_.end() != where) { 00571 WriterInfo& info = *where->second; 00572 WriterIdSeq writers; 00573 push_back(writers, pub_id); 00574 bool notify = info.notify_lost_; 00575 write_guard.release(); 00576 remove_associations_i(writers, notify); 00577 } 00578 }
DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::repoid_to_bit_key | ( | const DCPS::RepoId & | id, | |
DDS::BuiltinTopicKey_t & | key | |||
) | [virtual] |
Find the bit key for a given repo id.
Implements OpenDDS::DCPS::Recorder.
Definition at line 1047 of file RecorderImpl.cpp.
References OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::DomainParticipantImpl::id_to_handle(), participant_servant_, publication_handle_lock_, DDS::RETCODE_ERROR, and DDS::RETCODE_OK.
01049 { 01050 DDS::InstanceHandle_t publication_handle = this->participant_servant_->id_to_handle(id); 01051 01052 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 01053 guard, 01054 this->publication_handle_lock_, 01055 DDS::RETCODE_ERROR); 01056 01057 DDS::PublicationBuiltinTopicDataSeq data; 01058 01059 DDS::ReturnCode_t ret = instance_handle_to_bit_data<DDS::PublicationBuiltinTopicDataDataReader_var>( 01060 participant_servant_, 01061 BUILT_IN_PUBLICATION_TOPIC, 01062 publication_handle, 01063 data); 01064 01065 if (ret == DDS::RETCODE_OK) { 01066 key = data[0].key; 01067 } 01068 01069 return ret; 01070 }
DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::set_listener | ( | const RecorderListener_rch & | a_listener, | |
DDS::StatusMask | mask | |||
) | [virtual] |
Change the listener for this Recorder.
Implements OpenDDS::DCPS::Recorder.
Definition at line 897 of file RecorderImpl.cpp.
References listener_, listener_mask_, and DDS::RETCODE_OK.
00899 { 00900 listener_mask_ = mask; 00901 //note: OK to duplicate a nil object ref 00902 listener_ = a_listener; 00903 return DDS::RETCODE_OK; 00904 }
DDS::ReturnCode_t OpenDDS::DCPS::RecorderImpl::set_qos | ( | const DDS::SubscriberQos & | subscriber_qos, | |
const DDS::DataReaderQos & | datareader_qos | |||
) | [virtual] |
Set the Quality of Service settings for the Recorder.
Implements OpenDDS::DCPS::Recorder.
Definition at line 827 of file RecorderImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), domain_id_, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::EntityImpl::is_enabled(), LM_ERROR, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, participant_servant_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, status, subqos_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, and OpenDDS::DCPS::Qos_Helper::valid().
00830 { 00831 00832 OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(subscriber_qos, DDS::RETCODE_UNSUPPORTED); 00833 00834 if (Qos_Helper::valid(subscriber_qos) && Qos_Helper::consistent(subscriber_qos)) { 00835 if (subqos_ != subscriber_qos) { 00836 // for the not changeable qos, it can be changed before enable 00837 if (!Qos_Helper::changeable(subqos_, subscriber_qos) && enabled_ == true) { 00838 return DDS::RETCODE_IMMUTABLE_POLICY; 00839 00840 } else { 00841 subqos_ = subscriber_qos; 00842 } 00843 } 00844 } else { 00845 return DDS::RETCODE_INCONSISTENT_POLICY; 00846 } 00847 00848 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00849 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00850 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00851 00852 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) { 00853 if (qos_ == qos) 00854 return DDS::RETCODE_OK; 00855 00856 if (!Qos_Helper::changeable(qos_, qos) && this->is_enabled()) { 00857 return DDS::RETCODE_IMMUTABLE_POLICY; 00858 00859 } else { 00860 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_); 00861 const bool status = 00862 disco->update_subscription_qos( 00863 this->participant_servant_->get_domain_id(), 00864 this->participant_servant_->get_id(), 00865 this->subscription_id_, 00866 qos, 00867 subscriber_qos); 00868 if (!status) { 00869 ACE_ERROR_RETURN((LM_ERROR, 00870 ACE_TEXT("(%P|%t) RecorderImpl::set_qos, ") 00871 ACE_TEXT("qos not updated. \n")), 00872 DDS::RETCODE_ERROR); 00873 } 00874 } 00875 00876 qos_ = qos; 00877 subqos_ = subscriber_qos; 00878 00879 return DDS::RETCODE_OK; 00880 00881 } else { 00882 return DDS::RETCODE_INCONSISTENT_POLICY; 00883 } 00884 }
void OpenDDS::DCPS::RecorderImpl::signal_liveliness | ( | const RepoId & | remote_participant | ) | [virtual] |
Implements OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 798 of file RecorderImpl.cpp.
References OpenDDS::DCPS::GUID_t::entityId, ACE_OS::gettimeofday(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::TransportClient::OPENDDS_VECTOR(), sample_lock_, writers_, and writers_lock_.
00799 { 00800 RepoId prefix = remote_participant; 00801 prefix.entityId = EntityId_t(); 00802 00803 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->sample_lock_); 00804 00805 typedef std::pair<RepoId, RcHandle<WriterInfo> > WriterSetElement; 00806 typedef OPENDDS_VECTOR(WriterSetElement) WriterSet; 00807 WriterSet writers; 00808 00809 { 00810 ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_); 00811 for (WriterMapType::iterator pos = writers_.lower_bound(prefix), 00812 limit = writers_.end(); 00813 pos != limit && GuidPrefixEqual() (pos->first.guidPrefix, prefix.guidPrefix); 00814 ++pos) { 00815 writers.push_back(std::make_pair(pos->first, pos->second)); 00816 } 00817 } 00818 00819 ACE_Time_Value when = ACE_OS::gettimeofday(); 00820 for (WriterSet::iterator pos = writers.begin(), limit = writers.end(); 00821 pos != limit; 00822 ++pos) { 00823 pos->second->received_activity(when); 00824 } 00825 }
void OpenDDS::DCPS::RecorderImpl::unregister_for_writer | ( | const RepoId & | participant, | |
const RepoId & | readerid, | |||
const RepoId & | writerid | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 1038 of file RecorderImpl.cpp.
01041 { 01042 TransportClient::unregister_for_writer(participant, readerid, writerid); 01043 }
void OpenDDS::DCPS::RecorderImpl::update_incompatible_qos | ( | const IncompatibleQosStatus & | status | ) | [virtual] |
Implements OpenDDS::DCPS::DataReaderCallbacks.
Definition at line 752 of file RecorderImpl.cpp.
References OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, OpenDDS::DCPS::IncompatibleQosStatus::last_policy_id, DDS::RequestedIncompatibleQosStatus::last_policy_id, OpenDDS::DCPS::IncompatibleQosStatus::policies, DDS::RequestedIncompatibleQosStatus::policies, publication_handle_lock_, requested_incompatible_qos_status_, OpenDDS::DCPS::IncompatibleQosStatus::total_count, DDS::RequestedIncompatibleQosStatus::total_count, and DDS::RequestedIncompatibleQosStatus::total_count_change.
00753 { 00754 00755 00756 ACE_GUARD(ACE_Recursive_Thread_Mutex, 00757 guard, 00758 this->publication_handle_lock_); 00759 00760 if (this->requested_incompatible_qos_status_.total_count == status.total_count) { 00761 // This test should make the method idempotent. 00762 return; 00763 } 00764 00765 // set_status_changed_flag(DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS, 00766 // true); 00767 00768 // copy status and increment change 00769 requested_incompatible_qos_status_.total_count = status.total_count; 00770 requested_incompatible_qos_status_.total_count_change += 00771 status.count_since_last_send; 00772 requested_incompatible_qos_status_.last_policy_id = 00773 status.last_policy_id; 00774 requested_incompatible_qos_status_.policies = status.policies; 00775 00776 // if (!CORBA::is_nil(listener.in())) { 00777 // listener->on_requested_incompatible_qos(this, 00778 // requested_incompatible_qos_status_); 00779 // 00780 // // TBD - why does the spec say to change total_count_change but not 00781 // // change the ChangeFlagStatus after a listener call? 00782 // 00783 // // client just looked at it so next time it looks the 00784 // // change should be 0 00785 // requested_incompatible_qos_status_.total_count_change = 0; 00786 // } 00787 // 00788 // notify_status_condition(); 00789 }
friend class ::DDS_TEST [friend] |
Reimplemented from OpenDDS::DCPS::TransportClient.
Definition at line 168 of file RecorderImpl.h.
friend class RemoveAssociationSweeper< RecorderImpl > [friend] |
Definition at line 166 of file RecorderImpl.h.
Definition at line 173 of file RecorderImpl.h.
Referenced by add_association(), cleanup(), enable(), init(), and set_qos().
RepoIdToHandleMap OpenDDS::DCPS::RecorderImpl::id_to_handle_map_ [private] |
Definition at line 179 of file RecorderImpl.h.
Referenced by add_association(), and remove_associations_i().
bool OpenDDS::DCPS::RecorderImpl::is_bit_ [private] |
Flag indicating that this data reader is a built-in topic data reader.
Definition at line 186 of file RecorderImpl.h.
Referenced by add_association(), init(), and remove_associations_i().
bool OpenDDS::DCPS::RecorderImpl::is_exclusive_ownership_ [private] |
Definition at line 159 of file RecorderImpl.h.
Referenced by init().
Definition at line 172 of file RecorderImpl.h.
Referenced by add_association(), data_received(), get_listener(), init(), remove_associations_i(), and set_listener().
Definition at line 171 of file RecorderImpl.h.
Referenced by init(), and set_listener().
Definition at line 161 of file RecorderImpl.h.
Referenced by init().
Definition at line 155 of file RecorderImpl.h.
Referenced by add_association(), cleanup(), enable(), get_instance_handle(), init(), lookup_instance_handles(), repoid_to_bit_key(), and set_qos().
Definition at line 176 of file RecorderImpl.h.
Referenced by add_association(), remove_all_associations(), remove_associations_i(), repoid_to_bit_key(), and update_incompatible_qos().
Definition at line 150 of file RecorderImpl.h.
Referenced by check_transport_qos(), enable(), get_qos(), init(), and set_qos().
RcHandle<RemoveAssociationSweeper<RecorderImpl> > OpenDDS::DCPS::RecorderImpl::remove_association_sweeper_ [private] |
Definition at line 174 of file RecorderImpl.h.
Referenced by cleanup(), remove_associations_i(), and ~RecorderImpl().
DDS::RequestedIncompatibleQosStatus OpenDDS::DCPS::RecorderImpl::requested_incompatible_qos_status_ [private] |
Definition at line 181 of file RecorderImpl.h.
Referenced by RecorderImpl(), and update_incompatible_qos().
Lock protecting the sample container as well as the statuses.
Definition at line 153 of file RecorderImpl.h.
Referenced by add_association(), data_received(), and signal_liveliness().
Definition at line 182 of file RecorderImpl.h.
Referenced by add_association(), RecorderImpl(), and remove_associations_i().
DDS::TopicDescription_var OpenDDS::DCPS::RecorderImpl::topic_desc_ [private] |
Definition at line 170 of file RecorderImpl.h.
Referenced by init().
Definition at line 156 of file RecorderImpl.h.
Referenced by enable(), inconsistent_topic(), and init().
WriterMapType OpenDDS::DCPS::RecorderImpl::writers_ [private] |
Definition at line 191 of file RecorderImpl.h.
Referenced by add_association(), cleanup(), remove_all_associations(), remove_associations_i(), remove_publication(), signal_liveliness(), and ~RecorderImpl().
RW lock for reading/writing publications.
Definition at line 194 of file RecorderImpl.h.
Referenced by add_association(), cleanup(), remove_all_associations(), remove_associations_i(), remove_publication(), signal_liveliness(), and ~RecorderImpl().