Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces. More...
#include <DataWriterImpl.h>
Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.
See the DDS specification, OMG formal/04-12-02, for a description of the interface this class is implementing.
This class must be inherited by the type-specific data writer for the data type associated with the topic.
Definition at line 83 of file DataWriterImpl.h.
OpenDDS::DCPS::DataWriterImpl::DataWriterImpl | ( | ) |
Definition at line 58 of file DataWriterImpl.cpp.
References DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, DDS::HANDLE_NIL, DDS::OfferedDeadlineMissedStatus::last_instance_handle, DDS::OfferedIncompatibleQosStatus::last_policy_id, DDS::PublicationMatchedStatus::last_subscription_handle, liveliness_lost_status_, monitor_, offered_deadline_missed_status_, offered_incompatible_qos_status_, periodic_monitor_, DDS::OfferedIncompatibleQosStatus::policies, publication_match_status_, TheServiceParticipant, DDS::PublicationMatchedStatus::total_count, DDS::OfferedIncompatibleQosStatus::total_count, DDS::OfferedDeadlineMissedStatus::total_count, DDS::LivelinessLostStatus::total_count, DDS::PublicationMatchedStatus::total_count_change, DDS::OfferedIncompatibleQosStatus::total_count_change, DDS::OfferedDeadlineMissedStatus::total_count_change, and DDS::LivelinessLostStatus::total_count_change.
00059 : data_dropped_count_(0), 00060 data_delivered_count_(0), 00061 controlTracker("DataWriterImpl"), 00062 n_chunks_(TheServiceParticipant->n_chunks()), 00063 association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()), 00064 qos_(TheServiceParticipant->initial_DataWriterQos()), 00065 db_lock_pool_(new DataBlockLockPool((unsigned long)TheServiceParticipant->n_chunks())), 00066 topic_id_(GUID_UNKNOWN), 00067 topic_servant_(0), 00068 listener_mask_(DEFAULT_STATUS_MASK), 00069 domain_id_(0), 00070 publication_id_(GUID_UNKNOWN), 00071 sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()), 00072 coherent_(false), 00073 coherent_samples_(0), 00074 liveliness_lost_(false), 00075 reactor_(0), 00076 liveliness_check_interval_(ACE_Time_Value::max_time), 00077 last_liveliness_activity_time_(ACE_Time_Value::zero), 00078 last_deadline_missed_total_count_(0), 00079 watchdog_(), 00080 is_bit_(false), 00081 min_suspended_transaction_id_(0), 00082 max_suspended_transaction_id_(0), 00083 monitor_(0), 00084 periodic_monitor_(0), 00085 liveliness_asserted_(false), 00086 liveness_timer_(make_rch<LivenessTimer>(ref(*this))) 00087 { 00088 liveliness_lost_status_.total_count = 0; 00089 liveliness_lost_status_.total_count_change = 0; 00090 00091 offered_deadline_missed_status_.total_count = 0; 00092 offered_deadline_missed_status_.total_count_change = 0; 00093 offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL; 00094 00095 offered_incompatible_qos_status_.total_count = 0; 00096 offered_incompatible_qos_status_.total_count_change = 0; 00097 offered_incompatible_qos_status_.last_policy_id = 0; 00098 offered_incompatible_qos_status_.policies.length(0); 00099 00100 publication_match_status_.total_count = 0; 00101 publication_match_status_.total_count_change = 0; 00102 publication_match_status_.current_count = 0; 00103 publication_match_status_.current_count_change = 0; 00104 publication_match_status_.last_subscription_handle = 
DDS::HANDLE_NIL; 00105 00106 monitor_ = 00107 TheServiceParticipant->monitor_factory_->create_data_writer_monitor(this); 00108 periodic_monitor_ = 00109 TheServiceParticipant->monitor_factory_->create_data_writer_periodic_monitor(this); 00110 }
OpenDDS::DCPS::DataWriterImpl::~DataWriterImpl | ( | ) | [virtual] |
Definition at line 114 of file DataWriterImpl.cpp.
References DBG_ENTRY_LVL.
00115 { 00116 DBG_ENTRY_LVL("DataWriterImpl","~DataWriterImpl",6); 00117 }
void OpenDDS::DCPS::DataWriterImpl::add_association | ( | const RepoId & | yourId, | |
const ReaderAssociation & | reader, | |||
bool | active | |||
) | [virtual] |
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 190 of file DataWriterImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::TransportClient::associate(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, DDS::DataReaderQos::durability, OpenDDS::DCPS::EntityImpl::entity_deleted_, OpenDDS::DCPS::ReaderAssociation::exprParams, OpenDDS::DCPS::ReaderAssociation::filterClassName, OpenDDS::DCPS::ReaderAssociation::filterExpression, get_publication_id(), OpenDDS::DCPS::GUID_UNKNOWN, is_bit_, LM_DEBUG, LM_ERROR, OPENDDS_STRING, participant_servant_, publication_id_, qos_, reader_info_, reader_info_lock_, OpenDDS::DCPS::ReaderAssociation::readerId, OpenDDS::DCPS::ReaderAssociation::readerQos, OpenDDS::DCPS::ReaderAssociation::readerTransInfo, DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, TheServiceParticipant, DDS::DataWriterQos::transport_priority, ACE_Atomic_Op< ACE_LOCK, TYPE >::value(), and DDS::VOLATILE_DURABILITY_QOS.
00193 { 00194 DBG_ENTRY_LVL("DataWriterImpl", "add_association", 6); 00195 00196 if (DCPS_debug_level) { 00197 GuidConverter writer_converter(yourId); 00198 GuidConverter reader_converter(reader.readerId); 00199 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association - ") 00200 ACE_TEXT("bit %d local %C remote %C\n"), is_bit_, 00201 OPENDDS_STRING(writer_converter).c_str(), 00202 OPENDDS_STRING(reader_converter).c_str())); 00203 } 00204 00205 if (entity_deleted_.value()) { 00206 if (DCPS_debug_level) 00207 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association") 00208 ACE_TEXT(" This is a deleted datawriter, ignoring add.\n"))); 00209 00210 return; 00211 } 00212 00213 if (GUID_UNKNOWN == publication_id_) { 00214 publication_id_ = yourId; 00215 } 00216 00217 { 00218 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_); 00219 reader_info_.insert(std::make_pair(reader.readerId, 00220 ReaderInfo(reader.filterClassName, 00221 TheServiceParticipant->publisher_content_filter() ? 
reader.filterExpression : "", 00222 reader.exprParams, participant_servant_, 00223 reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS))); 00224 } 00225 00226 if (DCPS_debug_level > 4) { 00227 GuidConverter converter(get_publication_id()); 00228 ACE_DEBUG((LM_DEBUG, 00229 ACE_TEXT("(%P|%t) DataWriterImpl::add_association(): ") 00230 ACE_TEXT("adding subscription to publication %C with priority %d.\n"), 00231 OPENDDS_STRING(converter).c_str(), 00232 qos_.transport_priority.value)); 00233 } 00234 00235 AssociationData data; 00236 data.remote_id_ = reader.readerId; 00237 data.remote_data_ = reader.readerTransInfo; 00238 data.remote_reliable_ = 00239 (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS); 00240 data.remote_durable_ = 00241 (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS); 00242 00243 if (!associate(data, active)) { 00244 //FUTURE: inform inforepo and try again as passive peer 00245 if (DCPS_debug_level) { 00246 ACE_ERROR((LM_ERROR, 00247 ACE_TEXT("(%P|%t) DataWriterImpl::add_association: ") 00248 ACE_TEXT("ERROR: transport layer failed to associate.\n"))); 00249 } 00250 } 00251 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::assert_liveliness | ( | ) | [virtual] |
Implements DDS::DataWriter.
Definition at line 1136 of file DataWriterImpl.cpp.
References DDS::AUTOMATIC_LIVELINESS_QOS, ACE_OS::gettimeofday(), DDS::DataWriterQos::liveliness, OpenDDS::DCPS::WeakRcHandle< T >::lock(), DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS, participant_servant_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, and send_liveliness().
01137 { 01138 switch (this->qos_.liveliness.kind) { 01139 case DDS::AUTOMATIC_LIVELINESS_QOS: 01140 // Do nothing. 01141 break; 01142 case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS: 01143 { 01144 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock(); 01145 if (participant) 01146 return participant->assert_liveliness(); 01147 return DDS::RETCODE_OK; 01148 } 01149 case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS: 01150 if (this->send_liveliness(ACE_OS::gettimeofday()) == false) { 01151 return DDS::RETCODE_ERROR; 01152 } 01153 break; 01154 } 01155 01156 return DDS::RETCODE_OK; 01157 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::assert_liveliness_by_participant | ( | ) | [virtual] |
Definition at line 1160 of file DataWriterImpl.cpp.
References DDS::DataWriterQos::liveliness, liveliness_asserted_, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, qos_, and DDS::RETCODE_OK.
01161 { 01162 // This operation is called by participant. 01163 01164 if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) { 01165 // Set a flag indicating that we should send a liveliness message on the timer if necessary. 01166 liveliness_asserted_ = true; 01167 } 01168 01169 return DDS::RETCODE_OK; 01170 }
void OpenDDS::DCPS::DataWriterImpl::association_complete | ( | const RepoId & | remote_id | ) | [virtual] |
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 374 of file DataWriterImpl.cpp.
References ACE_TEXT(), assoc_complete_readers_, association_complete_i(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, is_bit_, LM_DEBUG, lock_, OPENDDS_STRING, pending_readers_, publication_id_, and OpenDDS::DCPS::remove().
00375 { 00376 DBG_ENTRY_LVL("DataWriterImpl", "association_complete", 6); 00377 00378 if (DCPS_debug_level >= 1) { 00379 GuidConverter writer_converter(this->publication_id_); 00380 GuidConverter reader_converter(remote_id); 00381 ACE_DEBUG((LM_DEBUG, 00382 ACE_TEXT("(%P|%t) DataWriterImpl::association_complete - ") 00383 ACE_TEXT("bit %d local %C remote %C\n"), 00384 is_bit_, 00385 OPENDDS_STRING(writer_converter).c_str(), 00386 OPENDDS_STRING(reader_converter).c_str())); 00387 } 00388 00389 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00390 00391 if (OpenDDS::DCPS::remove(pending_readers_, remote_id) == -1) { 00392 if (DCPS_debug_level) { 00393 GuidConverter writer_converter(this->publication_id_); 00394 GuidConverter reader_converter(remote_id); 00395 ACE_DEBUG((LM_DEBUG, 00396 ACE_TEXT("(%P|%t) DataWriterImpl::association_complete - ") 00397 ACE_TEXT("bit %d local %C did not find pending reader: %C") 00398 ACE_TEXT("defer association_complete_i until add_association resumes\n"), 00399 is_bit_, 00400 OPENDDS_STRING(writer_converter).c_str(), 00401 OPENDDS_STRING(reader_converter).c_str())); 00402 } 00403 // Not found in pending_readers_, defer calling association_complete_i() 00404 // until add_association() resumes and sees this ID in assoc_complete_readers_. 00405 assoc_complete_readers_.insert(remote_id); 00406 00407 } else { 00408 association_complete_i(remote_id); 00409 } 00410 }
void OpenDDS::DCPS::DataWriterImpl::association_complete_i | ( | const RepoId & | remote_id | ) | [private] |
Definition at line 413 of file DataWriterImpl.cpp.
References ACE_TEXT(), available_data_list_, OpenDDS::DCPS::SendStateDataSampleList::begin(), OpenDDS::DCPS::bind(), controlTracker, create_control_message(), DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::end(), OpenDDS::DCPS::END_HISTORIC_SAMPLES, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), get_db_lock(), get_lock(), get_resend_data(), ACE_OS::gettimeofday(), header, id_to_handle_map_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::insert(), is_bit_, CORBA::is_nil(), DDS::PublicationMatchedStatus::last_subscription_handle, DDS::DataWriterQos::lifespan, listener_for(), LM_DEBUG, LM_ERROR, LM_INFO, LM_WARNING, OpenDDS::DCPS::WeakRcHandle< T >::lock(), lock_, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::MessageTracker::message_dropped(), OpenDDS::DCPS::MessageTracker::message_sent(), monitor_, OpenDDS::DCPS::move(), OpenDDS::DCPS::EntityImpl::notify_status_condition(), OPENDDS_STRING, participant_servant_, publication_id_, publication_match_status_, DDS::PUBLICATION_MATCHED_STATUS, publisher_servant_, qos_, reader_info_, reader_info_lock_, readers_, OpenDDS::DCPS::Monitor::report(), OpenDDS::DCPS::SEND_CONTROL_ERROR, OpenDDS::DCPS::TransportClient::send_w_control(), OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), size, OpenDDS::DCPS::time_value_to_time(), timestamp(), DDS::PublicationMatchedStatus::total_count, and DDS::PublicationMatchedStatus::total_count_change.
Referenced by association_complete(), and transport_assoc_done().
00414 { 00415 DBG_ENTRY_LVL("DataWriterImpl", "association_complete_i", 6); 00416 00417 if (DCPS_debug_level >= 1) { 00418 GuidConverter writer_converter(this->publication_id_); 00419 GuidConverter reader_converter(remote_id); 00420 ACE_DEBUG((LM_DEBUG, 00421 ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i - ") 00422 ACE_TEXT("bit %d local %C remote %C\n"), 00423 is_bit_, 00424 OPENDDS_STRING(writer_converter).c_str(), 00425 OPENDDS_STRING(reader_converter).c_str())); 00426 } 00427 00428 bool reader_durable = false; 00429 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00430 OPENDDS_STRING filterClassName; 00431 RcHandle<FilterEvaluator> eval; 00432 DDS::StringSeq expression_params; 00433 #endif 00434 { 00435 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00436 00437 if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) { 00438 GuidConverter converter(remote_id); 00439 ACE_ERROR((LM_ERROR, 00440 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::association_complete_i: ") 00441 ACE_TEXT("insert %C from pending failed.\n"), 00442 OPENDDS_STRING(converter).c_str())); 00443 } 00444 } 00445 { 00446 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_); 00447 RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id); 00448 00449 if (it != reader_info_.end()) { 00450 reader_durable = it->second.durable_; 00451 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00452 filterClassName = it->second.filter_class_name_; 00453 eval = it->second.eval_; 00454 expression_params = it->second.expression_params_; 00455 #endif 00456 } 00457 } 00458 00459 if (this->monitor_) { 00460 this->monitor_->report(); 00461 } 00462 00463 if (!is_bit_) { 00464 00465 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock(); 00466 00467 if (!participant) 00468 return; 00469 00470 DDS::InstanceHandle_t handle = 00471 participant->id_to_handle(remote_id); 00472 00473 { 00474 // protect publication_match_status_ and status changed flags. 
00475 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00476 00477 if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) { 00478 GuidConverter converter(remote_id); 00479 ACE_DEBUG((LM_WARNING, 00480 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::association_complete_i: ") 00481 ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"), 00482 OPENDDS_STRING(converter).c_str(), 00483 handle)); 00484 return; 00485 00486 } else if (DCPS_debug_level > 4) { 00487 GuidConverter converter(remote_id); 00488 ACE_DEBUG((LM_DEBUG, 00489 ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i: ") 00490 ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"), 00491 OPENDDS_STRING(converter).c_str(), 00492 handle)); 00493 } 00494 00495 ++publication_match_status_.total_count; 00496 ++publication_match_status_.total_count_change; 00497 ++publication_match_status_.current_count; 00498 ++publication_match_status_.current_count_change; 00499 publication_match_status_.last_subscription_handle = handle; 00500 set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, true); 00501 } 00502 00503 DDS::DataWriterListener_var listener = 00504 listener_for(DDS::PUBLICATION_MATCHED_STATUS); 00505 00506 if (!CORBA::is_nil(listener.in())) { 00507 00508 listener->on_publication_matched(this, publication_match_status_); 00509 00510 // TBD - why does the spec say to change this but not 00511 // change the ChangeFlagStatus after a listener call? 00512 publication_match_status_.total_count_change = 0; 00513 publication_match_status_.current_count_change = 0; 00514 } 00515 00516 notify_status_condition(); 00517 } 00518 00519 // Support DURABILITY QoS 00520 if (reader_durable) { 00521 // Tell the WriteDataContainer to resend all sending/sent 00522 // samples. 
00523 this->data_container_->reenqueue_all(remote_id, this->qos_.lifespan 00524 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00525 , filterClassName, eval.in(), expression_params 00526 #endif 00527 ); 00528 00529 // Acquire the data writer container lock to avoid deadlock. The 00530 // thread calling association_complete() has to acquire lock in the 00531 // same order as the write()/register() operation. 00532 00533 // Since the thread calling association_complete() is the ORB 00534 // thread, it may have some performance penalty. If the 00535 // performance is an issue, we may need a new thread to handle the 00536 // data_available() calls. 00537 ACE_GUARD(ACE_Recursive_Thread_Mutex, 00538 guard, 00539 this->get_lock()); 00540 00541 SendStateDataSampleList list = this->get_resend_data(); 00542 { 00543 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_); 00544 // Update the reader's expected sequence 00545 SequenceNumber& seq = 00546 reader_info_.find(remote_id)->second.expected_sequence_; 00547 00548 for (SendStateDataSampleList::iterator list_el = list.begin(); 00549 list_el != list.end(); ++list_el) { 00550 list_el->get_header().historic_sample_ = true; 00551 00552 if (list_el->get_header().sequence_ > seq) { 00553 seq = list_el->get_header().sequence_; 00554 } 00555 } 00556 } 00557 00558 RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock(); 00559 if (!publisher || publisher->is_suspended()) { 00560 this->available_data_list_.enqueue_tail(list); 00561 00562 } else { 00563 if (DCPS_debug_level >= 4) { 00564 ACE_DEBUG((LM_INFO, "(%P|%t) Sending historic samples\n")); 00565 } 00566 00567 size_t size = 0, padding = 0; 00568 gen_find_size(remote_id, size, padding); 00569 Message_Block_Ptr data( 00570 new ACE_Message_Block(size, ACE_Message_Block::MB_DATA, 0, 0, 0, 00571 get_db_lock())); 00572 Serializer ser(data.get()); 00573 ser << remote_id; 00574 00575 const DDS::Time_t timestamp = time_value_to_time(ACE_OS::gettimeofday()); 00576 
DataSampleHeader header; 00577 Message_Block_Ptr end_historic_samples( 00578 create_control_message(END_HISTORIC_SAMPLES, header, move(data), timestamp)); 00579 00580 this->controlTracker.message_sent(); 00581 guard.release(); 00582 SendControlStatus ret = send_w_control(list, header, move(end_historic_samples), remote_id); 00583 if (ret == SEND_CONTROL_ERROR) { 00584 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ") 00585 ACE_TEXT("DataWriterImpl::association_complete_i: ") 00586 ACE_TEXT("send_w_control failed.\n"))); 00587 this->controlTracker.message_dropped(); 00588 } 00589 } 00590 } 00591 }
void OpenDDS::DCPS::DataWriterImpl::begin_coherent_changes | ( | ) |
Starts a coherent change set; should only be called once.
Definition at line 2235 of file DataWriterImpl.cpp.
References coherent_, and get_lock().
02236 { 02237 ACE_GUARD(ACE_Recursive_Thread_Mutex, 02238 guard, 02239 get_lock()); 02240 02241 this->coherent_ = true; 02242 }
bool OpenDDS::DCPS::DataWriterImpl::check_transport_qos | ( | const TransportInst & | inst | ) | [virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 2214 of file DataWriterImpl.cpp.
void OpenDDS::DCPS::DataWriterImpl::cleanup | ( | void | ) |
Clean up the DataWriter.
Definition at line 121 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::NO_STATUS_MASK, set_listener(), and topic_servant_.
Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().
00122 { 00123 // As first step set our listener to nill which will prevent us from calling 00124 // back onto the listener at the moment the related DDS entity has been 00125 // deleted 00126 set_listener(0, NO_STATUS_MASK); 00127 topic_servant_ = 0; 00128 }
bool OpenDDS::DCPS::DataWriterImpl::coherent_changes_pending | ( | ) |
Are coherent changes pending?
Definition at line 2224 of file DataWriterImpl.cpp.
References coherent_, and get_lock().
02225 { 02226 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 02227 guard, 02228 get_lock(), 02229 false); 02230 02231 return this->coherent_; 02232 }
void OpenDDS::DCPS::DataWriterImpl::control_delivered | ( | const Message_Block_Ptr & | sample | ) | [virtual] |
Called by the transport to notify the writer that the control message has been delivered.
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 2173 of file DataWriterImpl.cpp.
References controlTracker, DBG_ENTRY_LVL, and OpenDDS::DCPS::MessageTracker::message_delivered().
02174 { 02175 DBG_ENTRY_LVL("DataWriterImpl","control_delivered",6); 02176 controlTracker.message_delivered(); 02177 }
void OpenDDS::DCPS::DataWriterImpl::control_dropped | ( | const Message_Block_Ptr & | sample, | |
bool | dropped_by_transport | |||
) | [virtual] |
Called by the transport to notify the writer that the control message has been dropped.
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 2317 of file DataWriterImpl.cpp.
References controlTracker, DBG_ENTRY_LVL, and OpenDDS::DCPS::MessageTracker::message_dropped().
02319 { 02320 DBG_ENTRY_LVL("DataWriterImpl","control_dropped",6); 02321 controlTracker.message_dropped(); 02322 }
DataWriterImpl::AckToken OpenDDS::DCPS::DataWriterImpl::create_ack_token | ( | DDS::Duration_t | max_wait | ) | const |
Create an AckToken for ack operations.
Definition at line 976 of file DataWriterImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SequenceNumber::getValue(), LM_DEBUG, and sequence_number_.
Referenced by wait_for_acknowledgments().
00977 { 00978 if (DCPS_debug_level > 0) { 00979 ACE_DEBUG((LM_DEBUG, 00980 ACE_TEXT("(%P|%t) DataWriterImpl::create_ack_token() - ") 00981 ACE_TEXT("for sequence %q \n"), 00982 this->sequence_number_.getValue())); 00983 } 00984 return AckToken(max_wait, this->sequence_number_); 00985 }
ACE_Message_Block * OpenDDS::DCPS::DataWriterImpl::create_control_message | ( | MessageId | message_id, | |
DataSampleHeader & | header, | |||
Message_Block_Ptr | data, | |||
const DDS::Time_t & | source_timestamp | |||
) | [private] |
This method creates a header message block and chains it with the registered sample. The header contains the needed information, e.g. the message id and the length of the whole message. The fast allocator is not used for the header.
Definition at line 1971 of file DataWriterImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::coherent_change_, db_allocator_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), get_db_lock(), OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, LM_DEBUG, OpenDDS::DCPS::WeakRcHandle< T >::lock(), OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), ACE_Time_Value::max_time, mb_allocator_, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, DDS::Time_t::nanosec, need_sequence_repair(), OPENDDS_STRING, publication_id_, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::publisher_id_, publisher_servant_, reader_info_, reader_info_lock_, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::REQUEST_ACK, DDS::Time_t::sec, OpenDDS::DCPS::DataSampleHeader::sequence_, sequence_number_, OpenDDS::DCPS::DataSampleHeader::sequence_repair_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::RTPS::SEQUENCENUMBER_UNKNOWN, OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, OpenDDS::DCPS::TransportClient::swap_bytes(), OpenDDS::DCPS::to_string(), OpenDDS::DCPS::UNREGISTER_INSTANCE, and ACE_Time_Value::zero.
Referenced by association_complete_i(), dispose(), dispose_and_unregister(), end_coherent_changes(), register_instance_i(), send_liveliness(), send_request_ack(), and unregister_instance_i().
01975 { 01976 header_data.message_id_ = message_id; 01977 header_data.byte_order_ = 01978 this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER; 01979 header_data.coherent_change_ = 0; 01980 01981 if (data) { 01982 header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length()); 01983 } 01984 01985 header_data.sequence_ = SequenceNumber::SEQUENCENUMBER_UNKNOWN(); 01986 header_data.sequence_repair_ = false; // set below 01987 header_data.source_timestamp_sec_ = source_timestamp.sec; 01988 header_data.source_timestamp_nanosec_ = source_timestamp.nanosec; 01989 header_data.publication_id_ = publication_id_; 01990 01991 RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock(); 01992 if (!publisher) { 01993 return 0; 01994 } 01995 01996 header_data.publisher_id_ = publisher->publisher_id_; 01997 01998 if (message_id == INSTANCE_REGISTRATION 01999 || message_id == DISPOSE_INSTANCE 02000 || message_id == UNREGISTER_INSTANCE 02001 || message_id == DISPOSE_UNREGISTER_INSTANCE 02002 || message_id == REQUEST_ACK) { 02003 02004 header_data.sequence_repair_ = need_sequence_repair(); 02005 02006 // Use the sequence number here for the sake of RTPS (where these 02007 // control messages map onto the Data Submessage). 02008 if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) { 02009 this->sequence_number_ = SequenceNumber(); 02010 02011 } else { 02012 ++this->sequence_number_; 02013 } 02014 02015 header_data.sequence_ = this->sequence_number_; 02016 header_data.key_fields_only_ = true; 02017 } 02018 02019 ACE_Message_Block* message = 0; 02020 ACE_NEW_MALLOC_RETURN(message, 02021 static_cast<ACE_Message_Block*>( 02022 mb_allocator_->malloc(sizeof(ACE_Message_Block))), 02023 ACE_Message_Block( 02024 DataSampleHeader::max_marshaled_size(), 02025 ACE_Message_Block::MB_DATA, 02026 header_data.message_length_ ? 
data.release() : 0, //cont 02027 0, //data 02028 0, //allocator_strategy 02029 get_db_lock(), //locking_strategy 02030 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 02031 ACE_Time_Value::zero, 02032 ACE_Time_Value::max_time, 02033 db_allocator_.get(), 02034 mb_allocator_.get()), 02035 0); 02036 02037 *message << header_data; 02038 02039 // If we incremented sequence number for this control message 02040 if (header_data.sequence_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) { 02041 ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, 0); 02042 // Update the expected sequence number for all readers 02043 RepoIdToReaderInfoMap::iterator reader; 02044 02045 for (reader = reader_info_.begin(); reader != reader_info_.end(); ++reader) { 02046 reader->second.expected_sequence_ = sequence_number_; 02047 } 02048 } 02049 if (DCPS_debug_level >= 4) { 02050 const GuidConverter converter(publication_id_); 02051 ACE_DEBUG((LM_DEBUG, 02052 ACE_TEXT("(%P|%t) DataWriterImpl::create_control_message: ") 02053 ACE_TEXT("from publication %C sending control sample: %C .\n"), 02054 OPENDDS_STRING(converter).c_str(), 02055 to_string(header_data).c_str())); 02056 } 02057 return message; 02058 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::create_sample_data_message | ( | Message_Block_Ptr | data, | |
DDS::InstanceHandle_t | instance_handle, | |||
DataSampleHeader & | header_data, | |||
Message_Block_Ptr & | message, | |||
const DDS::Time_t & | source_timestamp, | |||
bool | content_filter | |||
) |
This method creates a header message block and chains it with the sample data. The header contains the needed information, e.g. the message id and the length of the whole message. The fast allocator is used to allocate the message block, data block and header.
Definition at line 2061 of file DataWriterImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::TransportClient::cdr_encapsulation(), OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, coherent_, OpenDDS::DCPS::DataSampleHeader::coherent_change_, OpenDDS::DCPS::DataSampleHeader::content_filter_, data_container_, db_allocator_, OpenDDS::DCPS::DCPS_debug_level, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), get_db_lock(), OpenDDS::DCPS::DataSampleHeader::group_coherent_, DDS::GROUP_PRESENTATION_QOS, header_allocator_, DDS::DataWriterQos::lifespan, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_nanosec_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_sec_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), ACE_Time_Value::max_time, mb_allocator_, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, DDS::Time_t::nanosec, need_sequence_repair(), OPENDDS_STRING, publication_id_, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::publisher_id_, publisher_servant_, qos_, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::SAMPLE_DATA, DDS::Time_t::sec, OpenDDS::DCPS::DataSampleHeader::sequence_, sequence_number_, OpenDDS::DCPS::DataSampleHeader::sequence_repair_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, OpenDDS::DCPS::TransportClient::swap_bytes(), OpenDDS::DCPS::to_string(), and ACE_Time_Value::zero.
Referenced by write().
02067 { 02068 PublicationInstance_rch instance = 02069 data_container_->get_handle_instance(instance_handle); 02070 02071 if (0 == instance) { 02072 ACE_ERROR_RETURN((LM_ERROR, 02073 ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message ") 02074 ACE_TEXT("failed to find instance for handle %d\n"), 02075 instance_handle), 02076 DDS::RETCODE_ERROR); 02077 } 02078 02079 header_data.message_id_ = SAMPLE_DATA; 02080 header_data.byte_order_ = 02081 this->swap_bytes() ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER; 02082 header_data.coherent_change_ = this->coherent_; 02083 02084 RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock(); 02085 02086 if (!publisher) 02087 return DDS::RETCODE_ERROR; 02088 02089 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 02090 header_data.group_coherent_ = 02091 publisher->qos_.presentation.access_scope 02092 == DDS::GROUP_PRESENTATION_QOS; 02093 #endif 02094 header_data.content_filter_ = content_filter; 02095 header_data.cdr_encapsulation_ = this->cdr_encapsulation(); 02096 header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length()); 02097 header_data.sequence_repair_ = need_sequence_repair(); 02098 02099 if (this->sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) { 02100 this->sequence_number_ = SequenceNumber(); 02101 02102 } else { 02103 ++this->sequence_number_; 02104 } 02105 02106 header_data.sequence_ = this->sequence_number_; 02107 header_data.source_timestamp_sec_ = source_timestamp.sec; 02108 header_data.source_timestamp_nanosec_ = source_timestamp.nanosec; 02109 02110 if (qos_.lifespan.duration.sec != DDS::DURATION_INFINITE_SEC 02111 || qos_.lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) { 02112 header_data.lifespan_duration_ = true; 02113 header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec; 02114 header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec; 02115 } 02116 02117 header_data.publication_id_ = publication_id_; 02118 header_data.publisher_id_ = 
publisher->publisher_id_; 02119 size_t max_marshaled_size = header_data.max_marshaled_size(); 02120 02121 ACE_Message_Block* tmp_message; 02122 ACE_NEW_MALLOC_RETURN(tmp_message, 02123 static_cast<ACE_Message_Block*>( 02124 mb_allocator_->malloc(sizeof(ACE_Message_Block))), 02125 ACE_Message_Block(max_marshaled_size, 02126 ACE_Message_Block::MB_DATA, 02127 data.release(), //cont 02128 0, //data 02129 header_allocator_.get(), //alloc_strategy 02130 get_db_lock(), //locking_strategy 02131 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 02132 ACE_Time_Value::zero, 02133 ACE_Time_Value::max_time, 02134 db_allocator_.get(), 02135 mb_allocator_.get()), 02136 DDS::RETCODE_ERROR); 02137 message.reset(tmp_message); 02138 *message << header_data; 02139 if (DCPS_debug_level >= 4) { 02140 const GuidConverter converter(publication_id_); 02141 ACE_DEBUG((LM_DEBUG, 02142 ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message: ") 02143 ACE_TEXT("from publication %C sending data sample: %C .\n"), 02144 OPENDDS_STRING(converter).c_str(), 02145 to_string(header_data).c_str())); 02146 } 02147 return DDS::RETCODE_OK; 02148 }
void OpenDDS::DCPS::DataWriterImpl::data_delivered | ( | const DataSampleElement * | sample | ) | [virtual] |
Called by the transport to notify the writer that the sample has been delivered; the call is delegated to the WriteDataContainer to adjust the internal data sample threads.
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 2151 of file DataWriterImpl.cpp.
References ACE_TEXT(), data_container_, data_delivered_count_, DBG_ENTRY_LVL, OpenDDS::DCPS::DataSampleElement::get_pub_id(), LM_ERROR, OPENDDS_STRING, and publication_id_.
02152 { 02153 DBG_ENTRY_LVL("DataWriterImpl","data_delivered",6); 02154 02155 if (!(sample->get_pub_id() == this->publication_id_)) { 02156 GuidConverter sample_converter(sample->get_pub_id()); 02157 GuidConverter writer_converter(publication_id_); 02158 ACE_ERROR((LM_ERROR, 02159 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::data_delivered: ") 02160 ACE_TEXT(" The publication id %C from delivered element ") 02161 ACE_TEXT("does not match the datawriter's id %C\n"), 02162 OPENDDS_STRING(sample_converter).c_str(), 02163 OPENDDS_STRING(writer_converter).c_str())); 02164 return; 02165 } 02166 //provided for statistics tracking in tests 02167 ++data_delivered_count_; 02168 02169 this->data_container_->data_delivered(sample); 02170 }
void OpenDDS::DCPS::DataWriterImpl::data_dropped(const DataSampleElement* element, bool dropped_by_transport) [virtual]
This method is called by the transport to notify the writer that the instance sample was dropped; it delegates to the WriteDataContainer to update the internal list.
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 2305 of file DataWriterImpl.cpp.
References data_container_, data_dropped_count_, and DBG_ENTRY_LVL.
02307 { 02308 DBG_ENTRY_LVL("DataWriterImpl","data_dropped",6); 02309 02310 //provided for statistics tracking in tests 02311 ++data_dropped_count_; 02312 02313 this->data_container_->data_dropped(element, dropped_by_transport); 02314 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::dispose(DDS::InstanceHandle_t handle, const DDS::Time_t& source_timestamp)
Delegate to the WriteDataContainer to dispose all data samples for a given instance and tell the transport to broadcast the disposed instance.
Definition at line 1880 of file DataWriterImpl.cpp.
References ACE_TEXT(), create_control_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), LM_ERROR, OpenDDS::DCPS::move(), DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, send_all_to_flush_control(), and OpenDDS::DCPS::DataSampleElement::set_sample().
01882 { 01883 DBG_ENTRY_LVL("DataWriterImpl","dispose",6); 01884 01885 if (enabled_ == false) { 01886 ACE_ERROR_RETURN((LM_ERROR, 01887 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::dispose: ") 01888 ACE_TEXT(" Entity is not enabled. \n")), 01889 DDS::RETCODE_NOT_ENABLED); 01890 } 01891 01892 DDS::ReturnCode_t ret = DDS::RETCODE_ERROR; 01893 01894 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, get_lock(), ret); 01895 01896 Message_Block_Ptr registered_sample_data; 01897 ret = this->data_container_->dispose(handle, registered_sample_data); 01898 01899 if (ret != DDS::RETCODE_OK) { 01900 ACE_ERROR_RETURN((LM_ERROR, 01901 ACE_TEXT("(%P|%t) ERROR: ") 01902 ACE_TEXT("DataWriterImpl::dispose: ") 01903 ACE_TEXT("dispose failed.\n")), 01904 ret); 01905 } 01906 01907 DataSampleElement* element = 0; 01908 ret = this->data_container_->obtain_buffer_for_control(element); 01909 01910 if (ret != DDS::RETCODE_OK) { 01911 ACE_ERROR_RETURN((LM_ERROR, 01912 ACE_TEXT("(%P|%t) ERROR: ") 01913 ACE_TEXT("DataWriterImpl::dispose: ") 01914 ACE_TEXT("obtain_buffer_for_control returned %d.\n"), 01915 ret), 01916 ret); 01917 } 01918 01919 Message_Block_Ptr sample(create_control_message(DISPOSE_INSTANCE, 01920 element->get_header(), 01921 move(registered_sample_data), 01922 source_timestamp)); 01923 element->set_sample(move(sample)); 01924 ret = this->data_container_->enqueue_control(element); 01925 01926 if (ret != DDS::RETCODE_OK) { 01927 ACE_ERROR_RETURN((LM_ERROR, 01928 ACE_TEXT("(%P|%t) ERROR: ") 01929 ACE_TEXT("DataWriterImpl::dispose: ") 01930 ACE_TEXT("enqueue_control failed.\n")), 01931 ret); 01932 } 01933 01934 send_all_to_flush_control(guard); 01935 01936 return DDS::RETCODE_OK; 01937 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::dispose_and_unregister(DDS::InstanceHandle_t handle, const DDS::Time_t& timestamp) [private]
Definition at line 1651 of file DataWriterImpl.cpp.
References ACE_TEXT(), create_control_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), LM_ERROR, OpenDDS::DCPS::move(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, send_all_to_flush_control(), and OpenDDS::DCPS::DataSampleElement::set_sample().
Referenced by unregister_instance_i().
01653 { 01654 DBG_ENTRY_LVL("DataWriterImpl", "dispose_and_unregister", 6); 01655 01656 DDS::ReturnCode_t ret = DDS::RETCODE_ERROR; 01657 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret); 01658 01659 Message_Block_Ptr data_sample; 01660 ret = this->data_container_->dispose(handle, data_sample); 01661 01662 if (ret != DDS::RETCODE_OK) { 01663 ACE_ERROR_RETURN((LM_ERROR, 01664 ACE_TEXT("(%P|%t) ERROR: ") 01665 ACE_TEXT("DataWriterImpl::dispose_and_unregister: ") 01666 ACE_TEXT("dispose on container failed. \n")), 01667 ret); 01668 } 01669 01670 ret = this->data_container_->unregister(handle, data_sample, false); 01671 01672 if (ret != DDS::RETCODE_OK) { 01673 ACE_ERROR_RETURN((LM_ERROR, 01674 ACE_TEXT("(%P|%t) ERROR: ") 01675 ACE_TEXT("DataWriterImpl::dispose_and_unregister: ") 01676 ACE_TEXT("unregister with container failed. \n")), 01677 ret); 01678 } 01679 01680 DataSampleElement* element = 0; 01681 ret = this->data_container_->obtain_buffer_for_control(element); 01682 01683 if (ret != DDS::RETCODE_OK) { 01684 ACE_ERROR_RETURN((LM_ERROR, 01685 ACE_TEXT("(%P|%t) ERROR: ") 01686 ACE_TEXT("DataWriterImpl::dispose_and_unregister: ") 01687 ACE_TEXT("obtain_buffer_for_control returned %d.\n"), 01688 ret), 01689 ret); 01690 } 01691 01692 Message_Block_Ptr sample(create_control_message(DISPOSE_UNREGISTER_INSTANCE, 01693 element->get_header(), 01694 move(data_sample), 01695 source_timestamp)); 01696 element->set_sample(move(sample)); 01697 ret = this->data_container_->enqueue_control(element); 01698 01699 if (ret != DDS::RETCODE_OK) { 01700 ACE_ERROR_RETURN((LM_ERROR, 01701 ACE_TEXT("(%P|%t) ERROR: ") 01702 ACE_TEXT("DataWriterImpl::dispose_and_unregister: ") 01703 ACE_TEXT("enqueue_control failed.\n")), 01704 ret); 01705 } 01706 01707 send_all_to_flush_control(guard); 01708 return DDS::RETCODE_OK; 01709 }
DDS::DomainId_t OpenDDS::DCPS::DataWriterImpl::domain_id | ( | ) | const [inline, private, virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 555 of file DataWriterImpl.h.
00555 { 00556 return this->domain_id_; 00557 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::enable | ( | ) | [virtual] |
Implements DDS::Entity.
Definition at line 1259 of file DataWriterImpl.cpp.
References ACE_TEXT(), association_chunk_multiplier_, OpenDDS::DCPS::TransportClient::connection_info(), data_container_, db_allocator_, OpenDDS::DCPS::DCPS_debug_level, DDS::DataWriterQos::deadline, domain_id_, dp_id_, DDS::DataWriterQos::durability, DDS::DataWriterQos::durability_service, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::duration_to_time_value(), OpenDDS::DCPS::TransportClient::enable_transport(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), get_type_name(), OpenDDS::DCPS::GUID_UNKNOWN, header_allocator_, DDS::DataWriterQos::history, if(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::EntityImpl::is_enabled(), DDS::KEEP_ALL_HISTORY_QOS, last_deadline_missed_total_count_, DDS::LENGTH_UNLIMITED, DDS::DataWriterQos::lifespan, DDS::DataWriterQos::liveliness, liveliness_check_interval_, liveness_timer_, LM_DEBUG, LM_ERROR, LM_WARNING, OpenDDS::DCPS::WeakRcHandle< T >::lock(), lock_, mb_allocator_, monitor_, n_chunks_, DDS::Duration_t::nanosec, offered_deadline_missed_status_, participant_servant_, publication_id_, publisher_servant_, qos_, reactor_, OpenDDS::DCPS::ref(), DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::Monitor::report(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::DataWriterQos::resource_limits, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, ACE_Reactor_Timer_Interface::schedule_timer(), DDS::Duration_t::sec, OpenDDS::DCPS::EntityImpl::set_enabled(), TheServiceParticipant, topic_name_, topic_servant_, DDS::VOLATILE_DURABILITY_QOS, watchdog_, WriteDataContainer, and ACE_Time_Value::zero.
Referenced by OpenDDS::DCPS::PublisherImpl::create_datawriter().
01260 { 01261 //According spec: 01262 // - Calling enable on an already enabled Entity returns OK and has no 01263 // effect. 01264 // - Calling enable on an Entity whose factory is not enabled will fail 01265 // and return PRECONDITION_NOT_MET. 01266 01267 if (this->is_enabled()) { 01268 return DDS::RETCODE_OK; 01269 } 01270 01271 RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock(); 01272 if (!publisher || !publisher->is_enabled()) { 01273 return DDS::RETCODE_PRECONDITION_NOT_MET; 01274 } 01275 01276 RcHandle<DomainParticipantImpl> participant = participant_servant_.lock(); 01277 if (participant) { 01278 dp_id_ = participant->get_id(); 01279 } 01280 01281 // Note: do configuration based on QoS in enable() because 01282 // before enable is called the QoS can be changed -- even 01283 // for Changeable=NO 01284 01285 // Configure WriteDataContainer constructor parameters from qos. 01286 01287 const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS; 01288 01289 CORBA::Long const max_samples_per_instance = 01290 (qos_.resource_limits.max_samples_per_instance == DDS::LENGTH_UNLIMITED) 01291 ? 0x7fffffff : qos_.resource_limits.max_samples_per_instance; 01292 01293 CORBA::Long max_instances = 0, max_total_samples = 0; 01294 01295 if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED) { 01296 n_chunks_ = qos_.resource_limits.max_samples; 01297 01298 if (qos_.resource_limits.max_instances == DDS::LENGTH_UNLIMITED || 01299 (qos_.resource_limits.max_samples < qos_.resource_limits.max_instances) 01300 || (qos_.resource_limits.max_samples < 01301 (qos_.resource_limits.max_instances * max_samples_per_instance))) { 01302 max_total_samples = reliable ? 
qos_.resource_limits.max_samples : 0; 01303 } 01304 } 01305 01306 if (reliable && qos_.resource_limits.max_instances != DDS::LENGTH_UNLIMITED) 01307 max_instances = qos_.resource_limits.max_instances; 01308 01309 const CORBA::Long history_depth = 01310 (qos_.history.kind == DDS::KEEP_ALL_HISTORY_QOS || 01311 qos_.history.depth == DDS::LENGTH_UNLIMITED) ? 0x7fffffff : qos_.history.depth; 01312 01313 const CORBA::Long max_durable_per_instance = 01314 qos_.durability.kind == DDS::VOLATILE_DURABILITY_QOS ? 0 : history_depth; 01315 01316 // enable the type specific part of this DataWriter 01317 this->enable_specific(); 01318 01319 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 01320 // Get data durability cache if DataWriter QoS requires durable 01321 // samples. Publisher servant retains ownership of the cache. 01322 DataDurabilityCache* const durability_cache = 01323 TheServiceParticipant->get_data_durability_cache(qos_.durability); 01324 #endif 01325 01326 //Note: the QoS used to set n_chunks_ is Changable=No so 01327 // it is OK that we cannot change the size of our allocators. 01328 data_container_ .reset(new WriteDataContainer(this, 01329 max_samples_per_instance, 01330 history_depth, 01331 max_durable_per_instance, 01332 qos_.reliability.max_blocking_time, 01333 n_chunks_, 01334 domain_id_, 01335 topic_name_, 01336 get_type_name(), 01337 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 01338 durability_cache, 01339 qos_.durability_service, 01340 #endif 01341 max_instances, 01342 max_total_samples)); 01343 01344 // +1 because we might allocate one before releasing another 01345 // TBD - see if this +1 can be removed. 
01346 mb_allocator_.reset(new MessageBlockAllocator(n_chunks_ * association_chunk_multiplier_)); 01347 db_allocator_.reset(new DataBlockAllocator(n_chunks_+1)); 01348 header_allocator_.reset(new DataSampleHeaderAllocator(n_chunks_+1)); 01349 01350 if (DCPS_debug_level >= 2) { 01351 ACE_DEBUG((LM_DEBUG, 01352 "(%P|%t) DataWriterImpl::enable-mb" 01353 " Cached_Allocator_With_Overflow %x with %d chunks\n", 01354 mb_allocator_.get(), 01355 n_chunks_)); 01356 01357 ACE_DEBUG((LM_DEBUG, 01358 "(%P|%t) DataWriterImpl::enable-db" 01359 " Cached_Allocator_With_Overflow %x with %d chunks\n", 01360 db_allocator_.get(), 01361 n_chunks_)); 01362 01363 ACE_DEBUG((LM_DEBUG, 01364 "(%P|%t) DataWriterImpl::enable-header" 01365 " Cached_Allocator_With_Overflow %x with %d chunks\n", 01366 header_allocator_.get(), 01367 n_chunks_)); 01368 } 01369 01370 if (qos_.liveliness.lease_duration.sec != DDS::DURATION_INFINITE_SEC && 01371 qos_.liveliness.lease_duration.nanosec != DDS::DURATION_INFINITE_NSEC) { 01372 liveliness_check_interval_ = duration_to_time_value(qos_.liveliness.lease_duration); 01373 liveliness_check_interval_ *= TheServiceParticipant->liveliness_factor()/100.0; 01374 // Must be at least 1 micro second. 01375 if (liveliness_check_interval_ == ACE_Time_Value::zero) { 01376 liveliness_check_interval_ = ACE_Time_Value (0, 1); 01377 } 01378 01379 if (reactor_->schedule_timer(liveness_timer_.in(), 01380 0, 01381 liveliness_check_interval_, 01382 liveliness_check_interval_) == -1) { 01383 ACE_ERROR((LM_ERROR, 01384 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: %p.\n"), 01385 ACE_TEXT("schedule_timer"))); 01386 01387 } 01388 } 01389 01390 if (!participant) 01391 return DDS::RETCODE_ERROR; 01392 01393 participant->add_adjust_liveliness_timers(this); 01394 01395 // Setup the offered deadline watchdog if the configured deadline 01396 // period is not the default (infinite). 
01397 DDS::Duration_t const deadline_period = this->qos_.deadline.period; 01398 01399 if (deadline_period.sec != DDS::DURATION_INFINITE_SEC 01400 || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC) { 01401 this->watchdog_ = make_rch<OfferedDeadlineWatchdog>( 01402 ref(this->lock_), 01403 this->qos_.deadline, 01404 ref(*this), 01405 ref(this->offered_deadline_missed_status_), 01406 ref(this->last_deadline_missed_total_count_)); 01407 } 01408 01409 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_); 01410 disco->pre_writer(this); 01411 01412 this->set_enabled(); 01413 01414 try { 01415 this->enable_transport(reliable, 01416 this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS); 01417 01418 } catch (const Transport::Exception&) { 01419 ACE_ERROR((LM_ERROR, 01420 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable, ") 01421 ACE_TEXT("Transport Exception.\n"))); 01422 data_container_->shutdown_ = true; 01423 return DDS::RETCODE_ERROR; 01424 } 01425 01426 const TransportLocatorSeq& trans_conf_info = connection_info(); 01427 01428 DDS::PublisherQos pub_qos; 01429 01430 publisher->get_qos(pub_qos); 01431 01432 this->publication_id_ = 01433 disco->add_publication(this->domain_id_, 01434 this->dp_id_, 01435 this->topic_servant_->get_id(), 01436 this, 01437 this->qos_, 01438 trans_conf_info, 01439 pub_qos); 01440 01441 01442 if (!publisher || this->publication_id_ == GUID_UNKNOWN) { 01443 ACE_DEBUG((LM_WARNING, 01444 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::enable, ") 01445 ACE_TEXT("add_publication returned invalid id. 
\n"))); 01446 data_container_->shutdown_ = true; 01447 return DDS::RETCODE_ERROR; 01448 } 01449 01450 this->data_container_->publication_id_ = this->publication_id_; 01451 01452 const DDS::ReturnCode_t writer_enabled_result = 01453 publisher->writer_enabled(topic_name_.in(), this); 01454 01455 if (this->monitor_) { 01456 this->monitor_->report(); 01457 } 01458 01459 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 01460 01461 // Move cached data from the durability cache to the unsent data 01462 // queue. 01463 if (durability_cache != 0) { 01464 01465 if (!durability_cache->get_data(this->domain_id_, 01466 this->topic_name_, 01467 get_type_name(), 01468 this, 01469 this->mb_allocator_.get(), 01470 this->db_allocator_.get(), 01471 this->qos_.lifespan)) { 01472 ACE_ERROR((LM_ERROR, 01473 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: ") 01474 ACE_TEXT("unable to retrieve durable data\n"))); 01475 } 01476 } 01477 01478 #endif 01479 01480 return writer_enabled_result; 01481 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::enable_specific | ( | ) | [protected, pure virtual] |
Implemented in OpenDDS::DCPS::DataWriterImpl_T< MessageType >.
void OpenDDS::DCPS::DataWriterImpl::end_coherent_changes | ( | const GroupCoherentSamples & | group_samples | ) |
Ends a coherent change set; should only be called once.
Definition at line 2245 of file DataWriterImpl.cpp.
References ACE_TEXT(), coherent_, coherent_samples_, OpenDDS::DCPS::CoherentChangeControl::coherent_samples_, create_control_message(), OpenDDS::DCPS::END_COHERENT_CHANGES, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), get_db_lock(), get_lock(), ACE_OS::gettimeofday(), OpenDDS::DCPS::CoherentChangeControl::group_coherent_, OpenDDS::DCPS::CoherentChangeControl::group_coherent_samples_, DDS::GROUP_PRESENTATION_QOS, header, OpenDDS::DCPS::WriterCoherentSample::last_sample_, LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), OpenDDS::DCPS::CoherentChangeControl::max_marshaled_size(), ACE_Message_Block::MB_DATA, OpenDDS::DCPS::move(), OpenDDS::DCPS::WriterCoherentSample::num_samples_, OpenDDS::DCPS::CoherentChangeControl::publisher_id_, publisher_servant_, send_control(), OpenDDS::DCPS::SEND_CONTROL_ERROR, sequence_number_, OpenDDS::DCPS::TransportClient::swap_bytes(), and OpenDDS::DCPS::time_value_to_time().
02246 { 02247 // PublisherImpl::pi_lock_ should be held. 02248 ACE_GUARD(ACE_Recursive_Thread_Mutex, 02249 guard, 02250 get_lock()); 02251 02252 CoherentChangeControl end_msg; 02253 end_msg.coherent_samples_.num_samples_ = this->coherent_samples_; 02254 end_msg.coherent_samples_.last_sample_ = this->sequence_number_; 02255 02256 RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock(); 02257 02258 if (publisher) { 02259 end_msg.group_coherent_ 02260 = publisher->qos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS; 02261 } 02262 02263 if (publisher && end_msg.group_coherent_) { 02264 end_msg.publisher_id_ = publisher->publisher_id_; 02265 end_msg.group_coherent_samples_ = group_samples; 02266 } 02267 02268 size_t max_marshaled_size = end_msg.max_marshaled_size(); 02269 02270 Message_Block_Ptr data( new ACE_Message_Block(max_marshaled_size, 02271 ACE_Message_Block::MB_DATA, 02272 0, //cont 02273 0, //data 02274 0, //alloc_strategy 02275 get_db_lock())); 02276 02277 Serializer serializer( 02278 data.get(), 02279 this->swap_bytes()); 02280 02281 serializer << end_msg; 02282 02283 DDS::Time_t source_timestamp = 02284 time_value_to_time(ACE_OS::gettimeofday()); 02285 02286 DataSampleHeader header; 02287 Message_Block_Ptr control( 02288 create_control_message(END_COHERENT_CHANGES, header, move(data), source_timestamp)); 02289 02290 02291 this->coherent_ = false; 02292 this->coherent_samples_ = 0; 02293 02294 guard.release(); 02295 if (this->send_control(header, move(control)) == SEND_CONTROL_ERROR) { 02296 ACE_ERROR((LM_ERROR, 02297 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::end_coherent_changes:") 02298 ACE_TEXT(" unable to send END_COHERENT_CHANGES control message!\n"))); 02299 } 02300 }
bool OpenDDS::DCPS::DataWriterImpl::filter_out(const DataSampleElement& elt, const OPENDDS_STRING& filterClassName, const FilterEvaluator& evaluator, const DDS::StringSeq& expression_params) const
Definition at line 2187 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, ACE_Message_Block::cont(), OpenDDS::DCPS::FilterEvaluator::eval(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::TypeSupportImpl::getMetaStructForType(), LM_ERROR, and topic_servant_.
Referenced by OpenDDS::DCPS::WriteDataContainer::copy_and_prepend().
02191 { 02192 TypeSupportImpl* const typesupport = 02193 dynamic_cast<TypeSupportImpl*>(topic_servant_->get_type_support()); 02194 02195 if (!typesupport) { 02196 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR DataWriterImpl::filter_out - Could not cast type support, not filtering\n")); 02197 return false; 02198 } 02199 02200 if (filterClassName == "DDSSQL" || 02201 filterClassName == "OPENDDSSQL") { 02202 return !evaluator.eval(elt.get_sample()->cont(), 02203 elt.get_header().byte_order_ != ACE_CDR_BYTE_ORDER, 02204 elt.get_header().cdr_encapsulation_, typesupport->getMetaStructForType(), 02205 expression_params); 02206 } 02207 else { 02208 return false; 02209 } 02210 }
DataBlockLockPool::DataBlockLock* OpenDDS::DCPS::DataWriterImpl::get_db_lock | ( | ) | [inline] |
Definition at line 456 of file DataWriterImpl.h.
Referenced by association_complete_i(), create_control_message(), create_sample_data_message(), end_coherent_changes(), and OpenDDS::DCPS::DataDurabilityCache::get_data().
00456 { 00457 return db_lock_pool_->get_lock(); 00458 }
RepoId OpenDDS::DCPS::DataWriterImpl::get_dp_id | ( | ) |
Accessor of the repository id of the domain participant.
Definition at line 1959 of file DataWriterImpl.cpp.
References dp_id_.
Referenced by OpenDDS::DCPS::DWMonitorImpl::report().
01960 { 01961 return dp_id_; 01962 }
PublicationInstance_rch OpenDDS::DCPS::DataWriterImpl::get_handle_instance | ( | DDS::InstanceHandle_t | handle | ) |
Attempt to locate an existing instance for the given handle.
Definition at line 2449 of file DataWriterImpl.cpp.
References data_container_.
02450 { 02451 02452 if (0 != data_container_) { 02453 return data_container_->get_handle_instance(handle); 02454 } 02455 02456 return PublicationInstance_rch(); 02457 }
DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl::get_instance_handle | ( | ) | [virtual] |
Implements OpenDDS::DCPS::EntityImpl.
Definition at line 170 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::WeakRcHandle< T >::lock(), participant_servant_, and publication_id_.
00171 { 00172 using namespace OpenDDS::DCPS; 00173 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock(); 00174 if (participant) 00175 return participant->id_to_handle(publication_id_); 00176 return 0; 00177 }
void OpenDDS::DCPS::DataWriterImpl::get_instance_handles | ( | InstanceHandleVec & | instance_handles | ) |
Definition at line 2623 of file DataWriterImpl.cpp.
References data_container_.
Referenced by OpenDDS::DCPS::DWMonitorImpl::report().
02624 { 02625 this->data_container_->get_instance_handles(instance_handles); 02626 }
DDS::DataWriterListener_ptr OpenDDS::DCPS::DataWriterImpl::get_listener | ( | ) | [virtual] |
Implements DDS::DataWriter.
Definition at line 955 of file DataWriterImpl.cpp.
References CORBA::LocalObject::_duplicate(), and listener_.
00956 { 00957 return DDS::DataWriterListener::_duplicate(listener_.in()); 00958 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_liveliness_lost_status | ( | DDS::LivelinessLostStatus & | status | ) | [virtual] |
Definition at line 1067 of file DataWriterImpl.cpp.
References DDS::LIVELINESS_LOST_STATUS, liveliness_lost_status_, lock_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::LivelinessLostStatus::total_count_change.
01069 { 01070 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 01071 guard, 01072 this->lock_, 01073 DDS::RETCODE_ERROR); 01074 set_status_changed_flag(DDS::LIVELINESS_LOST_STATUS, false); 01075 status = liveliness_lost_status_; 01076 liveliness_lost_status_.total_count_change = 0; 01077 return DDS::RETCODE_OK; 01078 }
ACE_INLINE ACE_Recursive_Thread_Mutex& OpenDDS::DCPS::DataWriterImpl::get_lock | ( | ) | [inline] |
Accessor of the WriteDataContainer's lock.
Definition at line 364 of file DataWriterImpl.h.
References ACE_Recursive_Thread_Mutex::lock_.
Referenced by association_complete_i(), begin_coherent_changes(), coherent_changes_pending(), dispose(), dispose_and_unregister(), end_coherent_changes(), register_instance_from_durable_data(), send_request_ack(), unregister_instance_i(), and write().
00364 { 00365 return data_container_->lock_; 00366 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_matched_subscription_data(DDS::SubscriptionBuiltinTopicData& subscription_data, DDS::InstanceHandle_t subscription_handle) [virtual]
Definition at line 1226 of file DataWriterImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::RcHandle< T >::in(), LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), participant_servant_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.
01229 { 01230 if (enabled_ == false) { 01231 ACE_ERROR_RETURN((LM_ERROR, 01232 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::") 01233 ACE_TEXT("get_matched_subscription_data: ") 01234 ACE_TEXT("Entity is not enabled. \n")), 01235 DDS::RETCODE_NOT_ENABLED); 01236 } 01237 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock(); 01238 01239 DDS::ReturnCode_t ret = DDS::RETCODE_ERROR; 01240 DDS::SubscriptionBuiltinTopicDataSeq data; 01241 01242 if (participant) { 01243 ret = instance_handle_to_bit_data<DDS::SubscriptionBuiltinTopicDataDataReader_var>( 01244 participant.in(), 01245 BUILT_IN_SUBSCRIPTION_TOPIC, 01246 subscription_handle, 01247 data); 01248 } 01249 01250 if (ret == DDS::RETCODE_OK) { 01251 subscription_data = data[0]; 01252 } 01253 01254 return ret; 01255 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_matched_subscriptions | ( | DDS::InstanceHandleSeq & | subscription_handles | ) | [virtual] |
Definition at line 1193 of file DataWriterImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::EntityImpl::enabled_, id_to_handle_map_, LM_ERROR, lock_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.
01195 { 01196 if (enabled_ == false) { 01197 ACE_ERROR_RETURN((LM_ERROR, 01198 ACE_TEXT("(%P|%t) ERROR: ") 01199 ACE_TEXT("DataWriterImpl::get_matched_subscriptions: ") 01200 ACE_TEXT(" Entity is not enabled. \n")), 01201 DDS::RETCODE_NOT_ENABLED); 01202 } 01203 01204 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 01205 guard, 01206 this->lock_, 01207 DDS::RETCODE_ERROR); 01208 01209 // Copy out the handles for the current set of subscriptions. 01210 int index = 0; 01211 subscription_handles.length( 01212 static_cast<CORBA::ULong>(this->id_to_handle_map_.size())); 01213 01214 for (RepoIdToHandleMap::iterator 01215 current = this->id_to_handle_map_.begin(); 01216 current != this->id_to_handle_map_.end(); 01217 ++current, ++index) { 01218 subscription_handles[index] = current->second; 01219 } 01220 01221 return DDS::RETCODE_OK; 01222 }
DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl::get_next_handle | ( | ) |
Get an instance handle for a new instance.
Definition at line 180 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::WeakRcHandle< T >::lock(), and participant_servant_.
Referenced by OpenDDS::DCPS::WriteDataContainer::register_instance().
00181 { 00182 using namespace OpenDDS::DCPS; 00183 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock(); 00184 if (participant) 00185 return participant->id_to_handle(GUID_UNKNOWN); 00186 return 0; 00187 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_offered_deadline_missed_status | ( | DDS::OfferedDeadlineMissedStatus & | status | ) | [virtual] |
Definition at line 1081 of file DataWriterImpl.cpp.
References last_deadline_missed_total_count_, lock_, DDS::OFFERED_DEADLINE_MISSED_STATUS, offered_deadline_missed_status_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), DDS::OfferedDeadlineMissedStatus::total_count, and DDS::OfferedDeadlineMissedStatus::total_count_change.
01083 { 01084 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 01085 guard, 01086 this->lock_, 01087 DDS::RETCODE_ERROR); 01088 01089 set_status_changed_flag(DDS::OFFERED_DEADLINE_MISSED_STATUS, false); 01090 01091 this->offered_deadline_missed_status_.total_count_change = 01092 this->offered_deadline_missed_status_.total_count 01093 - this->last_deadline_missed_total_count_; 01094 01095 // Update for next status check. 01096 this->last_deadline_missed_total_count_ = 01097 this->offered_deadline_missed_status_.total_count; 01098 01099 status = offered_deadline_missed_status_; 01100 01101 this->offered_deadline_missed_status_.total_count_change = 0; 01102 01103 return DDS::RETCODE_OK; 01104 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_offered_incompatible_qos_status | ( | DDS::OfferedIncompatibleQosStatus & | status | ) | [virtual] |
Definition at line 1107 of file DataWriterImpl.cpp.
References lock_, DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, offered_incompatible_qos_status_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::OfferedIncompatibleQosStatus::total_count_change.
01109 { 01110 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 01111 guard, 01112 this->lock_, 01113 DDS::RETCODE_ERROR); 01114 set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, false); 01115 status = offered_incompatible_qos_status_; 01116 offered_incompatible_qos_status_.total_count_change = 0; 01117 return DDS::RETCODE_OK; 01118 }
CORBA::Long OpenDDS::DCPS::DataWriterImpl::get_priority_value | ( | const AssociationData & | ) | const [inline, private, virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 559 of file DataWriterImpl.h.
00559 { 00560 return this->qos_.transport_priority.value; 00561 }
RepoId OpenDDS::DCPS::DataWriterImpl::get_publication_id | ( | ) |
Accessor of the repository id of this datawriter/publication.
Definition at line 1953 of file DataWriterImpl.cpp.
References publication_id_.
Referenced by add_association(), OpenDDS::DCPS::PublisherImpl::delete_datawriter(), OpenDDS::Federator::ManagerImpl::initialize(), OpenDDS::DCPS::DWPeriodicMonitorImpl::report(), and OpenDDS::DCPS::DWMonitorImpl::report().
01954 { 01955 return publication_id_; 01956 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_publication_matched_status | ( | DDS::PublicationMatchedStatus & | status | ) | [virtual] |
Definition at line 1121 of file DataWriterImpl.cpp.
References DDS::PublicationMatchedStatus::current_count_change, lock_, publication_match_status_, DDS::PUBLICATION_MATCHED_STATUS, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::PublicationMatchedStatus::total_count_change.
01123 { 01124 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 01125 guard, 01126 this->lock_, 01127 DDS::RETCODE_ERROR); 01128 set_status_changed_flag(DDS::PUBLICATION_MATCHED_STATUS, false); 01129 status = publication_match_status_; 01130 publication_match_status_.total_count_change = 0; 01131 publication_match_status_.current_count_change = 0; 01132 return DDS::RETCODE_OK; 01133 }
DDS::Publisher_ptr OpenDDS::DCPS::DataWriterImpl::get_publisher | ( | ) | [virtual] |
Implements DDS::DataWriter.
Definition at line 1061 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::WeakRcHandle< T >::lock(), and publisher_servant_.
Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter(), OpenDDS::DCPS::StaticDiscovery::pre_writer(), and OpenDDS::DCPS::DWMonitorImpl::report().
01062 { 01063 return publisher_servant_.lock()._retn(); 01064 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_qos | ( | DDS::DataWriterQos & | qos | ) | [virtual] |
Definition at line 938 of file DataWriterImpl.cpp.
References qos_, and DDS::RETCODE_OK.
Referenced by OpenDDS::DCPS::StaticDiscovery::pre_writer().
00939 { 00940 qos = qos_; 00941 return DDS::RETCODE_OK; 00942 }
void OpenDDS::DCPS::DataWriterImpl::get_readers | ( | RepoIdSet & | readers | ) |
Definition at line 2629 of file DataWriterImpl.cpp.
References lock_, and readers_.
Referenced by OpenDDS::DCPS::DWMonitorImpl::report().
02630 { 02631 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 02632 readers = this->readers_; 02633 }
const RepoId& OpenDDS::DCPS::DataWriterImpl::get_repo_id | ( | ) | const [inline, private, virtual] |
Implements OpenDDS::DCPS::TransportClient.
Definition at line 551 of file DataWriterImpl.h.
00551 { 00552 return this->publication_id_; 00553 }
SendStateDataSampleList OpenDDS::DCPS::DataWriterImpl::get_resend_data | ( | ) | [inline] |
Definition at line 283 of file DataWriterImpl.h.
Referenced by association_complete_i().
00283 { 00284 return data_container_->get_resend_data(); 00285 }
DDS::Topic_ptr OpenDDS::DCPS::DataWriterImpl::get_topic | ( | ) | [virtual] |
Implements DDS::DataWriter.
Definition at line 961 of file DataWriterImpl.cpp.
References CORBA::LocalObject::_duplicate(), OpenDDS::DCPS::TopicDescriptionPtr< Topic >::get(), and topic_servant_.
Referenced by OpenDDS::DCPS::DWMonitorImpl::report().
00962 { 00963 return DDS::Topic::_duplicate(topic_servant_.get()); 00964 }
char const * OpenDDS::DCPS::DataWriterImpl::get_type_name | ( | ) | const |
Get associated topic type name.
Definition at line 1965 of file DataWriterImpl.cpp.
References type_name_.
Referenced by enable().
01966 { 01967 return type_name_.in(); 01968 }
ACE_UINT64 OpenDDS::DCPS::DataWriterImpl::get_unsent_data | ( | SendStateDataSampleList & | list | ) | [inline] |
Retrieve the unsent data from the WriteDataContainer.
Definition at line 279 of file DataWriterImpl.h.
Referenced by send_all_to_flush_control(), and write().
00279 { 00280 return data_container_->get_unsent_data(list); 00281 }
int OpenDDS::DCPS::DataWriterImpl::handle_timeout | ( | const ACE_Time_Value & | tv, | |
const void * | arg | |||
) | [virtual] |
Handle the timeout of the liveliness-assertion timer.
Definition at line 2343 of file DataWriterImpl.cpp.
References ACE_TEXT(), DDS::AUTOMATIC_LIVELINESS_QOS, ACE_Reactor_Timer_Interface::cancel_timer(), OpenDDS::DCPS::duration_to_time_value(), OpenDDS::DCPS::RcHandle< T >::in(), CORBA::is_nil(), last_liveliness_activity_time_, listener_for(), DDS::DataWriterQos::liveliness, liveliness_asserted_, liveliness_check_interval_, liveliness_lost_, DDS::LIVELINESS_LOST_STATUS, liveliness_lost_status_, liveness_timer_, LM_ERROR, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS, qos_, reactor_, ACE_Reactor_Timer_Interface::schedule_timer(), send_liveliness(), DDS::LivelinessLostStatus::total_count, and DDS::LivelinessLostStatus::total_count_change.
02345 { 02346 bool liveliness_lost = false; 02347 02348 ACE_Time_Value elapsed = tv - last_liveliness_activity_time_; 02349 02350 // Do we need to send a liveliness message? 02351 if (elapsed >= liveliness_check_interval_) { 02352 switch (this->qos_.liveliness.kind) { 02353 case DDS::AUTOMATIC_LIVELINESS_QOS: 02354 if (this->send_liveliness(tv) == false) { 02355 liveliness_lost = true; 02356 } 02357 break; 02358 02359 case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS: 02360 if (liveliness_asserted_) { 02361 if (this->send_liveliness(tv) == false) { 02362 liveliness_lost = true; 02363 } 02364 } 02365 break; 02366 02367 case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS: 02368 // Do nothing. 02369 break; 02370 } 02371 } 02372 else { 02373 // Reschedule. 02374 if (reactor_->cancel_timer(liveness_timer_.in()) == -1) { 02375 ACE_ERROR((LM_ERROR, 02376 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"), 02377 ACE_TEXT("cancel_timer"))); 02378 } 02379 if (reactor_->schedule_timer(liveness_timer_.in(), 0, liveliness_check_interval_ - elapsed, 02380 liveliness_check_interval_) == -1) 02381 { 02382 ACE_ERROR((LM_ERROR, 02383 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"), 02384 ACE_TEXT("schedule_timer"))); 02385 } 02386 return 0; 02387 } 02388 02389 liveliness_asserted_ = false; 02390 elapsed = tv - last_liveliness_activity_time_; 02391 02392 // Have we lost liveliness? 
02393 if (elapsed >= duration_to_time_value(qos_.liveliness.lease_duration)) { 02394 liveliness_lost = true; 02395 } 02396 02397 if (!this->liveliness_lost_ && liveliness_lost) { 02398 ++ this->liveliness_lost_status_.total_count; 02399 ++ this->liveliness_lost_status_.total_count_change; 02400 02401 DDS::DataWriterListener_var listener = 02402 listener_for(DDS::LIVELINESS_LOST_STATUS); 02403 02404 if (!CORBA::is_nil(listener.in())) { 02405 listener->on_liveliness_lost(this, this->liveliness_lost_status_); 02406 this->liveliness_lost_status_.total_count_change = 0; 02407 } 02408 } 02409 02410 this->liveliness_lost_ = liveliness_lost; 02411 return 0; 02412 }
void OpenDDS::DCPS::DataWriterImpl::inconsistent_topic | ( | ) | [virtual] |
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 858 of file DataWriterImpl.cpp.
References topic_servant_.
00859 { 00860 topic_servant_->inconsistent_topic(); 00861 }
void OpenDDS::DCPS::DataWriterImpl::init | ( | TopicImpl * | topic_servant, | |
const DDS::DataWriterQos & | qos, | |||
DDS::DataWriterListener_ptr | a_listener, | |||
const DDS::StatusMask & | mask, | |||
WeakRcHandle< OpenDDS::DCPS::DomainParticipantImpl > | participant_servant, | |||
OpenDDS::DCPS::PublisherImpl * | publisher_servant | |||
) |
Initialize the data members.
Definition at line 131 of file DataWriterImpl.cpp.
References CORBA::LocalObject::_duplicate(), DBG_ENTRY_LVL, domain_id_, is_bit_, listener_, listener_mask_, OpenDDS::DCPS::WeakRcHandle< T >::lock(), participant_servant_, publisher_servant_, qos_, reactor_, TheServiceParticipant, topic_id_, topic_name_, topic_servant_, OpenDDS::DCPS::topicIsBIT(), and type_name_.
Referenced by OpenDDS::DCPS::PublisherImpl::create_datawriter().
00138 { 00139 DBG_ENTRY_LVL("DataWriterImpl","init",6); 00140 topic_servant_ = topic_servant; 00141 topic_name_ = topic_servant_->get_name(); 00142 topic_id_ = topic_servant_->get_id(); 00143 type_name_ = topic_servant_->get_type_name(); 00144 00145 #if !defined (DDS_HAS_MINIMUM_BIT) 00146 is_bit_ = topicIsBIT(topic_name_.in(), type_name_.in()); 00147 #endif // !defined (DDS_HAS_MINIMUM_BIT) 00148 00149 qos_ = qos; 00150 00151 //Note: OK to _duplicate(nil). 00152 listener_ = DDS::DataWriterListener::_duplicate(a_listener); 00153 listener_mask_ = mask; 00154 00155 // Only store the participant pointer, since it is our "grand" 00156 // parent, we will exist as long as it does. 00157 participant_servant_ = participant_servant; 00158 00159 RcHandle<DomainParticipantImpl> participant = participant_servant.lock(); 00160 domain_id_ = participant->get_domain_id(); 00161 00162 // Only store the publisher pointer, since it is our parent, we will 00163 // exist as long as it does. 00164 publisher_servant_ = *publisher_servant; 00165 00166 this->reactor_ = TheServiceParticipant->timer(); 00167 }
DDS::DataWriterListener_ptr OpenDDS::DCPS::DataWriterImpl::listener_for | ( | DDS::StatusKind | kind | ) |
This is used to retrieve the listener for a certain status change.
If this datawriter has a registered listener and the status kind is in the listener mask then the listener is returned. Otherwise, the query for the listener is propagated up to the factory/publisher.
Definition at line 2325 of file DataWriterImpl.cpp.
References CORBA::LocalObject::_duplicate(), CORBA::is_nil(), listener_, listener_mask_, OpenDDS::DCPS::WeakRcHandle< T >::lock(), and publisher_servant_.
Referenced by association_complete_i(), OpenDDS::DCPS::OfferedDeadlineWatchdog::execute(), handle_timeout(), and update_incompatible_qos().
02326 { 02327 // per 2.1.4.3.1 Listener Access to Plain Communication Status 02328 // use this entities factory if listener is mask not enabled 02329 // for this kind. 02330 RcHandle<PublisherImpl> publisher = publisher_servant_.lock(); 02331 if (!publisher) 02332 return 0; 02333 02334 if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) { 02335 return publisher->listener_for(kind); 02336 02337 } else { 02338 return DDS::DataWriterListener::_duplicate(listener_.in()); 02339 } 02340 }
ACE_Time_Value OpenDDS::DCPS::DataWriterImpl::liveliness_check_interval | ( | DDS::LivelinessQosPolicyKind | kind | ) |
Definition at line 1173 of file DataWriterImpl.cpp.
References DDS::DataWriterQos::liveliness, liveliness_check_interval_, ACE_Time_Value::max_time, and qos_.
Referenced by OpenDDS::DCPS::DomainParticipantImpl::LivelinessTimer::add_adjust().
01174 { 01175 if (this->qos_.liveliness.kind == kind) { 01176 return liveliness_check_interval_; 01177 } else { 01178 return ACE_Time_Value::max_time; 01179 } 01180 }
void OpenDDS::DCPS::DataWriterImpl::lookup_instance_handles | ( | const ReaderIdSeq & | ids, | |
DDS::InstanceHandleSeq & | hdls | |||
) | [private] |
Look up the instance handles by the subscription repo ids.
Definition at line 2558 of file DataWriterImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, OpenDDS::DCPS::WeakRcHandle< T >::lock(), OPENDDS_STRING, and participant_servant_.
Referenced by notify_publication_disconnected(), notify_publication_lost(), and notify_publication_reconnected().
02560 { 02561 CORBA::ULong const num_rds = ids.length(); 02562 RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock(); 02563 02564 if (!participant) 02565 return; 02566 02567 if (DCPS_debug_level > 9) { 02568 OPENDDS_STRING separator; 02569 OPENDDS_STRING buffer; 02570 02571 for (CORBA::ULong i = 0; i < num_rds; ++i) { 02572 buffer += separator + OPENDDS_STRING(GuidConverter(ids[i])); 02573 separator = ", "; 02574 } 02575 02576 ACE_DEBUG((LM_DEBUG, 02577 ACE_TEXT("(%P|%t) DataWriterImpl::lookup_instance_handles: ") 02578 ACE_TEXT("searching for handles for reader Ids: %C.\n"), 02579 buffer.c_str())); 02580 } 02581 02582 hdls.length(num_rds); 02583 02584 for (CORBA::ULong i = 0; i < num_rds; ++i) { 02585 hdls[i] = participant->id_to_handle(ids[i]); 02586 } 02587 }
bool OpenDDS::DCPS::DataWriterImpl::need_sequence_repair | ( | ) | [private] |
Definition at line 2655 of file DataWriterImpl.cpp.
References need_sequence_repair_i(), and reader_info_lock_.
Referenced by create_control_message(), and create_sample_data_message().
02656 { 02657 ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, false); 02658 return need_sequence_repair_i(); 02659 }
bool OpenDDS::DCPS::DataWriterImpl::need_sequence_repair_i | ( | ) | const [private] |
Definition at line 2662 of file DataWriterImpl.cpp.
References reader_info_, and sequence_number_.
Referenced by need_sequence_repair().
02663 { 02664 for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(), 02665 end = reader_info_.end(); it != end; ++it) { 02666 if (it->second.expected_sequence_ != sequence_number_) { 02667 return true; 02668 } 02669 } 02670 02671 return false; 02672 }
void OpenDDS::DCPS::DataWriterImpl::notify_publication_disconnected | ( | const ReaderIdSeq & | subids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportSendListener.
Definition at line 2460 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::LocalObject< DDS::DataWriter >::_narrow(), DBG_ENTRY_LVL, is_bit_, CORBA::is_nil(), listener_, lookup_instance_handles(), status, and OpenDDS::DCPS::PublicationLostStatus::subscription_handles.
02461 { 02462 DBG_ENTRY_LVL("DataWriterImpl","notify_publication_disconnected",6); 02463 02464 if (!is_bit_) { 02465 // Narrow to DDS::DCPS::DataWriterListener. If a DDS::DataWriterListener 02466 // is given to this DataWriter then narrow() fails. 02467 DataWriterListener_var the_listener = 02468 DataWriterListener::_narrow(this->listener_.in()); 02469 02470 if (!CORBA::is_nil(the_listener.in())) { 02471 PublicationDisconnectedStatus status; 02472 // Since this callback may come after remove_association which 02473 // removes the reader from id_to_handle map, we can ignore this 02474 // error. 02475 this->lookup_instance_handles(subids, 02476 status.subscription_handles); 02477 the_listener->on_publication_disconnected(this, status); 02478 } 02479 } 02480 }
void OpenDDS::DCPS::DataWriterImpl::notify_publication_lost | ( | const DDS::InstanceHandleSeq & | handles | ) | [private] |
Definition at line 2530 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::LocalObject< DDS::DataWriter >::_narrow(), DBG_ENTRY_LVL, is_bit_, CORBA::is_nil(), listener_, status, and OpenDDS::DCPS::PublicationLostStatus::subscription_handles.
02531 { 02532 DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6); 02533 02534 if (!is_bit_) { 02535 // Narrow to DDS::DCPS::DataWriterListener. If a 02536 // DDS::DataWriterListener is given to this DataWriter then 02537 // narrow() fails. 02538 DataWriterListener_var the_listener = 02539 DataWriterListener::_narrow(this->listener_.in()); 02540 02541 if (!CORBA::is_nil(the_listener.in())) { 02542 PublicationLostStatus status; 02543 02544 CORBA::ULong len = handles.length(); 02545 status.subscription_handles.length(len); 02546 02547 for (CORBA::ULong i = 0; i < len; ++ i) { 02548 status.subscription_handles[i] = handles[i]; 02549 } 02550 02551 the_listener->on_publication_lost(this, status); 02552 } 02553 } 02554 }
void OpenDDS::DCPS::DataWriterImpl::notify_publication_lost | ( | const ReaderIdSeq & | subids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportSendListener.
Definition at line 2506 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::LocalObject< DDS::DataWriter >::_narrow(), DBG_ENTRY_LVL, is_bit_, CORBA::is_nil(), listener_, lookup_instance_handles(), status, and OpenDDS::DCPS::PublicationLostStatus::subscription_handles.
02507 { 02508 DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6); 02509 02510 if (!is_bit_) { 02511 // Narrow to DDS::DCPS::DataWriterListener. If a 02512 // DDS::DataWriterListener is given to this DataWriter then 02513 // narrow() fails. 02514 DataWriterListener_var the_listener = 02515 DataWriterListener::_narrow(this->listener_.in()); 02516 02517 if (!CORBA::is_nil(the_listener.in())) { 02518 PublicationLostStatus status; 02519 02520 // Since this callback may come after remove_association which removes 02521 // the reader from id_to_handle map, we can ignore this error. 02522 this->lookup_instance_handles(subids, 02523 status.subscription_handles); 02524 the_listener->on_publication_lost(this, status); 02525 } 02526 } 02527 }
void OpenDDS::DCPS::DataWriterImpl::notify_publication_reconnected | ( | const ReaderIdSeq & | subids | ) | [virtual] |
Implements OpenDDS::DCPS::TransportSendListener.
Definition at line 2483 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::LocalObject< DDS::DataWriter >::_narrow(), DBG_ENTRY_LVL, is_bit_, CORBA::is_nil(), listener_, lookup_instance_handles(), status, and OpenDDS::DCPS::PublicationLostStatus::subscription_handles.
02484 { 02485 DBG_ENTRY_LVL("DataWriterImpl","notify_publication_reconnected",6); 02486 02487 if (!is_bit_) { 02488 // Narrow to DDS::DCPS::DataWriterListener. If a 02489 // DDS::DataWriterListener is given to this DataWriter then 02490 // narrow() fails. 02491 DataWriterListener_var the_listener = 02492 DataWriterListener::_narrow(this->listener_.in()); 02493 02494 if (!CORBA::is_nil(the_listener.in())) { 02495 PublicationDisconnectedStatus status; 02496 02497 // If it's reconnected then the reader should be in id_to_handle 02498 this->lookup_instance_handles(subids, status.subscription_handles); 02499 02500 the_listener->on_publication_reconnected(this, status); 02501 } 02502 } 02503 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::num_samples | ( | DDS::InstanceHandle_t | handle, | |
size_t & | size | |||
) |
Return the number of samples for a given instance.
Definition at line 1940 of file DataWriterImpl.cpp.
References data_container_.
01942 { 01943 return data_container_->num_samples(handle, size); 01944 }
typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_MAP_CMP | ( | RepoId | , | |
DDS::InstanceHandle_t | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_MAP_CMP | ( | RepoId | , | |
ReaderInfo | , | |||
GUID_tKeyLessThan | ||||
) | [protected] |
typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_MAP_CMP | ( | RepoId | , | |
SequenceNumber | , | |||
GUID_tKeyLessThan | ||||
) |
typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_VECTOR | ( | DDS::InstanceHandle_t | ) |
RcHandle< EntityImpl > OpenDDS::DCPS::DataWriterImpl::parent | ( | void | ) | const [virtual] |
Reimplemented from OpenDDS::DCPS::EntityImpl.
Definition at line 2180 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::WeakRcHandle< T >::lock(), and publisher_servant_.
02181 { 02182 return this->publisher_servant_.lock(); 02183 }
bool OpenDDS::DCPS::DataWriterImpl::participant_liveliness_activity_after | ( | const ACE_Time_Value & | tv | ) |
Definition at line 1183 of file DataWriterImpl.cpp.
References last_liveliness_activity_time_, DDS::DataWriterQos::liveliness, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, and qos_.
01184 { 01185 if (this->qos_.liveliness.kind == DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) { 01186 return last_liveliness_activity_time_ > tv; 01187 } else { 01188 return false; 01189 } 01190 }
bool OpenDDS::DCPS::DataWriterImpl::persist_data | ( | ) |
Make sent data available beyond the lifetime of this DataWriter.
Definition at line 2591 of file DataWriterImpl.cpp.
References data_container_.
Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().
02592 { 02593 return this->data_container_->persist_data(); 02594 }
void OpenDDS::DCPS::DataWriterImpl::prepare_to_delete | ( | ) | [protected] |
Definition at line 2442 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::EntityImpl::set_deleted(), and OpenDDS::DCPS::TransportClient::stop_associating().
Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().
02443 { 02444 this->set_deleted(true); 02445 this->stop_associating(); 02446 }
void OpenDDS::DCPS::DataWriterImpl::register_for_reader | ( | const RepoId & | participant, | |
const RepoId & | writerid, | |||
const RepoId & | readerid, | |||
const TransportLocatorSeq & | locators, | |||
DiscoveryListener * | listener | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 776 of file DataWriterImpl.cpp.
00781 { 00782 TransportClient::register_for_reader(participant, writerid, readerid, locators, listener); 00783 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::register_instance_from_durable_data | ( | DDS::InstanceHandle_t & | handle, | |
Message_Block_Ptr | data, | |||
const DDS::Time_t & | source_timestamp | |||
) |
Delegate to the WriteDataContainer to register and tell the transport to broadcast the registered instance.
Definition at line 1563 of file DataWriterImpl.cpp.
References ACE_TEXT(), DBG_ENTRY_LVL, get_lock(), LM_ERROR, OpenDDS::DCPS::move(), register_instance_i(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, and send_all_to_flush_control().
Referenced by OpenDDS::DCPS::DataDurabilityCache::get_data().
01566 { 01567 DBG_ENTRY_LVL("DataWriterImpl","register_instance_from_durable_data",6); 01568 01569 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 01570 guard, 01571 get_lock(), 01572 DDS::RETCODE_ERROR); 01573 01574 DDS::ReturnCode_t ret = register_instance_i(handle, move(data), source_timestamp); 01575 if (ret != DDS::RETCODE_OK) { 01576 ACE_ERROR_RETURN((LM_ERROR, 01577 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_from_durable_data: ") 01578 ACE_TEXT("register instance with container failed.\n")), 01579 ret); 01580 } 01581 01582 send_all_to_flush_control(guard); 01583 01584 return ret; 01585 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::register_instance_i | ( | DDS::InstanceHandle_t & | handle, | |
Message_Block_Ptr | data, | |||
const DDS::Time_t & | source_timestamp | |||
) |
Delegate to the WriteDataContainer to register the instance. The transport must be told to broadcast the registered instance upon returning.
Definition at line 1501 of file DataWriterImpl.cpp.
References ACE_TEXT(), create_control_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::INSTANCE_REGISTRATION, LM_ERROR, monitor_, OpenDDS::DCPS::move(), OpenDDS::DCPS::Monitor::report(), DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, and OpenDDS::DCPS::DataSampleElement::set_sample().
Referenced by register_instance_from_durable_data().
01504 { 01505 DBG_ENTRY_LVL("DataWriterImpl","register_instance_i",6); 01506 01507 if (enabled_ == false) { 01508 ACE_ERROR_RETURN((LM_ERROR, 01509 ACE_TEXT("(%P|%t) ERROR: ") 01510 ACE_TEXT("DataWriterImpl::register_instance_i: ") 01511 ACE_TEXT(" Entity is not enabled. \n")), 01512 DDS::RETCODE_NOT_ENABLED); 01513 } 01514 01515 DDS::ReturnCode_t ret = 01516 this->data_container_->register_instance(handle, data); 01517 01518 if (ret != DDS::RETCODE_OK) { 01519 ACE_ERROR_RETURN((LM_ERROR, 01520 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_i: ") 01521 ACE_TEXT("register instance with container failed.\n")), 01522 ret); 01523 } 01524 01525 if (this->monitor_) { 01526 this->monitor_->report(); 01527 } 01528 01529 DataSampleElement* element = 0; 01530 ret = this->data_container_->obtain_buffer_for_control(element); 01531 01532 if (ret != DDS::RETCODE_OK) { 01533 ACE_ERROR_RETURN((LM_ERROR, 01534 ACE_TEXT("(%P|%t) ERROR: ") 01535 ACE_TEXT("DataWriterImpl::register_instance_i: ") 01536 ACE_TEXT("obtain_buffer_for_control returned %d.\n"), 01537 ret), 01538 ret); 01539 } 01540 01541 // Add header with the registration sample data. 01542 Message_Block_Ptr sample(create_control_message(INSTANCE_REGISTRATION, 01543 element->get_header(), 01544 move(data), 01545 source_timestamp)); 01546 01547 element->set_sample(move(sample)); 01548 01549 ret = this->data_container_->enqueue_control(element); 01550 01551 if (ret != DDS::RETCODE_OK) { 01552 ACE_ERROR_RETURN((LM_ERROR, 01553 ACE_TEXT("(%P|%t) ERROR: ") 01554 ACE_TEXT("DataWriterImpl::register_instance_i: ") 01555 ACE_TEXT("enqueue_control failed.\n")), 01556 ret); 01557 } 01558 01559 return ret; 01560 }
void OpenDDS::DCPS::DataWriterImpl::remove_all_associations | ( | ) |
Definition at line 724 of file DataWriterImpl.cpp.
References ACE_TEXT(), DBG_ENTRY_LVL, LM_WARNING, lock_, pending_readers_, readers_, remove_associations(), size, and OpenDDS::DCPS::TransportClient::stop_associating().
Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().
00725 { 00726 DBG_ENTRY_LVL("DataWriterImpl", "remove_all_associations", 6); 00727 // stop pending associations 00728 this->stop_associating(); 00729 00730 OpenDDS::DCPS::ReaderIdSeq readers; 00731 CORBA::ULong size; 00732 CORBA::ULong num_pending_readers; 00733 { 00734 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_); 00735 00736 num_pending_readers = static_cast<CORBA::ULong>(pending_readers_.size()); 00737 size = static_cast<CORBA::ULong>(readers_.size()) + num_pending_readers; 00738 readers.length(size); 00739 00740 RepoIdSet::iterator itEnd = readers_.end(); 00741 int i = 0; 00742 00743 for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) { 00744 readers[i ++] = *it; 00745 } 00746 00747 itEnd = pending_readers_.end(); 00748 00749 for (RepoIdSet::iterator it = pending_readers_.begin(); it != itEnd; ++it) { 00750 readers[i ++] = *it; 00751 } 00752 00753 if (num_pending_readers > 0) { 00754 ACE_DEBUG((LM_WARNING, 00755 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ") 00756 ACE_TEXT("%d subscribers were pending and never fully associated.\n"), 00757 num_pending_readers)); 00758 } 00759 } 00760 00761 try { 00762 if (0 < size) { 00763 CORBA::Boolean dont_notify_lost = false; 00764 00765 this->remove_associations(readers, dont_notify_lost); 00766 } 00767 00768 } catch (const CORBA::Exception&) { 00769 ACE_DEBUG((LM_WARNING, 00770 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ") 00771 ACE_TEXT("caught exception from remove_associations.\n"))); 00772 } 00773 }
virtual void OpenDDS::DCPS::DataWriterImpl::remove_associations | ( | const ReaderIdSeq & | readers, | |
bool | callback | |||
) | [virtual] |
Implements OpenDDS::DCPS::TransportSendListener.
Referenced by remove_all_associations().
void OpenDDS::DCPS::DataWriterImpl::reschedule_deadline | ( | ) |
Definition at line 2598 of file DataWriterImpl.cpp.
References data_container_, OpenDDS::DCPS::RcHandle< T >::in(), and watchdog_.
02599 { 02600 if (this->watchdog_.in()) { 02601 this->data_container_->reschedule_deadline(); 02602 } 02603 }
void OpenDDS::DCPS::DataWriterImpl::retrieve_inline_qos_data | ( | TransportSendListener::InlineQosData & | qos_data | ) | const [virtual] |
Reimplemented from OpenDDS::DCPS::TransportSendListener.
Definition at line 2636 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::TransportSendListener::InlineQosData::dw_qos, OpenDDS::DCPS::WeakRcHandle< T >::lock(), OpenDDS::DCPS::TransportSendListener::InlineQosData::pub_qos, publisher_servant_, qos_, OpenDDS::DCPS::TransportSendListener::InlineQosData::topic_name, and topic_name_.
02637 { 02638 RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock(); 02639 if (publisher) { 02640 publisher->get_qos(qos_data.pub_qos); 02641 } 02642 qos_data.dw_qos = this->qos_; 02643 qos_data.topic_name = this->topic_name_.in(); 02644 }
void OpenDDS::DCPS::DataWriterImpl::send_all_to_flush_control | ( | ACE_Guard< ACE_Recursive_Thread_Mutex > & | guard | ) |
Definition at line 1484 of file DataWriterImpl.cpp.
References controlTracker, DBG_ENTRY_LVL, get_unsent_data(), OpenDDS::DCPS::MessageTracker::message_sent(), ACE_Guard< ACE_LOCK >::release(), and OpenDDS::DCPS::TransportClient::send().
Referenced by dispose(), dispose_and_unregister(), register_instance_from_durable_data(), send_request_ack(), and unregister_instance_i().
01485 { 01486 DBG_ENTRY_LVL("DataWriterImpl","send_all_to_flush_control",6); 01487 01488 SendStateDataSampleList list; 01489 01490 ACE_UINT64 transaction_id = this->get_unsent_data(list); 01491 01492 controlTracker.message_sent(); 01493 01494 //need to release guard to call down to transport 01495 guard.release(); 01496 01497 this->send(list, transaction_id); 01498 }
SendControlStatus OpenDDS::DCPS::DataWriterImpl::send_control | ( | const DataSampleHeader & | header, | |
Message_Block_Ptr | msg | |||
) | [protected, virtual] |
Reimplemented from OpenDDS::DCPS::TransportClient.
Definition at line 2675 of file DataWriterImpl.cpp.
References controlTracker, OpenDDS::DCPS::MessageTracker::message_dropped(), OpenDDS::DCPS::MessageTracker::message_sent(), OpenDDS::DCPS::move(), OpenDDS::DCPS::SEND_CONTROL_OK, and status.
Referenced by end_coherent_changes(), and send_liveliness().
02677 { 02678 controlTracker.message_sent(); 02679 02680 SendControlStatus status = TransportClient::send_control(header, move(msg)); 02681 02682 if (status != SEND_CONTROL_OK) { 02683 controlTracker.message_dropped(); 02684 } 02685 02686 return status; 02687 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::send_end_historic_samples | ( | const RepoId & | readerId | ) | [private] |
bool OpenDDS::DCPS::DataWriterImpl::send_liveliness | ( | const ACE_Time_Value & | now | ) | [private] |
Send the liveliness message.
Definition at line 2415 of file DataWriterImpl.cpp.
References ACE_TEXT(), create_control_message(), OpenDDS::DCPS::DATAWRITER_LIVELINESS, domain_id_, header, last_liveliness_activity_time_, DDS::DataWriterQos::liveliness, LM_ERROR, DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS, OpenDDS::DCPS::move(), qos_, send_control(), OpenDDS::DCPS::SEND_CONTROL_ERROR, TheServiceParticipant, and OpenDDS::DCPS::time_value_to_time().
Referenced by assert_liveliness(), and handle_timeout().
02416 { 02417 if (this->qos_.liveliness.kind == DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS || 02418 !TheServiceParticipant->get_discovery(domain_id_)->supports_liveliness()) { 02419 DDS::Time_t t = time_value_to_time(now); 02420 DataSampleHeader header; 02421 Message_Block_Ptr empty; 02422 Message_Block_Ptr liveliness_msg( 02423 this->create_control_message(DATAWRITER_LIVELINESS, header, move(empty), t)); 02424 02425 if (this->send_control(header, move(liveliness_msg)) == SEND_CONTROL_ERROR) { 02426 ACE_ERROR_RETURN((LM_ERROR, 02427 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::send_liveliness: ") 02428 ACE_TEXT(" send_control failed. \n")), 02429 false); 02430 02431 } else { 02432 last_liveliness_activity_time_ = now; 02433 return true; 02434 } 02435 } else { 02436 last_liveliness_activity_time_ = now; 02437 return true; 02438 } 02439 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::send_request_ack | ( | ) | [private] |
Definition at line 990 of file DataWriterImpl.cpp.
References ACE_TEXT(), create_control_message(), data_container_, OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), ACE_OS::gettimeofday(), LM_ERROR, OpenDDS::DCPS::move(), OpenDDS::DCPS::REQUEST_ACK, DDS::RETCODE_ERROR, DDS::RETCODE_OK, send_all_to_flush_control(), OpenDDS::DCPS::DataSampleElement::set_sample(), and OpenDDS::DCPS::time_value_to_time().
Referenced by wait_for_acknowledgments().
00991 { 00992 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00993 guard, 00994 get_lock(), 00995 DDS::RETCODE_ERROR); 00996 00997 00998 DataSampleElement* element = 0; 00999 DDS::ReturnCode_t ret = this->data_container_->obtain_buffer_for_control(element); 01000 01001 if (ret != DDS::RETCODE_OK) { 01002 ACE_ERROR_RETURN((LM_ERROR, 01003 ACE_TEXT("(%P|%t) ERROR: ") 01004 ACE_TEXT("DataWriterImpl::send_request_ack: ") 01005 ACE_TEXT("obtain_buffer_for_control returned %d.\n"), 01006 ret), 01007 ret); 01008 } 01009 01010 Message_Block_Ptr blk; 01011 // Add header with the registration sample data. 01012 Message_Block_Ptr sample(create_control_message(REQUEST_ACK, 01013 element->get_header(), 01014 move(blk), 01015 time_value_to_time( ACE_OS::gettimeofday() ))); 01016 element->set_sample(move(sample)); 01017 01018 ret = this->data_container_->enqueue_control(element); 01019 01020 if (ret != DDS::RETCODE_OK) { 01021 ACE_ERROR_RETURN((LM_ERROR, 01022 ACE_TEXT("(%P|%t) ERROR: ") 01023 ACE_TEXT("DataWriterImpl::send_request_ack: ") 01024 ACE_TEXT("enqueue_control failed.\n")), 01025 ret); 01026 } 01027 01028 01029 send_all_to_flush_control(guard); 01030 01031 return DDS::RETCODE_OK; 01032 }
void OpenDDS::DCPS::DataWriterImpl::send_suspended_data | ( | ) |
Called by the PublisherImpl to indicate that the Publisher is now resumed and any data collected while it was suspended should now be sent.
Definition at line 1861 of file DataWriterImpl.cpp.
References available_data_list_, max_suspended_transaction_id_, min_suspended_transaction_id_, OpenDDS::DCPS::SendStateDataSampleList::reset(), and OpenDDS::DCPS::TransportClient::send().
01862 { 01863 //this serves to get TransportClient's max_transaction_id_seen_ 01864 //to the correct value for this list of transactions 01865 if (max_suspended_transaction_id_ != 0) { 01866 this->send(this->available_data_list_, max_suspended_transaction_id_); 01867 max_suspended_transaction_id_ = 0; 01868 } 01869 01870 //this serves to actually have the send proceed in 01871 //sending the samples to the datalinks by passing it 01872 //the min_suspended_transaction_id_ which should be the 01873 //TransportClient's expected_transaction_id_ 01874 this->send(this->available_data_list_, min_suspended_transaction_id_); 01875 min_suspended_transaction_id_ = 0; 01876 this->available_data_list_.reset(); 01877 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::set_listener | ( | DDS::DataWriterListener_ptr | a_listener, | |
DDS::StatusMask | mask | |||
) | [virtual] |
Definition at line 945 of file DataWriterImpl.cpp.
References CORBA::LocalObject::_duplicate(), listener_, listener_mask_, and DDS::RETCODE_OK.
Referenced by cleanup().
00947 { 00948 listener_mask_ = mask; 00949 //note: OK to duplicate a nil object ref 00950 listener_ = DDS::DataWriterListener::_duplicate(a_listener); 00951 return DDS::RETCODE_OK; 00952 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::set_qos | ( | const DDS::DataWriterQos & | qos | ) | [virtual] |
Definition at line 864 of file DataWriterImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), DDS::DataWriterQos::deadline, domain_id_, dp_id_, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::duration_to_time_value(), OpenDDS::DCPS::EntityImpl::enabled_, last_deadline_missed_total_count_, LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), lock_, offered_deadline_missed_status_, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK, publication_id_, publisher_servant_, qos_, OpenDDS::DCPS::ref(), OpenDDS::DCPS::RcHandle< T >::reset(), DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, status, TheServiceParticipant, OpenDDS::DCPS::Qos_Helper::valid(), and watchdog_.
00865 { 00866 00867 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00868 OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00869 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00870 OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00871 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00872 00873 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) { 00874 if (qos_ == qos) 00875 return DDS::RETCODE_OK; 00876 00877 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) { 00878 return DDS::RETCODE_IMMUTABLE_POLICY; 00879 00880 } else { 00881 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_); 00882 DDS::PublisherQos publisherQos; 00883 RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock(); 00884 00885 bool status = false; 00886 if (publisher) { 00887 publisher->get_qos(publisherQos); 00888 status 00889 = disco->update_publication_qos(domain_id_, 00890 dp_id_, 00891 this->publication_id_, 00892 qos, 00893 publisherQos); 00894 } 00895 if (!status) { 00896 ACE_ERROR_RETURN((LM_ERROR, 00897 ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ") 00898 ACE_TEXT("qos not updated. \n")), 00899 DDS::RETCODE_ERROR); 00900 } 00901 } 00902 00903 if (!(qos_ == qos)) { 00904 // Reset the deadline timer if the period has changed. 
00905 if (qos_.deadline.period.sec != qos.deadline.period.sec 00906 || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) { 00907 if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC 00908 && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) { 00909 this->watchdog_= make_rch<OfferedDeadlineWatchdog>( 00910 ref(this->lock_), 00911 qos.deadline, 00912 ref(*this), 00913 ref(this->offered_deadline_missed_status_), 00914 ref(this->last_deadline_missed_total_count_)); 00915 00916 } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC 00917 && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) { 00918 this->watchdog_->cancel_all(); 00919 this->watchdog_.reset(); 00920 00921 } else { 00922 this->watchdog_->reset_interval( 00923 duration_to_time_value(qos.deadline.period)); 00924 } 00925 } 00926 00927 qos_ = qos; 00928 } 00929 00930 return DDS::RETCODE_OK; 00931 00932 } else { 00933 return DDS::RETCODE_INCONSISTENT_POLICY; 00934 } 00935 }
bool OpenDDS::DCPS::DataWriterImpl::should_ack | ( | ) | const |
Does this writer have samples to be acknowledged?
Definition at line 967 of file DataWriterImpl.cpp.
References readers_.
00968 { 00969 // N.B. It may be worthwhile to investigate a more efficient 00970 // heuristic for determining if a writer should send SAMPLE_ACK 00971 // control samples. Perhaps based on a sequence number delta? 00972 return this->readers_.size() != 0; 00973 }
void OpenDDS::DCPS::DataWriterImpl::track_sequence_number | ( | GUIDSeq * | filter_out | ) | [private] |
Definition at line 1828 of file DataWriterImpl.cpp.
References reader_info_, reader_info_lock_, and sequence_number_.
Referenced by write().
01829 { 01830 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_); 01831 01832 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 01833 // Track individual expected sequence numbers in ReaderInfo 01834 RepoIdSet excluded; 01835 01836 if (filter_out && !reader_info_.empty()) { 01837 const GUID_t* buf = filter_out->get_buffer(); 01838 excluded.insert(buf, buf + filter_out->length()); 01839 } 01840 01841 for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(), 01842 end = reader_info_.end(); iter != end; ++iter) { 01843 // If not excluding this reader, update expected sequence 01844 if (excluded.count(iter->first) == 0) { 01845 iter->second.expected_sequence_ = sequence_number_; 01846 } 01847 } 01848 01849 #else 01850 ACE_UNUSED_ARG(filter_out); 01851 for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(), 01852 end = reader_info_.end(); iter != end; ++iter) { 01853 iter->second.expected_sequence_ = sequence_number_; 01854 } 01855 01856 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC 01857 01858 }
void OpenDDS::DCPS::DataWriterImpl::transport_assoc_done | ( | int | flags, | |
const RepoId & | remote_id | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::TransportClient.
Definition at line 254 of file DataWriterImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::TransportClient::ASSOC_ACTIVE, assoc_complete_readers_, OpenDDS::DCPS::TransportClient::ASSOC_OK, association_complete_i(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, domain_id_, dp_id_, OpenDDS::DCPS::insert(), LM_DEBUG, LM_ERROR, LM_INFO, lock_, OPENDDS_STRING, pending_readers_, publication_id_, and TheServiceParticipant.
00255 { 00256 DBG_ENTRY_LVL("DataWriterImpl", "transport_assoc_done", 6); 00257 00258 if (!(flags & ASSOC_OK)) { 00259 if (DCPS_debug_level) { 00260 const GuidConverter conv(remote_id); 00261 ACE_ERROR((LM_ERROR, 00262 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ") 00263 ACE_TEXT("ERROR: transport layer failed to associate %C\n"), 00264 OPENDDS_STRING(conv).c_str())); 00265 } 00266 00267 return; 00268 } 00269 if (DCPS_debug_level) { 00270 const GuidConverter writer_conv(publication_id_); 00271 const GuidConverter conv(remote_id); 00272 ACE_DEBUG((LM_INFO, 00273 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ") 00274 ACE_TEXT(" writer %C succeeded in associating with reader %C\n"), 00275 OPENDDS_STRING(writer_conv).c_str(), 00276 OPENDDS_STRING(conv).c_str())); 00277 } 00278 if (flags & ASSOC_ACTIVE) { 00279 00280 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_); 00281 00282 // Have we already received an association_complete() callback? 00283 if (assoc_complete_readers_.count(remote_id)) { 00284 if (DCPS_debug_level) { 00285 const GuidConverter writer_conv(publication_id_); 00286 const GuidConverter converter(remote_id); 00287 ACE_DEBUG((LM_DEBUG, 00288 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ") 00289 ACE_TEXT("writer %C found assoc_complete_reader %C, continue with association_complete_i\n"), 00290 OPENDDS_STRING(writer_conv).c_str(), 00291 OPENDDS_STRING(converter).c_str())); 00292 } 00293 assoc_complete_readers_.erase(remote_id); 00294 association_complete_i(remote_id); 00295 00296 // Add to pending_readers_ -> pending means we are waiting 00297 // for the association_complete() callback. 
00298 00299 } else if (OpenDDS::DCPS::insert(pending_readers_, remote_id) == -1) { 00300 const GuidConverter converter(remote_id); 00301 ACE_ERROR((LM_ERROR, 00302 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::transport_assoc_done: ") 00303 ACE_TEXT("failed to mark %C as pending.\n"), 00304 OPENDDS_STRING(converter).c_str())); 00305 00306 } else { 00307 if (DCPS_debug_level) { 00308 const GuidConverter converter(remote_id); 00309 ACE_DEBUG((LM_DEBUG, 00310 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ") 00311 ACE_TEXT("marked %C as pending.\n"), 00312 OPENDDS_STRING(converter).c_str())); 00313 } 00314 } 00315 00316 } else { 00317 // In the current implementation, DataWriter is always active, so this 00318 // code will not be applicable. 00319 if (DCPS_debug_level) { 00320 const GuidConverter conv(publication_id_); 00321 ACE_ERROR((LM_ERROR, 00322 ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ") 00323 ACE_TEXT("ERROR: DataWriter (%C) should always be active in current implementation\n"), 00324 OPENDDS_STRING(conv).c_str())); 00325 } 00326 Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_); 00327 disco->association_complete(domain_id_, dp_id_, 00328 publication_id_, remote_id); 00329 } 00330 }
void OpenDDS::DCPS::DataWriterImpl::unregister_all | ( | ) |
Delegate to WriteDataContainer to unregister all instances.
Definition at line 1947 of file DataWriterImpl.cpp.
References data_container_.
Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().
01948 { 01949 data_container_->unregister_all(); 01950 }
void OpenDDS::DCPS::DataWriterImpl::unregister_for_reader | ( | const RepoId & | participant, | |
const RepoId & | writerid, | |||
const RepoId & | readerid | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 786 of file DataWriterImpl.cpp.
00789 { 00790 TransportClient::unregister_for_reader(participant, writerid, readerid); 00791 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::unregister_instance_i | ( | DDS::InstanceHandle_t | handle, | |
const DDS::Time_t & | source_timestamp | |||
) |
Delegate to the WriteDataContainer to unregister and tell the transport to broadcast the unregistered instance.
Definition at line 1588 of file DataWriterImpl.cpp.
References ACE_TEXT(), create_control_message(), data_container_, DBG_ENTRY_LVL, dispose_and_unregister(), OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), LM_ERROR, OpenDDS::DCPS::move(), qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, send_all_to_flush_control(), OpenDDS::DCPS::DataSampleElement::set_sample(), OpenDDS::DCPS::UNREGISTER_INSTANCE, and DDS::DataWriterQos::writer_data_lifecycle.
Referenced by OpenDDS::DCPS::DataWriterImpl_T< MessageType >::unregister_instance_w_timestamp(), and unregister_instances().
01590 { 01591 DBG_ENTRY_LVL("DataWriterImpl","unregister_instance_i",6); 01592 01593 if (enabled_ == false) { 01594 ACE_ERROR_RETURN((LM_ERROR, 01595 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::unregister_instance_i: ") 01596 ACE_TEXT(" Entity is not enabled.\n")), 01597 DDS::RETCODE_NOT_ENABLED); 01598 } 01599 01600 // According to spec 1.2, autodispose_unregistered_instances true causes 01601 // dispose on the instance prior to calling unregister operation. 01602 if (this->qos_.writer_data_lifecycle.autodispose_unregistered_instances) { 01603 return this->dispose_and_unregister(handle, source_timestamp); 01604 } 01605 01606 DDS::ReturnCode_t ret = DDS::RETCODE_ERROR; 01607 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, get_lock(), ret); 01608 Message_Block_Ptr unregistered_sample_data; 01609 ret = this->data_container_->unregister(handle, unregistered_sample_data); 01610 01611 if (ret != DDS::RETCODE_OK) { 01612 ACE_ERROR_RETURN((LM_ERROR, 01613 ACE_TEXT("(%P|%t) ERROR: ") 01614 ACE_TEXT("DataWriterImpl::unregister_instance_i: ") 01615 ACE_TEXT(" unregister with container failed. 
\n")), 01616 ret); 01617 } 01618 01619 DataSampleElement* element = 0; 01620 ret = this->data_container_->obtain_buffer_for_control(element); 01621 01622 if (ret != DDS::RETCODE_OK) { 01623 ACE_ERROR_RETURN((LM_ERROR, 01624 ACE_TEXT("(%P|%t) ERROR: ") 01625 ACE_TEXT("DataWriterImpl::unregister_instance_i: ") 01626 ACE_TEXT("obtain_buffer_for_control returned %d.\n"), 01627 ret), 01628 ret); 01629 } 01630 01631 Message_Block_Ptr sample(create_control_message(UNREGISTER_INSTANCE, 01632 element->get_header(), 01633 move(unregistered_sample_data), 01634 source_timestamp)); 01635 element->set_sample(move(sample)); 01636 ret = this->data_container_->enqueue_control(element); 01637 01638 if (ret != DDS::RETCODE_OK) { 01639 ACE_ERROR_RETURN((LM_ERROR, 01640 ACE_TEXT("(%P|%t) ERROR: ") 01641 ACE_TEXT("DataWriterImpl::unregister_instance_i: ") 01642 ACE_TEXT("enqueue_control failed.\n")), 01643 ret); 01644 } 01645 01646 send_all_to_flush_control(guard); 01647 return DDS::RETCODE_OK; 01648 }
void OpenDDS::DCPS::DataWriterImpl::unregister_instances | ( | const DDS::Time_t & | source_timestamp | ) |
Unregister all registered instances and tell the transport to broadcast the unregistered instances.
Definition at line 1712 of file DataWriterImpl.cpp.
References data_container_, sync_unreg_rem_assocs_lock_, and unregister_instance_i().
Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().
01713 { 01714 { 01715 ACE_GUARD(ACE_Thread_Mutex, guard, sync_unreg_rem_assocs_lock_); 01716 01717 PublicationInstanceMapType::iterator it = 01718 this->data_container_->instances_.begin(); 01719 01720 while (it != this->data_container_->instances_.end()) { 01721 if (!it->second->unregistered_) { 01722 const DDS::InstanceHandle_t handle = it->first; 01723 ++it; // avoid mangling the iterator 01724 this->unregister_instance_i(handle, source_timestamp); 01725 } else { 01726 ++it; 01727 } 01728 } 01729 } 01730 }
void OpenDDS::DCPS::DataWriterImpl::update_incompatible_qos | ( | const IncompatibleQosStatus & | status | ) | [virtual] |
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 794 of file DataWriterImpl.cpp.
References OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, CORBA::is_nil(), OpenDDS::DCPS::IncompatibleQosStatus::last_policy_id, DDS::OfferedIncompatibleQosStatus::last_policy_id, listener_for(), lock_, OpenDDS::DCPS::EntityImpl::notify_status_condition(), DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, offered_incompatible_qos_status_, OpenDDS::DCPS::IncompatibleQosStatus::policies, DDS::OfferedIncompatibleQosStatus::policies, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::IncompatibleQosStatus::total_count, DDS::OfferedIncompatibleQosStatus::total_count, and DDS::OfferedIncompatibleQosStatus::total_count_change.
00795 { 00796 DDS::DataWriterListener_var listener = 00797 listener_for(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS); 00798 00799 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00800 00801 #if 0 00802 00803 if (this->offered_incompatible_qos_status_.total_count == status.total_count) { 00804 // This test should make the method idempotent. 00805 return; 00806 } 00807 00808 #endif 00809 00810 set_status_changed_flag(DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, true); 00811 00812 // copy status and increment change 00813 offered_incompatible_qos_status_.total_count = status.total_count; 00814 offered_incompatible_qos_status_.total_count_change += 00815 status.count_since_last_send; 00816 offered_incompatible_qos_status_.last_policy_id = status.last_policy_id; 00817 offered_incompatible_qos_status_.policies = status.policies; 00818 00819 if (!CORBA::is_nil(listener.in())) { 00820 listener->on_offered_incompatible_qos(this, offered_incompatible_qos_status_); 00821 00822 // TBD - Why does the spec say to change this but not change the 00823 // ChangeFlagStatus after a listener call? 00824 offered_incompatible_qos_status_.total_count_change = 0; 00825 } 00826 00827 notify_status_condition(); 00828 }
void OpenDDS::DCPS::DataWriterImpl::update_subscription_params | ( | const RepoId & | readerId, | |
const DDS::StringSeq & | params | |||
) | [virtual] |
Implements OpenDDS::DCPS::DataWriterCallbacks.
Definition at line 831 of file DataWriterImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_WARNING, lock_, OPENDDS_STRING, publication_id_, reader_info_, reader_info_lock_, and TheServiceParticipant.
00833 { 00834 #ifdef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00835 ACE_UNUSED_ARG(readerId); 00836 ACE_UNUSED_ARG(params); 00837 #else 00838 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00839 ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_); 00840 RepoIdToReaderInfoMap::iterator iter = reader_info_.find(readerId); 00841 00842 if (iter != reader_info_.end()) { 00843 iter->second.expression_params_ = params; 00844 00845 } else if (DCPS_debug_level > 4 && 00846 TheServiceParticipant->publisher_content_filter()) { 00847 GuidConverter pubConv(this->publication_id_), subConv(readerId); 00848 ACE_DEBUG((LM_WARNING, 00849 ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::update_subscription_params()") 00850 ACE_TEXT(" - writer: %C has no info about reader: %C\n"), 00851 OPENDDS_STRING(pubConv).c_str(), OPENDDS_STRING(subConv).c_str())); 00852 } 00853 00854 #endif 00855 }
void OpenDDS::DCPS::DataWriterImpl::wait_control_pending | ( | ) |
Wait until pending control elements have either been delivered or dropped.
Definition at line 2606 of file DataWriterImpl.cpp.
References controlTracker, OpenDDS::DCPS::TransportRegistry::instance(), OPENDDS_STRING, and OpenDDS::DCPS::MessageTracker::wait_messages_pending().
Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().
02607 { 02608 if (!TransportRegistry::instance()->released()) { 02609 OPENDDS_STRING caller_string("DataWriterImpl::wait_control_pending"); 02610 controlTracker.wait_messages_pending(caller_string); 02611 } 02612 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::wait_for_acknowledgments | ( | const DDS::Duration_t & | max_wait | ) | [virtual] |
Definition at line 1035 of file DataWriterImpl.cpp.
References ACE_TEXT(), create_ack_token(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SequenceNumber::getValue(), LM_DEBUG, qos_, DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_OK, send_request_ack(), OpenDDS::DCPS::DataWriterImpl::AckToken::sequence_, and wait_for_specific_ack().
01036 { 01037 if (this->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS) 01038 return DDS::RETCODE_OK; 01039 01040 DDS::ReturnCode_t ret = send_request_ack(); 01041 01042 if (ret != DDS::RETCODE_OK) 01043 return ret; 01044 01045 DataWriterImpl::AckToken token = create_ack_token(max_wait); 01046 if (DCPS_debug_level) { 01047 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::wait_for_acknowledgments") 01048 ACE_TEXT(" waiting for acknowledgment of sequence %q at %T\n"), 01049 token.sequence_.getValue())); 01050 } 01051 return wait_for_specific_ack(token); 01052 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::wait_for_specific_ack | ( | const AckToken & | token | ) | [protected] |
Definition at line 1055 of file DataWriterImpl.cpp.
References data_container_, OpenDDS::DCPS::DataWriterImpl::AckToken::deadline(), and OpenDDS::DCPS::DataWriterImpl::AckToken::sequence_.
Referenced by wait_for_acknowledgments().
01056 { 01057 return this->data_container_->wait_ack_of_seq(token.deadline(), token.sequence_); 01058 }
void OpenDDS::DCPS::DataWriterImpl::wait_pending | ( | ) |
Wait for pending samples to drain.
Definition at line 2615 of file DataWriterImpl.cpp.
References data_container_, and OpenDDS::DCPS::TransportRegistry::instance().
Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().
02616 { 02617 if (!TransportRegistry::instance()->released()) { 02618 data_container_->wait_pending(); 02619 } 02620 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::write | ( | Message_Block_Ptr | sample, | |
DDS::InstanceHandle_t | handle, | |||
const DDS::Time_t & | source_timestamp, | |||
GUIDSeq * | filter_out | |||
) |
Delegate to the WriteDataContainer to queue the instance sample and finally tell the transport to send the sample.
filter_out | can either be null (if the writer can't or won't evaluate the filters), or a list of associated reader RepoIds that should NOT get the data sample due to content filtering. |
Definition at line 1733 of file DataWriterImpl.cpp.
References ACE_TEXT(), available_data_list_, coherent_, coherent_samples_, create_sample_data_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), get_unsent_data(), ACE_OS::gettimeofday(), last_liveliness_activity_time_, LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), max_suspended_transaction_id_, min_suspended_transaction_id_, OpenDDS::DCPS::move(), publisher_servant_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, OpenDDS::DCPS::TransportClient::send(), OpenDDS::DCPS::DataSampleElement::set_filter_out(), OpenDDS::DCPS::DataSampleElement::set_sample(), and track_sequence_number().
Referenced by OpenDDS::DCPS::DataDurabilityCache::get_data().
01737 { 01738 DBG_ENTRY_LVL("DataWriterImpl","write",6); 01739 01740 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 01741 guard, 01742 get_lock (), 01743 DDS::RETCODE_ERROR); 01744 01745 // take ownership of sequence allocated in FooDWImpl::write_w_timestamp() 01746 GUIDSeq_var filter_out_var(filter_out); 01747 01748 if (enabled_ == false) { 01749 ACE_ERROR_RETURN((LM_ERROR, 01750 ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::write: ") 01751 ACE_TEXT(" Entity is not enabled. \n")), 01752 DDS::RETCODE_NOT_ENABLED); 01753 } 01754 01755 DataSampleElement* element = 0; 01756 DDS::ReturnCode_t ret = this->data_container_->obtain_buffer(element, handle); 01757 01758 if (ret == DDS::RETCODE_TIMEOUT) { 01759 return ret; // silent for timeout 01760 01761 } else if (ret != DDS::RETCODE_OK) { 01762 ACE_ERROR_RETURN((LM_ERROR, 01763 ACE_TEXT("(%P|%t) ERROR: ") 01764 ACE_TEXT("DataWriterImpl::write: ") 01765 ACE_TEXT("obtain_buffer returned %d.\n"), 01766 ret), 01767 ret); 01768 } 01769 01770 Message_Block_Ptr temp; 01771 ret = create_sample_data_message(move(data), 01772 handle, 01773 element->get_header(), 01774 temp, 01775 source_timestamp, 01776 (filter_out != 0)); 01777 element->set_sample(move(temp)); 01778 01779 if (ret != DDS::RETCODE_OK) { 01780 return ret; 01781 } 01782 01783 element->set_filter_out(filter_out_var._retn()); // ownership passed to element 01784 01785 ret = this->data_container_->enqueue(element, handle); 01786 01787 if (ret != DDS::RETCODE_OK) { 01788 ACE_ERROR_RETURN((LM_ERROR, 01789 ACE_TEXT("(%P|%t) ERROR: ") 01790 ACE_TEXT("DataWriterImpl::write: ") 01791 ACE_TEXT("enqueue failed.\n")), 01792 ret); 01793 } 01794 this->last_liveliness_activity_time_ = ACE_OS::gettimeofday(); 01795 01796 track_sequence_number(filter_out); 01797 01798 if (this->coherent_) { 01799 ++this->coherent_samples_; 01800 } 01801 SendStateDataSampleList list; 01802 01803 ACE_UINT64 transaction_id = this->get_unsent_data(list); 01804 01805 RcHandle<PublisherImpl> publisher = 
this->publisher_servant_.lock(); 01806 if (!publisher || publisher->is_suspended()) { 01807 if (min_suspended_transaction_id_ == 0) { 01808 //provides transaction id for lower bound of suspended transactions 01809 //or transaction id for single suspended write transaction 01810 min_suspended_transaction_id_ = transaction_id; 01811 } else { 01812 //when multiple write transactions have suspended, provides the upper bound 01813 //for suspended transactions. 01814 max_suspended_transaction_id_ = transaction_id; 01815 } 01816 this->available_data_list_.enqueue_tail(list); 01817 01818 } else { 01819 guard.release(); 01820 01821 this->send(list, transaction_id); 01822 } 01823 01824 return DDS::RETCODE_OK; 01825 }
friend class ::DDS_TEST [friend] |
Reimplemented from OpenDDS::DCPS::TransportClient.
Reimplemented in OpenDDS::DCPS::DataWriterImpl_T< MessageType >.
Definition at line 569 of file DataWriterImpl.h.
friend class PublisherImpl [friend] |
Definition at line 91 of file DataWriterImpl.h.
friend class WriteDataContainer [friend] |
Definition at line 90 of file DataWriterImpl.h.
Referenced by enable().
RepoIdSet OpenDDS::DCPS::DataWriterImpl::assoc_complete_readers_ [private] |
Definition at line 661 of file DataWriterImpl.h.
Referenced by association_complete(), and transport_assoc_done().
The multiplier for allocators affected by associations.
Definition at line 480 of file DataWriterImpl.h.
Referenced by enable().
Definition at line 666 of file DataWriterImpl.h.
Referenced by association_complete_i(), send_suspended_data(), and write().
bool OpenDDS::DCPS::DataWriterImpl::coherent_ [private] |
Flag indicating that the DataWriter currently belongs to a coherent change set.
Definition at line 598 of file DataWriterImpl.h.
Referenced by begin_coherent_changes(), coherent_changes_pending(), create_sample_data_message(), end_coherent_changes(), and write().
The number of samples belonging to the current coherent change set.
Definition at line 601 of file DataWriterImpl.h.
Referenced by end_coherent_changes(), and write().
Definition at line 407 of file DataWriterImpl.h.
Referenced by association_complete_i(), control_delivered(), control_dropped(), OpenDDS::DCPS::WriteDataContainer::data_delivered(), OpenDDS::DCPS::WriteDataContainer::data_dropped(), send_all_to_flush_control(), send_control(), and wait_control_pending().
The sample data container.
Definition at line 603 of file DataWriterImpl.h.
Referenced by association_complete_i(), create_sample_data_message(), data_delivered(), data_dropped(), dispose(), dispose_and_unregister(), enable(), get_handle_instance(), get_instance_handles(), num_samples(), persist_data(), register_instance_i(), reschedule_deadline(), send_request_ack(), unregister_all(), unregister_instance_i(), unregister_instances(), wait_for_specific_ack(), wait_pending(), and write().
Definition at line 405 of file DataWriterImpl.h.
Referenced by data_delivered().
The data block allocator.
Reimplemented in OpenDDS::DCPS::DataWriterImpl_T< MessageType >.
Definition at line 639 of file DataWriterImpl.h.
Referenced by create_control_message(), create_sample_data_message(), and enable().
Definition at line 573 of file DataWriterImpl.h.
The domain id.
Definition at line 588 of file DataWriterImpl.h.
Referenced by enable(), init(), send_liveliness(), set_qos(), and transport_assoc_done().
RepoId OpenDDS::DCPS::DataWriterImpl::dp_id_ [private] |
Definition at line 589 of file DataWriterImpl.h.
Referenced by enable(), get_dp_id(), set_qos(), and transport_assoc_done().
The header data allocator.
Definition at line 641 of file DataWriterImpl.h.
Referenced by create_sample_data_message(), and enable().
RepoIdToHandleMap OpenDDS::DCPS::DataWriterImpl::id_to_handle_map_ [private] |
Definition at line 610 of file DataWriterImpl.h.
Referenced by association_complete_i(), and get_matched_subscriptions().
bool OpenDDS::DCPS::DataWriterImpl::is_bit_ [private] |
Flag indicating that this datawriter is a built-in topic datawriter.
Definition at line 659 of file DataWriterImpl.h.
Referenced by add_association(), association_complete(), association_complete_i(), init(), notify_publication_disconnected(), notify_publication_lost(), and notify_publication_reconnected().
Total number of offered deadlines missed during the last offered deadline status check.
Definition at line 652 of file DataWriterImpl.h.
Referenced by enable(), get_offered_deadline_missed_status(), and set_qos().
Timestamp of last write/dispose/assert_liveliness.
Definition at line 649 of file DataWriterImpl.h.
Referenced by handle_timeout(), participant_liveliness_activity_after(), send_liveliness(), and write().
DDS::DataWriterListener_var OpenDDS::DCPS::DataWriterImpl::listener_ [private] |
Used to notify the entity for relevant events.
Definition at line 586 of file DataWriterImpl.h.
Referenced by get_listener(), init(), listener_for(), notify_publication_disconnected(), notify_publication_lost(), notify_publication_reconnected(), and set_listener().
The StatusKind bit mask indicates which status condition change can be notified by the listener of this entity.
Definition at line 584 of file DataWriterImpl.h.
Referenced by init(), listener_for(), and set_listener().
bool OpenDDS::DCPS::DataWriterImpl::liveliness_asserted_ [private] |
Definition at line 683 of file DataWriterImpl.h.
Referenced by assert_liveliness_by_participant(), and handle_timeout().
The time interval for sending liveliness messages.
Definition at line 647 of file DataWriterImpl.h.
Referenced by enable(), handle_timeout(), and liveliness_check_interval().
bool OpenDDS::DCPS::DataWriterImpl::liveliness_lost_ [private] |
True if the writer failed to actively signal its liveliness within its offered liveliness period.
Definition at line 622 of file DataWriterImpl.h.
Referenced by handle_timeout().
Status conditions.
Definition at line 615 of file DataWriterImpl.h.
Referenced by DataWriterImpl(), get_liveliness_lost_status(), and handle_timeout().
Definition at line 688 of file DataWriterImpl.h.
Referenced by enable(), and handle_timeout().
The lock to protect the active subscriptions and status changes.
Reimplemented from OpenDDS::DCPS::EntityImpl.
Definition at line 606 of file DataWriterImpl.h.
Referenced by association_complete(), association_complete_i(), enable(), get_liveliness_lost_status(), get_matched_subscriptions(), get_offered_deadline_missed_status(), get_offered_incompatible_qos_status(), get_publication_matched_status(), get_readers(), remove_all_associations(), set_qos(), transport_assoc_done(), update_incompatible_qos(), and update_subscription_params().
Definition at line 665 of file DataWriterImpl.h.
Referenced by send_suspended_data(), and write().
The message block allocator.
Reimplemented in OpenDDS::DCPS::DataWriterImpl_T< MessageType >.
Definition at line 637 of file DataWriterImpl.h.
Referenced by create_control_message(), create_sample_data_message(), and enable().
The cached available data while suspending and associated transaction ids.
Definition at line 664 of file DataWriterImpl.h.
Referenced by send_suspended_data(), and write().
Monitor* OpenDDS::DCPS::DataWriterImpl::monitor_ [private] |
Monitor object for this entity.
Definition at line 669 of file DataWriterImpl.h.
Referenced by association_complete_i(), DataWriterImpl(), enable(), and register_instance_i().
size_t OpenDDS::DCPS::DataWriterImpl::n_chunks_ [protected] |
The number of chunks for the cached allocator.
Definition at line 477 of file DataWriterImpl.h.
Referenced by enable().
DDS::OfferedDeadlineMissedStatus OpenDDS::DCPS::DataWriterImpl::offered_deadline_missed_status_ [private] |
Definition at line 616 of file DataWriterImpl.h.
Referenced by DataWriterImpl(), enable(), get_offered_deadline_missed_status(), and set_qos().
DDS::OfferedIncompatibleQosStatus OpenDDS::DCPS::DataWriterImpl::offered_incompatible_qos_status_ [private] |
Definition at line 617 of file DataWriterImpl.h.
Referenced by DataWriterImpl(), get_offered_incompatible_qos_status(), and update_incompatible_qos().
The participant servant which creates the publisher that creates this datawriter.
Definition at line 491 of file DataWriterImpl.h.
Referenced by add_association(), assert_liveliness(), association_complete_i(), enable(), get_instance_handle(), get_matched_subscription_data(), get_next_handle(), init(), and lookup_instance_handles().
RepoIdSet OpenDDS::DCPS::DataWriterImpl::pending_readers_ [private] |
Definition at line 661 of file DataWriterImpl.h.
Referenced by association_complete(), remove_all_associations(), and transport_assoc_done().
Periodic Monitor object for this entity.
Definition at line 672 of file DataWriterImpl.h.
Referenced by DataWriterImpl().
The repository id of this datawriter/publication.
Definition at line 593 of file DataWriterImpl.h.
Referenced by add_association(), association_complete(), association_complete_i(), create_control_message(), create_sample_data_message(), data_delivered(), enable(), get_instance_handle(), get_publication_id(), set_qos(), transport_assoc_done(), and update_subscription_params().
Definition at line 618 of file DataWriterImpl.h.
Referenced by association_complete_i(), DataWriterImpl(), and get_publication_matched_status().
The publisher servant which creates this datawriter.
Definition at line 591 of file DataWriterImpl.h.
Referenced by association_complete_i(), create_control_message(), create_sample_data_message(), enable(), end_coherent_changes(), get_publisher(), init(), listener_for(), parent(), retrieve_inline_qos_data(), set_qos(), and write().
The qos policy list of this datawriter.
Definition at line 487 of file DataWriterImpl.h.
Referenced by add_association(), assert_liveliness(), assert_liveliness_by_participant(), association_complete_i(), create_sample_data_message(), enable(), get_qos(), handle_timeout(), init(), liveliness_check_interval(), OpenDDS::DCPS::WriteDataContainer::obtain_buffer(), participant_liveliness_activity_after(), retrieve_inline_qos_data(), send_liveliness(), set_qos(), unregister_instance_i(), and wait_for_acknowledgments().
The orb's reactor to be used to register the liveliness timer.
Definition at line 645 of file DataWriterImpl.h.
Referenced by enable(), handle_timeout(), and init().
RepoIdToReaderInfoMap OpenDDS::DCPS::DataWriterImpl::reader_info_ [protected] |
Definition at line 512 of file DataWriterImpl.h.
Referenced by add_association(), association_complete_i(), create_control_message(), need_sequence_repair_i(), track_sequence_number(), and update_subscription_params().
Definition at line 494 of file DataWriterImpl.h.
Referenced by add_association(), association_complete_i(), create_control_message(), need_sequence_repair(), track_sequence_number(), and update_subscription_params().
RepoIdSet OpenDDS::DCPS::DataWriterImpl::readers_ [private] |
Definition at line 612 of file DataWriterImpl.h.
Referenced by association_complete_i(), get_readers(), remove_all_associations(), and should_ack().
The sequence number unique in DataWriter scope.
Definition at line 595 of file DataWriterImpl.h.
Referenced by create_ack_token(), create_control_message(), create_sample_data_message(), end_coherent_changes(), need_sequence_repair_i(), and track_sequence_number().
Definition at line 687 of file DataWriterImpl.h.
Referenced by unregister_instances().
The associated topic repository id.
Definition at line 578 of file DataWriterImpl.h.
Referenced by init().
The name of associated topic.
Definition at line 576 of file DataWriterImpl.h.
Referenced by enable(), init(), and retrieve_inline_qos_data().
The topic servant.
Definition at line 580 of file DataWriterImpl.h.
Referenced by cleanup(), enable(), filter_out(), get_topic(), inconsistent_topic(), and init().
The type name of associated topic.
Definition at line 484 of file DataWriterImpl.h.
Referenced by get_type_name(), and init().
Watchdog responsible for reporting missed offered deadlines.
Definition at line 655 of file DataWriterImpl.h.
Referenced by OpenDDS::DCPS::WriteDataContainer::dispose(), enable(), OpenDDS::DCPS::WriteDataContainer::enqueue(), OpenDDS::DCPS::WriteDataContainer::register_instance(), OpenDDS::DCPS::WriteDataContainer::reschedule_deadline(), reschedule_deadline(), set_qos(), and OpenDDS::DCPS::WriteDataContainer::unregister().