#include <DataWriterImpl_T.h>
Public Types | |
enum | { cdr_header_size = 4 } |
typedef DDSTraits< MessageType > | TraitsType |
typedef MarshalTraits < MessageType > | MarshalTraitsType |
typedef ::OpenDDS::DCPS::Dynamic_Cached_Allocator_With_Overflow < ACE_Thread_Mutex > | DataAllocator |
Public Member Functions | |
typedef | OPENDDS_MAP_CMP_T (MessageType, DDS::InstanceHandle_t, typename TraitsType::LessThanType) InstanceMap |
DataWriterImpl_T (void) | |
virtual | ~DataWriterImpl_T (void) |
virtual DDS::InstanceHandle_t | register_instance (const MessageType &instance) |
virtual DDS::InstanceHandle_t | register_instance_w_timestamp (const MessageType &instance, const DDS::Time_t ×tamp) |
virtual DDS::ReturnCode_t | unregister_instance (const MessageType &instance, DDS::InstanceHandle_t handle) |
virtual DDS::ReturnCode_t | unregister_instance_w_timestamp (const MessageType &instance, DDS::InstanceHandle_t handle, const DDS::Time_t ×tamp) |
virtual DDS::ReturnCode_t | write (const MessageType &instance_data, DDS::InstanceHandle_t handle) |
virtual DDS::ReturnCode_t | write_w_timestamp (const MessageType &instance_data, DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp) |
virtual DDS::ReturnCode_t | dispose (const MessageType &instance_data, DDS::InstanceHandle_t instance_handle) |
virtual DDS::ReturnCode_t | dispose_w_timestamp (const MessageType &instance_data, DDS::InstanceHandle_t instance_handle, const DDS::Time_t &source_timestamp) |
virtual DDS::ReturnCode_t | get_key_value (MessageType &key_holder, DDS::InstanceHandle_t handle) |
virtual DDS::InstanceHandle_t | lookup_instance (const MessageType &instance_data) |
virtual DDS::ReturnCode_t | enable_specific () |
ACE_INLINE DataAllocator * | data_allocator () const |
Private Member Functions | |
ACE_Message_Block * | dds_marshal (const MessageType &instance_data, OpenDDS::DCPS::MarshalingType marshaling_type) |
DDS::ReturnCode_t | get_or_create_instance_handle (DDS::InstanceHandle_t &handle, const MessageType &instance_data, const DDS::Time_t &source_timestamp) |
Private Attributes | |
InstanceMap | instance_map_ |
size_t | marshaled_size_ |
size_t | key_marshaled_size_ |
unique_ptr< DataAllocator > | data_allocator_ |
unique_ptr< MessageBlockAllocator > | mb_allocator_ |
The message block allocator. | |
unique_ptr< DataBlockAllocator > | db_allocator_ |
The data block allocator. | |
Friends | |
class | ::DDS_TEST |
Servant for DataWriter interface of the Traits::MessageType data type.
See the DDS specification, OMG formal/04-12-02, for a description of this interface.
Definition at line 22 of file DataWriterImpl_T.h.
typedef ::OpenDDS::DCPS::Dynamic_Cached_Allocator_With_Overflow<ACE_Thread_Mutex> OpenDDS::DCPS::DataWriterImpl_T< MessageType >::DataAllocator |
Definition at line 36 of file DataWriterImpl_T.h.
typedef MarshalTraits<MessageType> OpenDDS::DCPS::DataWriterImpl_T< MessageType >::MarshalTraitsType |
Definition at line 32 of file DataWriterImpl_T.h.
typedef DDSTraits<MessageType> OpenDDS::DCPS::DataWriterImpl_T< MessageType >::TraitsType |
Definition at line 31 of file DataWriterImpl_T.h.
anonymous enum |
Definition at line 38 of file DataWriterImpl_T.h.
00038 { 00039 cdr_header_size = 4 00040 };
OpenDDS::DCPS::DataWriterImpl_T< MessageType >::DataWriterImpl_T | ( | void | ) | [inline] |
Definition at line 42 of file DataWriterImpl_T.h.
References OpenDDS::DCPS::gen_max_marshaled_size().
00043 : marshaled_size_ (0) 00044 , key_marshaled_size_ (0) 00045 { 00046 MessageType data; 00047 if (MarshalTraitsType::gen_is_bounded_size()) { 00048 marshaled_size_ = 8 + TraitsType::gen_max_marshaled_size(data, true); 00049 // worst case: CDR encapsulation (4 bytes) + Padding for alignment (4 bytes) 00050 } else { 00051 marshaled_size_ = 0; // should use gen_find_size when marshaling 00052 } 00053 if (MarshalTraitsType::gen_is_bounded_key_size()) { 00054 OpenDDS::DCPS::KeyOnly<const MessageType > ko(data); 00055 key_marshaled_size_ = 8 + TraitsType::gen_max_marshaled_size(ko, true); 00056 // worst case: CDR Encapsulation (4 bytes) + Padding for alignment (4 bytes) 00057 } else { 00058 key_marshaled_size_ = 0; // should use gen_find_size when marshaling 00059 } 00060 }
virtual OpenDDS::DCPS::DataWriterImpl_T< MessageType >::~DataWriterImpl_T | ( | void | ) | [inline, virtual] |
Definition at line 62 of file DataWriterImpl_T.h.
ACE_INLINE DataAllocator* OpenDDS::DCPS::DataWriterImpl_T< MessageType >::data_allocator | ( | ) | const [inline] |
Accessor to the marshalled data sample allocator.
Definition at line 361 of file DataWriterImpl_T.h.
00361 { 00362 return data_allocator_.get(); 00363 };
ACE_Message_Block* OpenDDS::DCPS::DataWriterImpl_T< MessageType >::dds_marshal | ( | const MessageType & | instance_data, | |
OpenDDS::DCPS::MarshalingType | marshaling_type | |||
) | [inline, private] |
Serialize the instance data.
instance_data | The data to serialize. | |
marshaling_type | Enumerated type specifying whether to marshal just the keys or the entire message. |
Definition at line 375 of file DataWriterImpl_T.h.
References ACE_NEW_RETURN(), ACE_TEXT(), OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::DCPS::Serializer::ALIGN_NONE, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::KEY_ONLY_MARSHALING, LM_ERROR, ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), OpenDDS::DCPS::swap(), OpenDDS::DCPS::Serializer::use_rti_serialization(), and ACE_Time_Value::zero.
00378 { 00379 const bool cdr = this->cdr_encapsulation(), swap = this->swap_bytes(); 00380 00381 Message_Block_Ptr mb; 00382 ACE_Message_Block* tmp_mb; 00383 00384 if (marshaling_type == OpenDDS::DCPS::KEY_ONLY_MARSHALING) { 00385 // Don't use the cached allocator for the registered sample message 00386 // block. 00387 00388 OpenDDS::DCPS::KeyOnly<const MessageType > ko_instance_data(instance_data); 00389 size_t effective_size = 0, padding = 0; 00390 if (key_marshaled_size_) { 00391 effective_size = key_marshaled_size_; 00392 } else { 00393 if (cdr && !Serializer::use_rti_serialization()) { 00394 effective_size = cdr_header_size; // CDR encapsulation 00395 } 00396 TraitsType::gen_find_size(ko_instance_data, effective_size, padding); 00397 if (cdr && Serializer::use_rti_serialization()) { 00398 effective_size += (cdr_header_size); 00399 } 00400 } 00401 if (cdr) { 00402 effective_size += padding; 00403 } 00404 00405 ACE_NEW_RETURN(tmp_mb, ACE_Message_Block(effective_size, 00406 ACE_Message_Block::MB_DATA, 00407 0, //cont 00408 0, //data 00409 0, //alloc_strategy 00410 get_db_lock()), 0); 00411 mb.reset(tmp_mb); 00412 OpenDDS::DCPS::Serializer serializer(mb.get(), swap, cdr 00413 ? OpenDDS::DCPS::Serializer::ALIGN_CDR 00414 : OpenDDS::DCPS::Serializer::ALIGN_NONE); 00415 if (cdr) { 00416 serializer << ACE_OutputCDR::from_octet(0); 00417 serializer << ACE_OutputCDR::from_octet(swap ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER); 00418 serializer << ACE_CDR::UShort(0); 00419 } 00420 // If this is RTI serialization, start counting byte offset AFTER 00421 // the header 00422 if (cdr && Serializer::use_rti_serialization()) { 00423 // Start counting byte-offset AFTER header 00424 serializer.reset_alignment(); 00425 } 00426 serializer << ko_instance_data; 00427 } else { // OpenDDS::DCPS::FULL_MARSHALING 00428 size_t effective_size = 0, padding = 0; 00429 if (marshaled_size_) { 00430 effective_size = marshaled_size_; 00431 } else { 00432 if (cdr && !Serializer::use_rti_serialization()) { 00433 effective_size = cdr_header_size; // CDR encapsulation 00434 } 00435 TraitsType::gen_find_size(instance_data, effective_size, padding); 00436 if (cdr && Serializer::use_rti_serialization()) { 00437 effective_size += (cdr_header_size); 00438 } 00439 } 00440 if (cdr) { 00441 effective_size += padding; 00442 } 00443 00444 00445 ACE_NEW_MALLOC_RETURN(tmp_mb, 00446 static_cast<ACE_Message_Block*>( 00447 mb_allocator_->malloc( 00448 sizeof(ACE_Message_Block))), 00449 ACE_Message_Block( 00450 effective_size, 00451 ACE_Message_Block::MB_DATA, 00452 0, //cont 00453 0, //data 00454 data_allocator_.get(), //allocator_strategy 00455 get_db_lock(), //data block locking_strategy 00456 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 00457 ACE_Time_Value::zero, 00458 ACE_Time_Value::max_time, 00459 db_allocator_.get(), 00460 mb_allocator_.get()), 00461 0); 00462 mb.reset(tmp_mb); 00463 OpenDDS::DCPS::Serializer serializer(mb.get(), swap, cdr 00464 ? OpenDDS::DCPS::Serializer::ALIGN_CDR 00465 : OpenDDS::DCPS::Serializer::ALIGN_NONE); 00466 if (cdr) { 00467 serializer << ACE_OutputCDR::from_octet(0); 00468 serializer << ACE_OutputCDR::from_octet(swap ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER); 00469 serializer << ACE_CDR::UShort(0); 00470 } 00471 // If this is RTI serialization, start counting byte offset AFTER 00472 // the header 00473 if (cdr && Serializer::use_rti_serialization()) { 00474 // Start counting byte-offset AFTER header 00475 serializer.reset_alignment(); 00476 } 00477 00478 if (! (serializer << instance_data)) { 00479 ACE_ERROR_RETURN((LM_ERROR, 00480 ACE_TEXT("(%P|%t) OpenDDS::DCPS::DataWriterImpl::dds_marshal(), ") 00481 ACE_TEXT("instance_data serialization error.\n")), 00482 0); 00483 } 00484 } 00485 00486 return mb.release(); 00487 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::dispose | ( | const MessageType & | instance_data, | |
DDS::InstanceHandle_t | instance_handle | |||
) | [inline, virtual] |
Definition at line 223 of file DataWriterImpl_T.h.
References dispose_w_timestamp(), ACE_OS::gettimeofday(), and OpenDDS::DCPS::time_value_to_time().
00226 { 00227 DDS::Time_t const source_timestamp = 00228 ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ()); 00229 return dispose_w_timestamp (instance_data, 00230 instance_handle, 00231 source_timestamp); 00232 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::dispose_w_timestamp | ( | const MessageType & | instance_data, | |
DDS::InstanceHandle_t | instance_handle, | |||
const DDS::Time_t & | source_timestamp | |||
) | [inline, virtual] |
Definition at line 234 of file DataWriterImpl_T.h.
References ACE_TEXT(), dispose(), DDS::HANDLE_NIL, LM_ERROR, lookup_instance(), and DDS::RETCODE_ERROR.
00238 { 00239 if(instance_handle == DDS::HANDLE_NIL) 00240 { 00241 instance_handle = this->lookup_instance(instance_data); 00242 if (instance_handle == DDS::HANDLE_NIL) 00243 { 00244 ACE_ERROR_RETURN ((LM_ERROR, 00245 ACE_TEXT("(%P|%t) ") 00246 ACE_TEXT("%CDataWriterImpl::dispose_w_timestamp, ") 00247 ACE_TEXT("The instance sample is not registered.\n"), 00248 TraitsType::type_name()), 00249 DDS::RETCODE_ERROR); 00250 } 00251 } 00252 00253 return OpenDDS::DCPS::DataWriterImpl::dispose(instance_handle, 00254 source_timestamp); 00255 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::enable_specific | ( | ) | [inline, virtual] |
Do parts of enable specific to the datatype. Called by DataWriterImpl::enable().
Implements OpenDDS::DCPS::DataWriterImpl.
Definition at line 306 of file DataWriterImpl_T.h.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, and DDS::RETCODE_OK.
00307 { 00308 if (MarshalTraitsType::gen_is_bounded_size ()) 00309 { 00310 data_allocator_.reset(new DataAllocator (n_chunks_, marshaled_size_)); 00311 if (::OpenDDS::DCPS::DCPS_debug_level >= 2) 00312 ACE_DEBUG((LM_DEBUG, 00313 ACE_TEXT("(%P|%t) %CDataWriterImpl::") 00314 ACE_TEXT("enable_specific-data") 00315 ACE_TEXT(" Dynamic_Cached_Allocator_With_Overflow %x ") 00316 ACE_TEXT("with %d chunks\n"), 00317 TraitsType::type_name(), 00318 data_allocator_.get(), 00319 n_chunks_)); 00320 } 00321 else 00322 { 00323 if (::OpenDDS::DCPS::DCPS_debug_level >= 2) 00324 ACE_DEBUG((LM_DEBUG, 00325 ACE_TEXT("(%P|%t) %CDataWriterImpl::enable_specific") 00326 ACE_TEXT(" is unbounded data - allocate from heap\n"), TraitsType::type_name())); 00327 } 00328 00329 mb_allocator_.reset( 00330 new ::OpenDDS::DCPS::MessageBlockAllocator ( 00331 n_chunks_ * association_chunk_multiplier_)); 00332 db_allocator_.reset(new ::OpenDDS::DCPS::DataBlockAllocator (n_chunks_)); 00333 00334 if (::OpenDDS::DCPS::DCPS_debug_level >= 2) 00335 { 00336 ACE_DEBUG((LM_DEBUG, 00337 ACE_TEXT("(%P|%t) %CDataWriterImpl::") 00338 ACE_TEXT("enable_specific-mb ") 00339 ACE_TEXT("Cached_Allocator_With_Overflow ") 00340 ACE_TEXT("%x with %d chunks\n"), 00341 TraitsType::type_name(), 00342 mb_allocator_.get(), 00343 n_chunks_ * association_chunk_multiplier_)); 00344 ACE_DEBUG((LM_DEBUG, 00345 ACE_TEXT("(%P|%t) %CDataWriterImpl::") 00346 ACE_TEXT("enable_specific-db ") 00347 ACE_TEXT("Cached_Allocator_With_Overflow ") 00348 ACE_TEXT("%x with %d chunks\n"), 00349 TraitsType::type_name(), 00350 db_allocator_.get(), 00351 n_chunks_)); 00352 } 00353 00354 return DDS::RETCODE_OK; 00355 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::get_key_value | ( | MessageType & | key_holder, | |
DDS::InstanceHandle_t | handle | |||
) | [inline, virtual] |
Definition at line 257 of file DataWriterImpl_T.h.
References DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_ERROR, and DDS::RETCODE_OK.
00260 { 00261 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00262 guard, 00263 get_lock (), 00264 DDS::RETCODE_ERROR); 00265 00266 typename InstanceMap::iterator const the_end = instance_map_.end (); 00267 for (typename InstanceMap::iterator it = instance_map_.begin (); 00268 it != the_end; 00269 ++it) 00270 { 00271 if (it->second == handle) 00272 { 00273 key_holder = it->first; 00274 return DDS::RETCODE_OK; 00275 } 00276 } 00277 00278 return DDS::RETCODE_BAD_PARAMETER; 00279 }
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::get_or_create_instance_handle | ( | DDS::InstanceHandle_t & | handle, | |
const MessageType & | instance_data, | |||
const DDS::Time_t & | source_timestamp | |||
) | [inline, private] |
Find the instance handle for the given instance_data using the data type's key(s). If the instance does not already exist create a new instance handle for it.
Definition at line 494 of file DataWriterImpl_T.h.
References ACE_TEXT(), DDS::HANDLE_NIL, OpenDDS::DCPS::KEY_ONLY_MARSHALING, LM_ERROR, OpenDDS::DCPS::move(), DDS::RETCODE_ERROR, and DDS::RETCODE_OK.
00498 { 00499 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00500 guard, 00501 get_lock(), 00502 DDS::RETCODE_ERROR); 00503 00504 handle = DDS::HANDLE_NIL; 00505 typename InstanceMap::const_iterator it = instance_map_.find(instance_data); 00506 00507 bool needs_creation = true; 00508 bool needs_registration = true; 00509 00510 if (it != instance_map_.end()) 00511 { 00512 needs_creation = false; 00513 00514 handle = it->second; 00515 OpenDDS::DCPS::PublicationInstance_rch instance = get_handle_instance(handle); 00516 00517 if (instance->unregistered_ == false) 00518 { 00519 needs_registration = false; 00520 } 00521 // else: The instance is unregistered and now register again. 00522 } 00523 00524 if (needs_registration) 00525 { 00526 // don't use fast allocator for registration. 00527 Message_Block_Ptr marshalled( 00528 this->dds_marshal(instance_data, 00529 OpenDDS::DCPS::KEY_ONLY_MARSHALING)); 00530 00531 // tell DataWriterLocal and Publisher about the instance. 00532 DDS::ReturnCode_t ret = register_instance_i(handle, move(marshalled), source_timestamp); 00533 // note: the WriteDataContainer/PublicationInstance maintains ownership 00534 // of the marshalled sample. 00535 00536 if (ret != DDS::RETCODE_OK) 00537 { 00538 handle = DDS::HANDLE_NIL; 00539 return ret; 00540 } 00541 00542 if (needs_creation) 00543 { 00544 std::pair<typename InstanceMap::iterator, bool> pair = 00545 instance_map_.insert(typename InstanceMap::value_type(instance_data, handle)); 00546 00547 if (pair.second == false) 00548 { 00549 handle = DDS::HANDLE_NIL; 00550 ACE_ERROR_RETURN ((LM_ERROR, 00551 ACE_TEXT("(%P|%t) ") 00552 ACE_TEXT("%CDataWriterImpl::") 00553 ACE_TEXT("get_or_create_instance_handle, ") 00554 ACE_TEXT("insert %C failed. \n"), 00555 TraitsType::type_name(), TraitsType::type_name()), 00556 DDS::RETCODE_ERROR); 00557 } 00558 } // end of if (needs_creation) 00559 00560 send_all_to_flush_control(guard); 00561 00562 } // end of if (needs_registration) 00563 00564 return DDS::RETCODE_OK; 00565 }
virtual DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::lookup_instance | ( | const MessageType & | instance_data | ) | [inline, virtual] |
Definition at line 281 of file DataWriterImpl_T.h.
References DDS::HANDLE_NIL, and DDS::RETCODE_ERROR.
00283 { 00284 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, 00285 guard, 00286 get_lock (), 00287 DDS::RETCODE_ERROR); 00288 00289 typename InstanceMap::const_iterator const it = instance_map_.find(instance_data); 00290 00291 if (it == instance_map_.end()) 00292 { 00293 return DDS::HANDLE_NIL; 00294 } 00295 else 00296 { 00297 return it->second; 00298 } 00299 }
typedef OpenDDS::DCPS::DataWriterImpl_T< MessageType >::OPENDDS_MAP_CMP_T | ( | MessageType | , | |
DDS::InstanceHandle_t | , | |||
typename TraitsType::LessThanType | ||||
) |
virtual DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::register_instance | ( | const MessageType & | instance | ) | [inline, virtual] |
Definition at line 66 of file DataWriterImpl_T.h.
References ACE_OS::gettimeofday(), register_instance_w_timestamp(), OpenDDS::DCPS::time_value_to_time(), and timestamp().
00068 { 00069 DDS::Time_t const timestamp = 00070 ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ()); 00071 return register_instance_w_timestamp (instance, timestamp); 00072 }
virtual DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::register_instance_w_timestamp | ( | const MessageType & | instance, | |
const DDS::Time_t & | timestamp | |||
) | [inline, virtual] |
Definition at line 74 of file DataWriterImpl_T.h.
References ACE_TEXT(), DDS::HANDLE_NIL, LM_ERROR, and DDS::RETCODE_OK.
00077 { 00078 DDS::InstanceHandle_t registered_handle = DDS::HANDLE_NIL; 00079 00080 DDS::ReturnCode_t const ret 00081 = this->get_or_create_instance_handle(registered_handle, 00082 instance, 00083 timestamp); 00084 if (ret != DDS::RETCODE_OK) 00085 { 00086 ACE_ERROR ((LM_ERROR, 00087 ACE_TEXT("(%P|%t) ") 00088 ACE_TEXT("%CDataWriterImpl::") 00089 ACE_TEXT("register_instance_w_timestamp, ") 00090 ACE_TEXT("register failed error=%d.\n"), 00091 TraitsType::type_name(), 00092 ret)); 00093 } 00094 00095 return registered_handle; 00096 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::unregister_instance | ( | const MessageType & | instance, | |
DDS::InstanceHandle_t | handle | |||
) | [inline, virtual] |
Definition at line 98 of file DataWriterImpl_T.h.
References ACE_OS::gettimeofday(), OpenDDS::DCPS::time_value_to_time(), timestamp(), and unregister_instance_w_timestamp().
00101 { 00102 DDS::Time_t const timestamp = 00103 ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ()); 00104 00105 return unregister_instance_w_timestamp (instance, 00106 handle, 00107 timestamp); 00108 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::unregister_instance_w_timestamp | ( | const MessageType & | instance, | |
DDS::InstanceHandle_t | handle, | |||
const DDS::Time_t & | timestamp | |||
) | [inline, virtual] |
Definition at line 110 of file DataWriterImpl_T.h.
References ACE_TEXT(), DDS::HANDLE_NIL, LM_ERROR, lookup_instance(), DDS::RETCODE_ERROR, and OpenDDS::DCPS::DataWriterImpl::unregister_instance_i().
00114 { 00115 DDS::InstanceHandle_t const registered_handle = 00116 this->lookup_instance(instance); 00117 00118 if (registered_handle == DDS::HANDLE_NIL) 00119 { 00120 // This case could be the instance is not registered yet or 00121 // already unregistered. 00122 ACE_ERROR_RETURN ((LM_ERROR, 00123 ACE_TEXT("(%P|%t) ") 00124 ACE_TEXT("%CDataWriterImpl::") 00125 ACE_TEXT("unregister_instance_w_timestamp, ") 00126 ACE_TEXT("The instance is not registered.\n"), 00127 TraitsType::type_name()), 00128 DDS::RETCODE_ERROR); 00129 } 00130 else if (handle != DDS::HANDLE_NIL && handle != registered_handle) 00131 { 00132 ACE_ERROR_RETURN ((LM_ERROR, 00133 ACE_TEXT("(%P|%t) ") 00134 ACE_TEXT("%CDataWriterImpl::") 00135 ACE_TEXT("unregister_w_timestamp, ") 00136 ACE_TEXT("The given handle=%X is different from ") 00137 ACE_TEXT("registered handle=%X.\n"), 00138 TraitsType::type_name(), 00139 handle, registered_handle), 00140 DDS::RETCODE_ERROR); 00141 } 00142 00143 // DataWriterImpl::unregister_instance_i will call back to inform the 00144 // DataWriter. 00145 // That the instance handle is removed from there and hence 00146 // DataWriter can remove the instance here. 00147 return OpenDDS::DCPS::DataWriterImpl::unregister_instance_i(handle, timestamp); 00148 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::write | ( | const MessageType & | instance_data, | |
DDS::InstanceHandle_t | handle | |||
) | [inline, virtual] |
Definition at line 153 of file DataWriterImpl_T.h.
References ACE_OS::gettimeofday(), OpenDDS::DCPS::time_value_to_time(), and write_w_timestamp().
00156 { 00157 DDS::Time_t const source_timestamp = 00158 ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ()); 00159 return write_w_timestamp (instance_data, 00160 handle, 00161 source_timestamp); 00162 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::write_w_timestamp | ( | const MessageType & | instance_data, | |
DDS::InstanceHandle_t | handle, | |||
const DDS::Time_t & | source_timestamp | |||
) | [inline, virtual] |
Definition at line 168 of file DataWriterImpl_T.h.
References ACE_TEXT(), OpenDDS::DCPS::DataWriterImpl::ReaderInfo::eval_, OpenDDS::DCPS::DataWriterImpl::ReaderInfo::expression_params_, OpenDDS::DCPS::FULL_MARSHALING, DDS::HANDLE_NIL, OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_ERROR, OpenDDS::DCPS::move(), OpenDDS::DCPS::push_back(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, TheServiceParticipant, and write().
00172 { 00173 // This operation assumes the provided handle is valid. The handle 00174 // provided will not be verified. 00175 00176 if (handle == DDS::HANDLE_NIL) { 00177 DDS::InstanceHandle_t registered_handle = DDS::HANDLE_NIL; 00178 DDS::ReturnCode_t ret 00179 = this->get_or_create_instance_handle(registered_handle, 00180 instance_data, 00181 source_timestamp); 00182 if (ret != DDS::RETCODE_OK) { 00183 ACE_ERROR_RETURN((LM_ERROR, 00184 ACE_TEXT("(%P|%t) ") 00185 ACE_TEXT("%CDataWriterImpl::write_w_timestamp, ") 00186 ACE_TEXT("register failed err=%d.\n"), 00187 TraitsType::type_name(), 00188 ret), 00189 ret); 00190 } 00191 00192 handle = registered_handle; 00193 } 00194 00195 // list of reader RepoIds that should not get data 00196 OpenDDS::DCPS::GUIDSeq_var filter_out; 00197 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00198 if (TheServiceParticipant->publisher_content_filter()) { 00199 ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, DDS::RETCODE_ERROR); 00200 for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(), 00201 end = reader_info_.end(); iter != end; ++iter) { 00202 const ReaderInfo& ri = iter->second; 00203 if (!ri.eval_.is_nil()) { 00204 if (!filter_out.ptr()) { 00205 filter_out = new OpenDDS::DCPS::GUIDSeq; 00206 } 00207 if (!ri.eval_->eval(instance_data, ri.expression_params_)) { 00208 push_back(filter_out.inout(), iter->first); 00209 } 00210 } 00211 } 00212 } 00213 #endif 00214 00215 Message_Block_Ptr marshalled( 00216 dds_marshal (instance_data, OpenDDS::DCPS::FULL_MARSHALING)); 00217 00218 return OpenDDS::DCPS::DataWriterImpl::write(move(marshalled), handle, 00219 source_timestamp, 00220 filter_out._retn()); 00221 }
friend class ::DDS_TEST [friend] |
Reimplemented from OpenDDS::DCPS::DataWriterImpl.
Definition at line 576 of file DataWriterImpl_T.h.
unique_ptr<DataAllocator> OpenDDS::DCPS::DataWriterImpl_T< MessageType >::data_allocator_ [private] |
Definition at line 570 of file DataWriterImpl_T.h.
unique_ptr<DataBlockAllocator> OpenDDS::DCPS::DataWriterImpl_T< MessageType >::db_allocator_ [private] |
The data block allocator.
Reimplemented from OpenDDS::DCPS::DataWriterImpl.
Definition at line 572 of file DataWriterImpl_T.h.
InstanceMap OpenDDS::DCPS::DataWriterImpl_T< MessageType >::instance_map_ [private] |
Definition at line 567 of file DataWriterImpl_T.h.
size_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::key_marshaled_size_ [private] |
Definition at line 569 of file DataWriterImpl_T.h.
size_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::marshaled_size_ [private] |
Definition at line 568 of file DataWriterImpl_T.h.
unique_ptr<MessageBlockAllocator> OpenDDS::DCPS::DataWriterImpl_T< MessageType >::mb_allocator_ [private] |
The message block allocator.
Reimplemented from OpenDDS::DCPS::DataWriterImpl.
Definition at line 571 of file DataWriterImpl_T.h.