00001 #ifndef dds_DCPS_DataWriterImpl_T_h
00002 #define dds_DCPS_DataWriterImpl_T_h
00003
00004 #include "dds/DCPS/PublicationInstance.h"
00005 #include "dds/DCPS/DataWriterImpl.h"
00006 #include "dds/DCPS/DataReaderImpl.h"
00007 #include "dds/DCPS/Util.h"
00008 #include "dds/DCPS/TypeSupportImpl.h"
00009 #include "dcps_export.h"
00010
00011 namespace OpenDDS {
00012 namespace DCPS {
00013
00014
00015
00016
00017
00018
00019 template <typename MessageType>
00020 class
00021 #if ( __GNUC__ == 4 && __GNUC_MINOR__ == 1)
00022 OpenDDS_Dcps_Export
00023 #endif
00024 DataWriterImpl_T
00025 : public virtual OpenDDS::DCPS::LocalObject<typename DDSTraits<MessageType>::DataWriterType>,
00026 public virtual OpenDDS::DCPS::DataWriterImpl
00027 {
00028 public:
/// Traits bundle for MessageType; this class uses its type_name(),
/// LessThanType and the gen_* sizing helpers.
typedef DDSTraits<MessageType> TraitsType;

/// Container from sample value (compared with TraitsType::LessThanType)
/// to the instance handle registered for it.
typedef OPENDDS_MAP_CMP(MessageType, ::DDS::InstanceHandle_t,
typename TraitsType::LessThanType) InstanceMap;
/// Allocator type handed to ACE_Message_Block for the payload of fully
/// marshaled samples (see dds_marshal()).
typedef ::OpenDDS::DCPS::Dynamic_Cached_Allocator_With_Overflow<ACE_Thread_Mutex> DataAllocator;

enum {
// Bytes added to a measured sample size when CDR encapsulation is in use
// (see dds_marshal()).
cdr_header_size = 4
};
00038
00039
00040 DataWriterImpl_T (void)
00041 : marshaled_size_ (0)
00042 , key_marshaled_size_ (0)
00043 , data_allocator_ (0)
00044 , mb_allocator_ (0)
00045 , db_allocator_ (0)
00046 {
00047 }
00048
00049
00050 virtual ~DataWriterImpl_T (void)
00051 {
00052 delete data_allocator_;
00053 delete mb_allocator_;
00054 delete db_allocator_;
00055 }
00056
00057 virtual ::DDS::InstanceHandle_t register_instance (
00058 const MessageType & instance)
00059 {
00060 ::DDS::Time_t const timestamp =
00061 ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00062 return register_instance_w_timestamp (instance, timestamp);
00063 }
00064
00065 virtual ::DDS::InstanceHandle_t register_instance_w_timestamp (
00066 const MessageType & instance,
00067 const ::DDS::Time_t & timestamp)
00068 {
00069 ::DDS::InstanceHandle_t registered_handle = ::DDS::HANDLE_NIL;
00070
00071 ::DDS::ReturnCode_t const ret
00072 = this->get_or_create_instance_handle(registered_handle,
00073 instance,
00074 timestamp);
00075 if (ret != ::DDS::RETCODE_OK)
00076 {
00077 ACE_ERROR ((LM_ERROR,
00078 ACE_TEXT("(%P|%t) ")
00079 ACE_TEXT("%CDataWriterImpl::")
00080 ACE_TEXT("register_instance_w_timestamp, ")
00081 ACE_TEXT("register failed error=%d.\n"),
00082 TraitsType::type_name(),
00083 ret));
00084 }
00085
00086 return registered_handle;
00087 }
00088
00089 virtual ::DDS::ReturnCode_t unregister_instance (
00090 const MessageType & instance,
00091 ::DDS::InstanceHandle_t handle)
00092 {
00093 ::DDS::Time_t const timestamp =
00094 ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00095
00096 return unregister_instance_w_timestamp (instance,
00097 handle,
00098 timestamp);
00099 }
00100
00101 virtual ::DDS::ReturnCode_t unregister_instance_w_timestamp (
00102 const MessageType & instance,
00103 ::DDS::InstanceHandle_t handle,
00104 const ::DDS::Time_t & timestamp)
00105 {
00106 ::DDS::InstanceHandle_t const registered_handle =
00107 this->lookup_instance(instance);
00108
00109 if (registered_handle == ::DDS::HANDLE_NIL)
00110 {
00111
00112
00113 ACE_ERROR_RETURN ((LM_ERROR,
00114 ACE_TEXT("(%P|%t) ")
00115 ACE_TEXT("%CDataWriterImpl::")
00116 ACE_TEXT("unregister_instance_w_timestamp, ")
00117 ACE_TEXT("The instance is not registered.\n"),
00118 TraitsType::type_name()),
00119 ::DDS::RETCODE_ERROR);
00120 }
00121 else if (handle != ::DDS::HANDLE_NIL && handle != registered_handle)
00122 {
00123 ACE_ERROR_RETURN ((LM_ERROR,
00124 ACE_TEXT("(%P|%t) ")
00125 ACE_TEXT("%CDataWriterImpl::")
00126 ACE_TEXT("unregister_w_timestamp, ")
00127 ACE_TEXT("The given handle=%X is different from ")
00128 ACE_TEXT("registered handle=%X.\n"),
00129 TraitsType::type_name(),
00130 handle, registered_handle),
00131 ::DDS::RETCODE_ERROR);
00132 }
00133
00134
00135
00136
00137
00138 return OpenDDS::DCPS::DataWriterImpl::unregister_instance_i(handle, timestamp);
00139 }
00140
00141
00142
00143
00144 virtual ::DDS::ReturnCode_t write (
00145 const MessageType & instance_data,
00146 ::DDS::InstanceHandle_t handle)
00147 {
00148 ::DDS::Time_t const source_timestamp =
00149 ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00150 return write_w_timestamp (instance_data,
00151 handle,
00152 source_timestamp);
00153 }
00154
00155
00156
00157
00158
00159 virtual ::DDS::ReturnCode_t write_w_timestamp (
00160 const MessageType & instance_data,
00161 ::DDS::InstanceHandle_t handle,
00162 const ::DDS::Time_t & source_timestamp)
00163 {
00164
00165
00166
00167 if (handle == ::DDS::HANDLE_NIL) {
00168 ::DDS::InstanceHandle_t registered_handle = ::DDS::HANDLE_NIL;
00169 ::DDS::ReturnCode_t ret
00170 = this->get_or_create_instance_handle(registered_handle,
00171 instance_data,
00172 source_timestamp);
00173 if (ret != ::DDS::RETCODE_OK) {
00174 ACE_ERROR_RETURN((LM_ERROR,
00175 ACE_TEXT("(%P|%t) ")
00176 ACE_TEXT("%CDataWriterImpl::write, ")
00177 ACE_TEXT("register failed err=%d.\n"),
00178 TraitsType::type_name(),
00179 ret),
00180 ret);
00181 }
00182
00183 handle = registered_handle;
00184 }
00185
00186
00187 OpenDDS::DCPS::GUIDSeq_var filter_out;
00188 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00189 if (TheServiceParticipant->publisher_content_filter()) {
00190 ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, ::DDS::RETCODE_ERROR);
00191 for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
00192 end = reader_info_.end(); iter != end; ++iter) {
00193 const ReaderInfo& ri = iter->second;
00194 if (!ri.eval_.is_nil()) {
00195 if (!filter_out.ptr()) {
00196 filter_out = new OpenDDS::DCPS::GUIDSeq;
00197 }
00198 if (!ri.eval_->eval(instance_data, ri.expression_params_)) {
00199 push_back(filter_out.inout(), iter->first);
00200 }
00201 }
00202 }
00203 }
00204 #endif
00205
00206 ACE_Message_Block* const marshalled =
00207 dds_marshal (instance_data, OpenDDS::DCPS::FULL_MARSHALING);
00208
00209 return OpenDDS::DCPS::DataWriterImpl::write(marshalled, handle,
00210 source_timestamp,
00211 filter_out._retn());
00212 }
00213
00214 virtual ::DDS::ReturnCode_t dispose (
00215 const MessageType & instance_data,
00216 ::DDS::InstanceHandle_t instance_handle)
00217 {
00218 ::DDS::Time_t const source_timestamp =
00219 ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00220 return dispose_w_timestamp (instance_data,
00221 instance_handle,
00222 source_timestamp);
00223 }
00224
00225 virtual ::DDS::ReturnCode_t dispose_w_timestamp (
00226 const MessageType & instance_data,
00227 ::DDS::InstanceHandle_t instance_handle,
00228 const ::DDS::Time_t & source_timestamp)
00229 {
00230 if(instance_handle == ::DDS::HANDLE_NIL)
00231 {
00232 instance_handle = this->lookup_instance(instance_data);
00233 if (instance_handle == ::DDS::HANDLE_NIL)
00234 {
00235 ACE_ERROR_RETURN ((LM_ERROR,
00236 ACE_TEXT("(%P|%t) ")
00237 ACE_TEXT("%CDataWriterImpl::dispose, ")
00238 ACE_TEXT("The instance sample is not registered.\n"),
00239 TraitsType::type_name()),
00240 ::DDS::RETCODE_ERROR);
00241 }
00242 }
00243
00244 return OpenDDS::DCPS::DataWriterImpl::dispose(instance_handle,
00245 source_timestamp);
00246 }
00247
00248 virtual ::DDS::ReturnCode_t get_key_value (
00249 MessageType & key_holder,
00250 ::DDS::InstanceHandle_t handle)
00251 {
00252 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00253 guard,
00254 get_lock (),
00255 ::DDS::RETCODE_ERROR);
00256
00257 typename InstanceMap::iterator const the_end = instance_map_.end ();
00258 for (typename InstanceMap::iterator it = instance_map_.begin ();
00259 it != the_end;
00260 ++it)
00261 {
00262 if (it->second == handle)
00263 {
00264 key_holder = it->first;
00265 return ::DDS::RETCODE_OK;
00266 }
00267 }
00268
00269 return ::DDS::RETCODE_ERROR;
00270 }
00271
00272 virtual ::DDS::InstanceHandle_t lookup_instance (
00273 const MessageType & instance_data)
00274 {
00275 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00276 guard,
00277 get_lock (),
00278 ::DDS::RETCODE_ERROR);
00279
00280 typename InstanceMap::const_iterator const it = instance_map_.find(instance_data);
00281
00282 if (it == instance_map_.end())
00283 {
00284 return ::DDS::HANDLE_NIL;
00285 }
00286 else
00287 {
00288 return it->second;
00289 }
00290 }
00291
00292
00293
00294
00295
00296
00297 virtual void init (::DDS::Topic_ptr topic,
00298 OpenDDS::DCPS::TopicImpl* topic_servant,
00299 const ::DDS::DataWriterQos & qos,
00300 ::DDS::DataWriterListener_ptr a_listener,
00301 const ::DDS::StatusMask & mask,
00302 OpenDDS::DCPS::DomainParticipantImpl* participant_servant,
00303 OpenDDS::DCPS::PublisherImpl* publisher_servant,
00304 ::DDS::DataWriter_ptr dw_objref)
00305 {
00306 OpenDDS::DCPS::DataWriterImpl::init (topic,
00307 topic_servant,
00308 qos,
00309 a_listener,
00310 mask,
00311 participant_servant,
00312 publisher_servant,
00313 dw_objref);
00314
00315 MessageType data;
00316 if (TraitsType::gen_is_bounded_size(data)) {
00317 marshaled_size_ = 8 + TraitsType::gen_max_marshaled_size(data, true);
00318
00319 } else {
00320 marshaled_size_ = 0;
00321 }
00322 OpenDDS::DCPS::KeyOnly<const MessageType > ko(data);
00323 if (TraitsType::gen_is_bounded_size(ko)) {
00324 key_marshaled_size_ = 8 + TraitsType::gen_max_marshaled_size(ko, true);
00325
00326 } else {
00327 key_marshaled_size_ = 0;
00328 }
00329 }
00330
00331
00332
00333
00334
00335 virtual ::DDS::ReturnCode_t enable_specific ()
00336 {
00337 MessageType data;
00338 if (TraitsType::gen_is_bounded_size (data))
00339 {
00340 data_allocator_ = new DataAllocator (n_chunks_, marshaled_size_);
00341 if (::OpenDDS::DCPS::DCPS_debug_level >= 2)
00342 ACE_DEBUG((LM_DEBUG,
00343 ACE_TEXT("(%P|%t) %CDataWriterImpl::")
00344 ACE_TEXT("enable_specific-data")
00345 ACE_TEXT(" Dynamic_Cached_Allocator_With_Overflow %x ")
00346 ACE_TEXT("with %d chunks\n"),
00347 TraitsType::type_name(),
00348 data_allocator_,
00349 n_chunks_));
00350 }
00351 else
00352 {
00353 if (::OpenDDS::DCPS::DCPS_debug_level >= 2)
00354 ACE_DEBUG((LM_DEBUG,
00355 ACE_TEXT("(%P|%t) %CDataWriterImpl::enable_specific")
00356 ACE_TEXT(" is unbounded data - allocate from heap\n"), TraitsType::type_name()));
00357 }
00358
00359 mb_allocator_ =
00360 new ::OpenDDS::DCPS::MessageBlockAllocator (
00361 n_chunks_ * association_chunk_multiplier_);
00362 db_allocator_ = new ::OpenDDS::DCPS::DataBlockAllocator (n_chunks_);
00363
00364 if (::OpenDDS::DCPS::DCPS_debug_level >= 2)
00365 {
00366 ACE_DEBUG((LM_DEBUG,
00367 ACE_TEXT("(%P|%t) %CDataWriterImpl::")
00368 ACE_TEXT("enable_specific-mb ")
00369 ACE_TEXT("Cached_Allocator_With_Overflow ")
00370 ACE_TEXT("%x with %d chunks\n"),
00371 TraitsType::type_name(),
00372 mb_allocator_,
00373 n_chunks_ * association_chunk_multiplier_));
00374 ACE_DEBUG((LM_DEBUG,
00375 ACE_TEXT("(%P|%t) %CDataWriterImpl::")
00376 ACE_TEXT("enable_specific-db ")
00377 ACE_TEXT("Cached_Allocator_With_Overflow ")
00378 ACE_TEXT("%x with %d chunks\n"),
00379 TraitsType::type_name(),
00380 db_allocator_,
00381 n_chunks_));
00382 }
00383
00384 return ::DDS::RETCODE_OK;
00385 }
00386
00387
00388
00389
00390
00391 virtual void unregistered(::DDS::InstanceHandle_t instance_handle)
00392 {
00393 ACE_UNUSED_ARG(instance_handle);
00394
00395
00396
00397
00398 }
00399
00400
00401
00402
00403 ACE_INLINE
00404 DataAllocator* data_allocator () const {
00405 return data_allocator_;
00406 };
00407
00408 private:
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418 ACE_Message_Block* dds_marshal(
00419 const MessageType& instance_data,
00420 OpenDDS::DCPS::MarshalingType marshaling_type)
00421 {
00422 const bool cdr = this->cdr_encapsulation(), swap = this->swap_bytes();
00423
00424 ACE_Message_Block* mb;
00425 if (marshaling_type == OpenDDS::DCPS::KEY_ONLY_MARSHALING) {
00426
00427
00428
00429 OpenDDS::DCPS::KeyOnly<const MessageType > ko_instance_data(instance_data);
00430 size_t effective_size = 0, padding = 0;
00431 if (key_marshaled_size_) {
00432 effective_size = key_marshaled_size_;
00433 } else {
00434 if (cdr && !Serializer::use_rti_serialization()) {
00435 effective_size = cdr_header_size;
00436 }
00437 TraitsType::gen_find_size(ko_instance_data, effective_size, padding);
00438 if (cdr && Serializer::use_rti_serialization()) {
00439 effective_size += (cdr_header_size);
00440 }
00441 }
00442 if (cdr) {
00443 effective_size += padding;
00444 }
00445 ACE_NEW_RETURN(mb, ACE_Message_Block(effective_size,
00446 ACE_Message_Block::MB_DATA,
00447 0,
00448 0,
00449 0,
00450 get_db_lock()), 0);
00451 OpenDDS::DCPS::Serializer serializer(mb, swap, cdr
00452 ? OpenDDS::DCPS::Serializer::ALIGN_CDR
00453 : OpenDDS::DCPS::Serializer::ALIGN_NONE);
00454 if (cdr) {
00455 serializer << ACE_OutputCDR::from_octet(0);
00456 serializer << ACE_OutputCDR::from_octet(swap ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER);
00457 serializer << ACE_CDR::UShort(0);
00458 }
00459
00460
00461 if (cdr && Serializer::use_rti_serialization()) {
00462
00463 serializer.reset_alignment();
00464 }
00465 serializer << ko_instance_data;
00466 } else {
00467 size_t effective_size = 0, padding = 0;
00468 if (marshaled_size_) {
00469 effective_size = marshaled_size_;
00470 } else {
00471 if (cdr && !Serializer::use_rti_serialization()) {
00472 effective_size = cdr_header_size;
00473 }
00474 TraitsType::gen_find_size(instance_data, effective_size, padding);
00475 if (cdr && Serializer::use_rti_serialization()) {
00476 effective_size += (cdr_header_size);
00477 }
00478 }
00479 if (cdr) {
00480 effective_size += padding;
00481 }
00482 ACE_NEW_MALLOC_RETURN(mb,
00483 static_cast<ACE_Message_Block*>(
00484 mb_allocator_->malloc(
00485 sizeof(ACE_Message_Block))),
00486 ACE_Message_Block(
00487 effective_size,
00488 ACE_Message_Block::MB_DATA,
00489 0,
00490 0,
00491 data_allocator_,
00492 get_db_lock(),
00493 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00494 ACE_Time_Value::zero,
00495 ACE_Time_Value::max_time,
00496 db_allocator_,
00497 mb_allocator_),
00498 0);
00499 OpenDDS::DCPS::Serializer serializer(mb, swap, cdr
00500 ? OpenDDS::DCPS::Serializer::ALIGN_CDR
00501 : OpenDDS::DCPS::Serializer::ALIGN_NONE);
00502 if (cdr) {
00503 serializer << ACE_OutputCDR::from_octet(0);
00504 serializer << ACE_OutputCDR::from_octet(swap ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER);
00505 serializer << ACE_CDR::UShort(0);
00506 }
00507
00508
00509 if (cdr && Serializer::use_rti_serialization()) {
00510
00511 serializer.reset_alignment();
00512 }
00513 serializer << instance_data;
00514 }
00515
00516 return mb;
00517 }
00518
00519
00520
00521
00522
00523
00524 ::DDS::ReturnCode_t get_or_create_instance_handle(
00525 ::DDS::InstanceHandle_t& handle,
00526 const MessageType& instance_data,
00527 const ::DDS::Time_t & source_timestamp)
00528 {
00529 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00530 guard,
00531 get_lock(),
00532 ::DDS::RETCODE_ERROR);
00533
00534 handle = ::DDS::HANDLE_NIL;
00535 typename InstanceMap::const_iterator it = instance_map_.find(instance_data);
00536
00537 bool needs_creation = true;
00538 bool needs_registration = true;
00539
00540 if (it != instance_map_.end())
00541 {
00542 needs_creation = false;
00543
00544 handle = it->second;
00545 OpenDDS::DCPS::PublicationInstance* instance = get_handle_instance(handle);
00546
00547 if (instance->unregistered_ == false)
00548 {
00549 needs_registration = false;
00550 }
00551
00552 }
00553
00554 if (needs_registration)
00555 {
00556
00557 ACE_Message_Block* const marshalled =
00558 this->dds_marshal(instance_data,
00559 OpenDDS::DCPS::KEY_ONLY_MARSHALING);
00560
00561
00562 ::DDS::ReturnCode_t ret = register_instance_i(handle, marshalled, source_timestamp);
00563
00564
00565
00566 if (ret != ::DDS::RETCODE_OK)
00567 {
00568 marshalled->release ();
00569 handle = ::DDS::HANDLE_NIL;
00570 return ret;
00571 }
00572
00573 if (needs_creation)
00574 {
00575 std::pair<typename InstanceMap::iterator, bool> pair =
00576 instance_map_.insert(typename InstanceMap::value_type(instance_data, handle));
00577
00578 if (pair.second == false)
00579 {
00580 handle = ::DDS::HANDLE_NIL;
00581 ACE_ERROR_RETURN ((LM_ERROR,
00582 ACE_TEXT("(%P|%t) ")
00583 ACE_TEXT("%CDataWriterImpl::")
00584 ACE_TEXT("get_or_create_instance_handle, ")
00585 ACE_TEXT("insert %s failed. \n"),
00586 TraitsType::type_name(), TraitsType::type_name()),
00587 ::DDS::RETCODE_ERROR);
00588 }
00589 }
00590
00591 send_all_to_flush_control(guard);
00592
00593 }
00594
00595 return ::DDS::RETCODE_OK;
00596 }
00597
/// Sample -> instance handle; entries are inserted by
/// get_or_create_instance_handle() and read by lookup_instance() and
/// get_key_value().
InstanceMap instance_map_;
/// Buffer size for a fully marshaled sample, computed in init();
/// 0 when the type is unbounded and dds_marshal() must measure each sample.
size_t marshaled_size_;
/// Same as marshaled_size_, but for the key-only representation.
size_t key_marshaled_size_;
/// Payload allocator passed to ACE_Message_Block for full samples; created
/// in enable_specific() for bounded types only, otherwise null.
DataAllocator* data_allocator_;
/// Allocator for ACE_Message_Block objects; created in enable_specific().
::OpenDDS::DCPS::MessageBlockAllocator* mb_allocator_;
/// Allocator passed to ACE_Message_Block as its data-block allocator;
/// created in enable_specific().
::OpenDDS::DCPS::DataBlockAllocator* db_allocator_;
00604
00605
00606
00607 friend class ::DDS_TEST;
00608 };
00609
00610 }
00611 }
00612
00613 #endif