#include <DataWriterImpl_T.h>
Inheritance diagram for OpenDDS::DCPS::DataWriterImpl_T< MessageType >:
Public Types | |
typedef DDSTraits< MessageType > | TraitsType |
typedef ::OpenDDS::DCPS::Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > | DataAllocator |
enum | { cdr_header_size = 4 } |
Public Member Functions | |
typedef | OPENDDS_MAP_CMP (MessageType,::DDS::InstanceHandle_t, typename TraitsType::LessThanType) InstanceMap |
DataWriterImpl_T (void) | |
Constructor. | |
virtual | ~DataWriterImpl_T (void) |
Destructor. | |
virtual ::DDS::InstanceHandle_t | register_instance (const MessageType &instance) |
virtual ::DDS::InstanceHandle_t | register_instance_w_timestamp (const MessageType &instance, const ::DDS::Time_t &timestamp) |
virtual ::DDS::ReturnCode_t | unregister_instance (const MessageType &instance,::DDS::InstanceHandle_t handle) |
virtual ::DDS::ReturnCode_t | unregister_instance_w_timestamp (const MessageType &instance,::DDS::InstanceHandle_t handle, const ::DDS::Time_t &timestamp) |
virtual ::DDS::ReturnCode_t | write (const MessageType &instance_data,::DDS::InstanceHandle_t handle) |
virtual ::DDS::ReturnCode_t | write_w_timestamp (const MessageType &instance_data,::DDS::InstanceHandle_t handle, const ::DDS::Time_t &source_timestamp) |
virtual ::DDS::ReturnCode_t | dispose (const MessageType &instance_data,::DDS::InstanceHandle_t instance_handle) |
virtual ::DDS::ReturnCode_t | dispose_w_timestamp (const MessageType &instance_data,::DDS::InstanceHandle_t instance_handle, const ::DDS::Time_t &source_timestamp) |
virtual ::DDS::ReturnCode_t | get_key_value (MessageType &key_holder,::DDS::InstanceHandle_t handle) |
virtual ::DDS::InstanceHandle_t | lookup_instance (const MessageType &instance_data) |
virtual void | init (::DDS::Topic_ptr topic, OpenDDS::DCPS::TopicImpl *topic_servant, const ::DDS::DataWriterQos &qos,::DDS::DataWriterListener_ptr a_listener, const ::DDS::StatusMask &mask, OpenDDS::DCPS::DomainParticipantImpl *participant_servant, OpenDDS::DCPS::PublisherImpl *publisher_servant,::DDS::DataWriter_ptr dw_objref) |
virtual ::DDS::ReturnCode_t | enable_specific () |
virtual void | unregistered (::DDS::InstanceHandle_t instance_handle) |
ACE_INLINE DataAllocator * | data_allocator () const |
Private Member Functions | |
ACE_Message_Block * | dds_marshal (const MessageType &instance_data, OpenDDS::DCPS::MarshalingType marshaling_type) |
::DDS::ReturnCode_t | get_or_create_instance_handle (::DDS::InstanceHandle_t &handle, const MessageType &instance_data, const ::DDS::Time_t &source_timestamp) |
Private Attributes | |
InstanceMap | instance_map_ |
size_t | marshaled_size_ |
size_t | key_marshaled_size_ |
DataAllocator * | data_allocator_ |
::OpenDDS::DCPS::MessageBlockAllocator * | mb_allocator_ |
The message block allocator. | |
::OpenDDS::DCPS::DataBlockAllocator * | db_allocator_ |
The data block allocator. | |
Friends | |
class | ::DDS_TEST |
See the DDS specification, OMG formal/04-12-02, for a description of this interface.
Definition at line 20 of file DataWriterImpl_T.h.
typedef ::OpenDDS::DCPS::Dynamic_Cached_Allocator_With_Overflow<ACE_Thread_Mutex> OpenDDS::DCPS::DataWriterImpl_T< MessageType >::DataAllocator |
Definition at line 33 of file DataWriterImpl_T.h.
typedef DDSTraits<MessageType> OpenDDS::DCPS::DataWriterImpl_T< MessageType >::TraitsType |
Definition at line 29 of file DataWriterImpl_T.h.
anonymous enum |
OpenDDS::DCPS::DataWriterImpl_T< MessageType >::DataWriterImpl_T | ( | void | ) | [inline] |
Constructor.
Definition at line 40 of file DataWriterImpl_T.h.
00041 : marshaled_size_ (0) 00042 , key_marshaled_size_ (0) 00043 , data_allocator_ (0) 00044 , mb_allocator_ (0) 00045 , db_allocator_ (0) 00046 { 00047 }
virtual OpenDDS::DCPS::DataWriterImpl_T< MessageType >::~DataWriterImpl_T | ( | void | ) | [inline, virtual] |
Destructor.
Definition at line 50 of file DataWriterImpl_T.h.
00051 { 00052 delete data_allocator_; 00053 delete mb_allocator_; 00054 delete db_allocator_; 00055 }
ACE_INLINE DataAllocator* OpenDDS::DCPS::DataWriterImpl_T< MessageType >::data_allocator | ( | ) | const [inline] |
Accessor to the marshalled data sample allocator.
Definition at line 404 of file DataWriterImpl_T.h.
00404 { 00405 return data_allocator_; 00406 };
ACE_Message_Block* OpenDDS::DCPS::DataWriterImpl_T< MessageType >::dds_marshal | ( | const MessageType & | instance_data, | |
OpenDDS::DCPS::MarshalingType | marshaling_type | |||
) | [inline, private] |
Serialize the instance data.
instance_data | The data to serialize. | |
marshaling_type | Enumerated type specifying whether to marshal just the keys or the entire message. |
Definition at line 418 of file DataWriterImpl_T.h.
References OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::DCPS::Serializer::ALIGN_NONE, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::KEY_ONLY_MARSHALING, OpenDDS::DCPS::Serializer::reset_alignment(), OpenDDS::DCPS::swap(), and OpenDDS::DCPS::Serializer::use_rti_serialization().
00421 { 00422 const bool cdr = this->cdr_encapsulation(), swap = this->swap_bytes(); 00423 00424 ACE_Message_Block* mb; 00425 if (marshaling_type == OpenDDS::DCPS::KEY_ONLY_MARSHALING) { 00426 // Don't use the cached allocator for the registered sample message 00427 // block. 00428 00429 OpenDDS::DCPS::KeyOnly<const MessageType > ko_instance_data(instance_data); 00430 size_t effective_size = 0, padding = 0; 00431 if (key_marshaled_size_) { 00432 effective_size = key_marshaled_size_; 00433 } else { 00434 if (cdr && !Serializer::use_rti_serialization()) { 00435 effective_size = cdr_header_size; // CDR encapsulation 00436 } 00437 TraitsType::gen_find_size(ko_instance_data, effective_size, padding); 00438 if (cdr && Serializer::use_rti_serialization()) { 00439 effective_size += (cdr_header_size); 00440 } 00441 } 00442 if (cdr) { 00443 effective_size += padding; 00444 } 00445 ACE_NEW_RETURN(mb, ACE_Message_Block(effective_size, 00446 ACE_Message_Block::MB_DATA, 00447 0, //cont 00448 0, //data 00449 0, //alloc_strategy 00450 get_db_lock()), 0); 00451 OpenDDS::DCPS::Serializer serializer(mb, swap, cdr 00452 ? OpenDDS::DCPS::Serializer::ALIGN_CDR 00453 : OpenDDS::DCPS::Serializer::ALIGN_NONE); 00454 if (cdr) { 00455 serializer << ACE_OutputCDR::from_octet(0); 00456 serializer << ACE_OutputCDR::from_octet(swap ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER); 00457 serializer << ACE_CDR::UShort(0); 00458 } 00459 // If this is RTI serialization, start counting byte offset AFTER 00460 // the header 00461 if (cdr && Serializer::use_rti_serialization()) { 00462 // Start counting byte-offset AFTER header 00463 serializer.reset_alignment(); 00464 } 00465 serializer << ko_instance_data; 00466 } else { // OpenDDS::DCPS::FULL_MARSHALING 00467 size_t effective_size = 0, padding = 0; 00468 if (marshaled_size_) { 00469 effective_size = marshaled_size_; 00470 } else { 00471 if (cdr && !Serializer::use_rti_serialization()) { 00472 effective_size = cdr_header_size; // CDR encapsulation 00473 } 00474 TraitsType::gen_find_size(instance_data, effective_size, padding); 00475 if (cdr && Serializer::use_rti_serialization()) { 00476 effective_size += (cdr_header_size); 00477 } 00478 } 00479 if (cdr) { 00480 effective_size += padding; 00481 } 00482 ACE_NEW_MALLOC_RETURN(mb, 00483 static_cast<ACE_Message_Block*>( 00484 mb_allocator_->malloc( 00485 sizeof(ACE_Message_Block))), 00486 ACE_Message_Block( 00487 effective_size, 00488 ACE_Message_Block::MB_DATA, 00489 0, //cont 00490 0, //data 00491 data_allocator_, //allocator_strategy 00492 get_db_lock(), //data block locking_strategy 00493 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 00494 ACE_Time_Value::zero, 00495 ACE_Time_Value::max_time, 00496 db_allocator_, 00497 mb_allocator_), 00498 0); 00499 OpenDDS::DCPS::Serializer serializer(mb, swap, cdr 00500 ? OpenDDS::DCPS::Serializer::ALIGN_CDR 00501 : OpenDDS::DCPS::Serializer::ALIGN_NONE); 00502 if (cdr) { 00503 serializer << ACE_OutputCDR::from_octet(0); 00504 serializer << ACE_OutputCDR::from_octet(swap ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER); 00505 serializer << ACE_CDR::UShort(0); 00506 } 00507 // If this is RTI serialization, start counting byte offset AFTER 00508 // the header 00509 if (cdr && Serializer::use_rti_serialization()) { 00510 // Start counting byte-offset AFTER header 00511 serializer.reset_alignment(); 00512 } 00513 serializer << instance_data; 00514 } 00515 00516 return mb; 00517 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::dispose | ( | const MessageType & | instance_data, | |
::DDS::InstanceHandle_t | instance_handle | |||
) | [inline] |
Definition at line 214 of file DataWriterImpl_T.h.
References dispose_w_timestamp(), and OpenDDS::DCPS::time_value_to_time().
00217 { 00218 ::DDS::Time_t const source_timestamp = 00219 ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ()); 00220 return dispose_w_timestamp (instance_data, 00221 instance_handle, 00222 source_timestamp); 00223 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::dispose_w_timestamp | ( | const MessageType & | instance_data, | |
::DDS::InstanceHandle_t | instance_handle, | |||
const ::DDS::Time_t & | source_timestamp | |||
) | [inline] |
Definition at line 225 of file DataWriterImpl_T.h.
References OpenDDS::DCPS::DataWriterImpl::dispose(), DDS::HANDLE_NIL, lookup_instance(), and DDS::RETCODE_ERROR.
00229 { 00230 if(instance_handle == ::DDS::HANDLE_NIL) 00231 { 00232 instance_handle = this->lookup_instance(instance_data); 00233 if (instance_handle == ::DDS::HANDLE_NIL) 00234 { 00235 ACE_ERROR_RETURN ((LM_ERROR, 00236 ACE_TEXT("(%P|%t) ") 00237 ACE_TEXT("%CDataWriterImpl::dispose, ") 00238 ACE_TEXT("The instance sample is not registered.\n"), 00239 TraitsType::type_name()), 00240 ::DDS::RETCODE_ERROR); 00241 } 00242 } 00243 00244 return OpenDDS::DCPS::DataWriterImpl::dispose(instance_handle, 00245 source_timestamp); 00246 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::enable_specific | ( | ) | [inline, virtual] |
Performs the parts of enable() that are specific to the data type. Called by DataWriterImpl::enable().
Implements OpenDDS::DCPS::DataWriterImpl.
Definition at line 335 of file DataWriterImpl_T.h.
References OpenDDS::DCPS::DCPS_debug_level, and DDS::RETCODE_OK.
00336 { 00337 MessageType data; 00338 if (TraitsType::gen_is_bounded_size (data)) 00339 { 00340 data_allocator_ = new DataAllocator (n_chunks_, marshaled_size_); 00341 if (::OpenDDS::DCPS::DCPS_debug_level >= 2) 00342 ACE_DEBUG((LM_DEBUG, 00343 ACE_TEXT("(%P|%t) %CDataWriterImpl::") 00344 ACE_TEXT("enable_specific-data") 00345 ACE_TEXT(" Dynamic_Cached_Allocator_With_Overflow %x ") 00346 ACE_TEXT("with %d chunks\n"), 00347 TraitsType::type_name(), 00348 data_allocator_, 00349 n_chunks_)); 00350 } 00351 else 00352 { 00353 if (::OpenDDS::DCPS::DCPS_debug_level >= 2) 00354 ACE_DEBUG((LM_DEBUG, 00355 ACE_TEXT("(%P|%t) %CDataWriterImpl::enable_specific") 00356 ACE_TEXT(" is unbounded data - allocate from heap\n"), TraitsType::type_name())); 00357 } 00358 00359 mb_allocator_ = 00360 new ::OpenDDS::DCPS::MessageBlockAllocator ( 00361 n_chunks_ * association_chunk_multiplier_); 00362 db_allocator_ = new ::OpenDDS::DCPS::DataBlockAllocator (n_chunks_); 00363 00364 if (::OpenDDS::DCPS::DCPS_debug_level >= 2) 00365 { 00366 ACE_DEBUG((LM_DEBUG, 00367 ACE_TEXT("(%P|%t) %CDataWriterImpl::") 00368 ACE_TEXT("enable_specific-mb ") 00369 ACE_TEXT("Cached_Allocator_With_Overflow ") 00370 ACE_TEXT("%x with %d chunks\n"), 00371 TraitsType::type_name(), 00372 mb_allocator_, 00373 n_chunks_ * association_chunk_multiplier_)); 00374 ACE_DEBUG((LM_DEBUG, 00375 ACE_TEXT("(%P|%t) %CDataWriterImpl::") 00376 ACE_TEXT("enable_specific-db ") 00377 ACE_TEXT("Cached_Allocator_With_Overflow ") 00378 ACE_TEXT("%x with %d chunks\n"), 00379 TraitsType::type_name(), 00380 db_allocator_, 00381 n_chunks_)); 00382 } 00383 00384 return ::DDS::RETCODE_OK; 00385 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::get_key_value | ( | MessageType & | key_holder, | |
::DDS::InstanceHandle_t | handle | |||
) | [inline] |
Definition at line 248 of file DataWriterImpl_T.h.
References DDS::RETCODE_ERROR, and DDS::RETCODE_OK.
00251 { 00252 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00253 guard, 00254 get_lock (), 00255 ::DDS::RETCODE_ERROR); 00256 00257 typename InstanceMap::iterator const the_end = instance_map_.end (); 00258 for (typename InstanceMap::iterator it = instance_map_.begin (); 00259 it != the_end; 00260 ++it) 00261 { 00262 if (it->second == handle) 00263 { 00264 key_holder = it->first; 00265 return ::DDS::RETCODE_OK; 00266 } 00267 } 00268 00269 return ::DDS::RETCODE_ERROR; 00270 }
::DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::get_or_create_instance_handle | ( | ::DDS::InstanceHandle_t & | handle, | |
const MessageType & | instance_data, | |||
const ::DDS::Time_t & | source_timestamp | |||
) | [inline, private] |
Find the instance handle for the given instance_data using the data type's key(s). If the instance does not already exist, a new instance handle is created for it.
Definition at line 524 of file DataWriterImpl_T.h.
References DDS::HANDLE_NIL, OpenDDS::DCPS::KEY_ONLY_MARSHALING, DDS::RETCODE_ERROR, DDS::RETCODE_OK, and OpenDDS::DCPS::PublicationInstance::unregistered_.
00528 { 00529 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00530 guard, 00531 get_lock(), 00532 ::DDS::RETCODE_ERROR); 00533 00534 handle = ::DDS::HANDLE_NIL; 00535 typename InstanceMap::const_iterator it = instance_map_.find(instance_data); 00536 00537 bool needs_creation = true; 00538 bool needs_registration = true; 00539 00540 if (it != instance_map_.end()) 00541 { 00542 needs_creation = false; 00543 00544 handle = it->second; 00545 OpenDDS::DCPS::PublicationInstance* instance = get_handle_instance(handle); 00546 00547 if (instance->unregistered_ == false) 00548 { 00549 needs_registration = false; 00550 } 00551 // else: The instance is unregistered and now register again. 00552 } 00553 00554 if (needs_registration) 00555 { 00556 // don't use fast allocator for registration. 00557 ACE_Message_Block* const marshalled = 00558 this->dds_marshal(instance_data, 00559 OpenDDS::DCPS::KEY_ONLY_MARSHALING); 00560 00561 // tell DataWriterLocal and Publisher about the instance. 00562 ::DDS::ReturnCode_t ret = register_instance_i(handle, marshalled, source_timestamp); 00563 // note: the WriteDataContainer/PublicationInstance maintains ownership 00564 // of the marshalled sample. 00565 00566 if (ret != ::DDS::RETCODE_OK) 00567 { 00568 marshalled->release (); 00569 handle = ::DDS::HANDLE_NIL; 00570 return ret; 00571 } 00572 00573 if (needs_creation) 00574 { 00575 std::pair<typename InstanceMap::iterator, bool> pair = 00576 instance_map_.insert(typename InstanceMap::value_type(instance_data, handle)); 00577 00578 if (pair.second == false) 00579 { 00580 handle = ::DDS::HANDLE_NIL; 00581 ACE_ERROR_RETURN ((LM_ERROR, 00582 ACE_TEXT("(%P|%t) ") 00583 ACE_TEXT("%CDataWriterImpl::") 00584 ACE_TEXT("get_or_create_instance_handle, ") 00585 ACE_TEXT("insert %s failed. \n"), 00586 TraitsType::type_name(), TraitsType::type_name()), 00587 ::DDS::RETCODE_ERROR); 00588 } 00589 } // end of if (needs_creation) 00590 00591 send_all_to_flush_control(guard); 00592 00593 } // end of if (needs_registration) 00594 00595 return ::DDS::RETCODE_OK; 00596 }
virtual void OpenDDS::DCPS::DataWriterImpl_T< MessageType >::init | ( | ::DDS::Topic_ptr | topic, | |
OpenDDS::DCPS::TopicImpl * | topic_servant, | |||
const ::DDS::DataWriterQos & | qos, | |||
::DDS::DataWriterListener_ptr | a_listener, | |||
const ::DDS::StatusMask & | mask, | |||
OpenDDS::DCPS::DomainParticipantImpl * | participant_servant, | |||
OpenDDS::DCPS::PublisherImpl * | publisher_servant, | |||
::DDS::DataWriter_ptr | dw_objref | |||
) | [inline, virtual] |
Initialize the DataWriter object. Called as part of create_datawriter.
Definition at line 297 of file DataWriterImpl_T.h.
References OpenDDS::DCPS::gen_max_marshaled_size(), and OpenDDS::DCPS::DataWriterImpl::init().
00305 { 00306 OpenDDS::DCPS::DataWriterImpl::init (topic, 00307 topic_servant, 00308 qos, 00309 a_listener, 00310 mask, 00311 participant_servant, 00312 publisher_servant, 00313 dw_objref); 00314 00315 MessageType data; 00316 if (TraitsType::gen_is_bounded_size(data)) { 00317 marshaled_size_ = 8 + TraitsType::gen_max_marshaled_size(data, true); 00318 // worst case: CDR encapsulation (4 bytes) + Padding for alignment (4 bytes) 00319 } else { 00320 marshaled_size_ = 0; // should use gen_find_size when marshaling 00321 } 00322 OpenDDS::DCPS::KeyOnly<const MessageType > ko(data); 00323 if (TraitsType::gen_is_bounded_size(ko)) { 00324 key_marshaled_size_ = 8 + TraitsType::gen_max_marshaled_size(ko, true); 00325 // worst case: CDR Encapsulation (4 bytes) + Padding for alignment (4 bytes) 00326 } else { 00327 key_marshaled_size_ = 0; // should use gen_find_size when marshaling 00328 } 00329 }
virtual ::DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::lookup_instance | ( | const MessageType & | instance_data | ) | [inline] |
Definition at line 272 of file DataWriterImpl_T.h.
References DDS::HANDLE_NIL, and DDS::RETCODE_ERROR.
00274 { 00275 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00276 guard, 00277 get_lock (), 00278 ::DDS::RETCODE_ERROR); 00279 00280 typename InstanceMap::const_iterator const it = instance_map_.find(instance_data); 00281 00282 if (it == instance_map_.end()) 00283 { 00284 return ::DDS::HANDLE_NIL; 00285 } 00286 else 00287 { 00288 return it->second; 00289 } 00290 }
typedef OpenDDS::DCPS::DataWriterImpl_T< MessageType >::OPENDDS_MAP_CMP | ( | MessageType | , | |
::DDS::InstanceHandle_t | , | |||
typename TraitsType::LessThanType | ||||
) |
virtual ::DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::register_instance | ( | const MessageType & | instance | ) | [inline] |
Definition at line 57 of file DataWriterImpl_T.h.
References register_instance_w_timestamp(), and OpenDDS::DCPS::time_value_to_time().
00059 { 00060 ::DDS::Time_t const timestamp = 00061 ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ()); 00062 return register_instance_w_timestamp (instance, timestamp); 00063 }
virtual ::DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::register_instance_w_timestamp | ( | const MessageType & | instance, | |
const ::DDS::Time_t & | timestamp | |||
) | [inline] |
Definition at line 65 of file DataWriterImpl_T.h.
References DDS::HANDLE_NIL, and DDS::RETCODE_OK.
00068 { 00069 ::DDS::InstanceHandle_t registered_handle = ::DDS::HANDLE_NIL; 00070 00071 ::DDS::ReturnCode_t const ret 00072 = this->get_or_create_instance_handle(registered_handle, 00073 instance, 00074 timestamp); 00075 if (ret != ::DDS::RETCODE_OK) 00076 { 00077 ACE_ERROR ((LM_ERROR, 00078 ACE_TEXT("(%P|%t) ") 00079 ACE_TEXT("%CDataWriterImpl::") 00080 ACE_TEXT("register_instance_w_timestamp, ") 00081 ACE_TEXT("register failed error=%d.\n"), 00082 TraitsType::type_name(), 00083 ret)); 00084 } 00085 00086 return registered_handle; 00087 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::unregister_instance | ( | const MessageType & | instance, | |
::DDS::InstanceHandle_t | handle | |||
) | [inline] |
Definition at line 89 of file DataWriterImpl_T.h.
References OpenDDS::DCPS::time_value_to_time(), and unregister_instance_w_timestamp().
00092 { 00093 ::DDS::Time_t const timestamp = 00094 ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ()); 00095 00096 return unregister_instance_w_timestamp (instance, 00097 handle, 00098 timestamp); 00099 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::unregister_instance_w_timestamp | ( | const MessageType & | instance, | |
::DDS::InstanceHandle_t | handle, | |||
const ::DDS::Time_t & | timestamp | |||
) | [inline] |
Definition at line 101 of file DataWriterImpl_T.h.
References DDS::HANDLE_NIL, lookup_instance(), DDS::RETCODE_ERROR, and OpenDDS::DCPS::DataWriterImpl::unregister_instance_i().
00105 { 00106 ::DDS::InstanceHandle_t const registered_handle = 00107 this->lookup_instance(instance); 00108 00109 if (registered_handle == ::DDS::HANDLE_NIL) 00110 { 00111 // This case could be the instance is not registered yet or 00112 // already unregistered. 00113 ACE_ERROR_RETURN ((LM_ERROR, 00114 ACE_TEXT("(%P|%t) ") 00115 ACE_TEXT("%CDataWriterImpl::") 00116 ACE_TEXT("unregister_instance_w_timestamp, ") 00117 ACE_TEXT("The instance is not registered.\n"), 00118 TraitsType::type_name()), 00119 ::DDS::RETCODE_ERROR); 00120 } 00121 else if (handle != ::DDS::HANDLE_NIL && handle != registered_handle) 00122 { 00123 ACE_ERROR_RETURN ((LM_ERROR, 00124 ACE_TEXT("(%P|%t) ") 00125 ACE_TEXT("%CDataWriterImpl::") 00126 ACE_TEXT("unregister_w_timestamp, ") 00127 ACE_TEXT("The given handle=%X is different from ") 00128 ACE_TEXT("registered handle=%X.\n"), 00129 TraitsType::type_name(), 00130 handle, registered_handle), 00131 ::DDS::RETCODE_ERROR); 00132 } 00133 00134 // DataWriterImpl::unregister_instance_i will call back to inform the 00135 // DataWriter. 00136 // That the instance handle is removed from there and hence 00137 // DataWriter can remove the instance here. 00138 return OpenDDS::DCPS::DataWriterImpl::unregister_instance_i(handle, timestamp); 00139 }
virtual void OpenDDS::DCPS::DataWriterImpl_T< MessageType >::unregistered | ( | ::DDS::InstanceHandle_t | instance_handle | ) | [inline, virtual] |
The framework has completed its part of unregistering the given instance.
Definition at line 391 of file DataWriterImpl_T.h.
00392 { 00393 ACE_UNUSED_ARG(instance_handle); 00394 // Previously this method removed the instance from the instance_map_. 00395 // The instance handle is now kept in the map so that an instance 00396 // re-registered after being unregistered reuses 00397 // its old handle. 00398 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::write | ( | const MessageType & | instance_data, | |
::DDS::InstanceHandle_t | handle | |||
) | [inline] |
Definition at line 144 of file DataWriterImpl_T.h.
References OpenDDS::DCPS::time_value_to_time(), and write_w_timestamp().
00147 { 00148 ::DDS::Time_t const source_timestamp = 00149 ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ()); 00150 return write_w_timestamp (instance_data, 00151 handle, 00152 source_timestamp); 00153 }
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::write_w_timestamp | ( | const MessageType & | instance_data, | |
::DDS::InstanceHandle_t | handle, | |||
const ::DDS::Time_t & | source_timestamp | |||
) | [inline] |
Definition at line 159 of file DataWriterImpl_T.h.
References OpenDDS::DCPS::FULL_MARSHALING, DDS::HANDLE_NIL, OpenDDS::DCPS::push_back(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, TheServiceParticipant, and OpenDDS::DCPS::DataWriterImpl::write().
00163 { 00164 // This operation assumes the provided handle is valid. The handle 00165 // provided will not be verified. 00166 00167 if (handle == ::DDS::HANDLE_NIL) { 00168 ::DDS::InstanceHandle_t registered_handle = ::DDS::HANDLE_NIL; 00169 ::DDS::ReturnCode_t ret 00170 = this->get_or_create_instance_handle(registered_handle, 00171 instance_data, 00172 source_timestamp); 00173 if (ret != ::DDS::RETCODE_OK) { 00174 ACE_ERROR_RETURN((LM_ERROR, 00175 ACE_TEXT("(%P|%t) ") 00176 ACE_TEXT("%CDataWriterImpl::write, ") 00177 ACE_TEXT("register failed err=%d.\n"), 00178 TraitsType::type_name(), 00179 ret), 00180 ret); 00181 } 00182 00183 handle = registered_handle; 00184 } 00185 00186 // list of reader RepoIds that should not get data 00187 OpenDDS::DCPS::GUIDSeq_var filter_out; 00188 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00189 if (TheServiceParticipant->publisher_content_filter()) { 00190 ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, ::DDS::RETCODE_ERROR); 00191 for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(), 00192 end = reader_info_.end(); iter != end; ++iter) { 00193 const ReaderInfo& ri = iter->second; 00194 if (!ri.eval_.is_nil()) { 00195 if (!filter_out.ptr()) { 00196 filter_out = new OpenDDS::DCPS::GUIDSeq; 00197 } 00198 if (!ri.eval_->eval(instance_data, ri.expression_params_)) { 00199 push_back(filter_out.inout(), iter->first); 00200 } 00201 } 00202 } 00203 } 00204 #endif 00205 00206 ACE_Message_Block* const marshalled = 00207 dds_marshal (instance_data, OpenDDS::DCPS::FULL_MARSHALING); 00208 00209 return OpenDDS::DCPS::DataWriterImpl::write(marshalled, handle, 00210 source_timestamp, 00211 filter_out._retn()); 00212 }
friend class ::DDS_TEST [friend] |
Reimplemented from OpenDDS::DCPS::DataWriterImpl.
Definition at line 607 of file DataWriterImpl_T.h.
DataAllocator* OpenDDS::DCPS::DataWriterImpl_T< MessageType >::data_allocator_ [private] |
Definition at line 601 of file DataWriterImpl_T.h.
::OpenDDS::DCPS::DataBlockAllocator* OpenDDS::DCPS::DataWriterImpl_T< MessageType >::db_allocator_ [private] |
The data block allocator.
Reimplemented from OpenDDS::DCPS::DataWriterImpl.
Definition at line 603 of file DataWriterImpl_T.h.
InstanceMap OpenDDS::DCPS::DataWriterImpl_T< MessageType >::instance_map_ [private] |
Definition at line 598 of file DataWriterImpl_T.h.
size_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::key_marshaled_size_ [private] |
Definition at line 600 of file DataWriterImpl_T.h.
size_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::marshaled_size_ [private] |
Definition at line 599 of file DataWriterImpl_T.h.
::OpenDDS::DCPS::MessageBlockAllocator* OpenDDS::DCPS::DataWriterImpl_T< MessageType >::mb_allocator_ [private] |
The message block allocator.
Reimplemented from OpenDDS::DCPS::DataWriterImpl.
Definition at line 602 of file DataWriterImpl_T.h.