00001 #ifndef dds_DCPS_DataWriterImpl_T_h
00002 #define dds_DCPS_DataWriterImpl_T_h
00003
00004 #include "dds/DCPS/PublicationInstance.h"
00005 #include "dds/DCPS/DataWriterImpl.h"
00006 #include "dds/DCPS/DataReaderImpl.h"
00007 #include "dds/DCPS/Util.h"
00008 #include "dds/DCPS/TypeSupportImpl.h"
00009 #include "dcps_export.h"
00010
00011 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00012
00013 namespace OpenDDS {
00014 namespace DCPS {
00015
00016
00017
00018
00019
00020
00021 template <typename MessageType>
00022 class
00023 #if ( __GNUC__ == 4 && __GNUC_MINOR__ == 1)
00024 OpenDDS_Dcps_Export
00025 #endif
00026 DataWriterImpl_T
00027 : public virtual OpenDDS::DCPS::LocalObject<typename DDSTraits<MessageType>::DataWriterType>,
00028 public virtual OpenDDS::DCPS::DataWriterImpl
00029 {
00030 public:
00031 typedef DDSTraits<MessageType> TraitsType;
00032 typedef MarshalTraits<MessageType> MarshalTraitsType;
00033
00034 typedef OPENDDS_MAP_CMP_T(MessageType, DDS::InstanceHandle_t,
00035 typename TraitsType::LessThanType) InstanceMap;
00036 typedef ::OpenDDS::DCPS::Dynamic_Cached_Allocator_With_Overflow<ACE_Thread_Mutex> DataAllocator;
00037
00038 enum {
00039 cdr_header_size = 4
00040 };
00041
00042 DataWriterImpl_T (void)
00043 : marshaled_size_ (0)
00044 , key_marshaled_size_ (0)
00045 {
00046 MessageType data;
00047 if (MarshalTraitsType::gen_is_bounded_size()) {
00048 marshaled_size_ = 8 + TraitsType::gen_max_marshaled_size(data, true);
00049
00050 } else {
00051 marshaled_size_ = 0;
00052 }
00053 if (MarshalTraitsType::gen_is_bounded_key_size()) {
00054 OpenDDS::DCPS::KeyOnly<const MessageType > ko(data);
00055 key_marshaled_size_ = 8 + TraitsType::gen_max_marshaled_size(ko, true);
00056
00057 } else {
00058 key_marshaled_size_ = 0;
00059 }
00060 }
00061
00062 virtual ~DataWriterImpl_T (void)
00063 {
00064 }
00065
00066 virtual DDS::InstanceHandle_t register_instance (
00067 const MessageType & instance)
00068 {
00069 DDS::Time_t const timestamp =
00070 ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00071 return register_instance_w_timestamp (instance, timestamp);
00072 }
00073
00074 virtual DDS::InstanceHandle_t register_instance_w_timestamp (
00075 const MessageType & instance,
00076 const DDS::Time_t & timestamp)
00077 {
00078 DDS::InstanceHandle_t registered_handle = DDS::HANDLE_NIL;
00079
00080 DDS::ReturnCode_t const ret
00081 = this->get_or_create_instance_handle(registered_handle,
00082 instance,
00083 timestamp);
00084 if (ret != DDS::RETCODE_OK)
00085 {
00086 ACE_ERROR ((LM_ERROR,
00087 ACE_TEXT("(%P|%t) ")
00088 ACE_TEXT("%CDataWriterImpl::")
00089 ACE_TEXT("register_instance_w_timestamp, ")
00090 ACE_TEXT("register failed error=%d.\n"),
00091 TraitsType::type_name(),
00092 ret));
00093 }
00094
00095 return registered_handle;
00096 }
00097
00098 virtual DDS::ReturnCode_t unregister_instance (
00099 const MessageType & instance,
00100 DDS::InstanceHandle_t handle)
00101 {
00102 DDS::Time_t const timestamp =
00103 ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00104
00105 return unregister_instance_w_timestamp (instance,
00106 handle,
00107 timestamp);
00108 }
00109
00110 virtual DDS::ReturnCode_t unregister_instance_w_timestamp (
00111 const MessageType & instance,
00112 DDS::InstanceHandle_t handle,
00113 const DDS::Time_t & timestamp)
00114 {
00115 DDS::InstanceHandle_t const registered_handle =
00116 this->lookup_instance(instance);
00117
00118 if (registered_handle == DDS::HANDLE_NIL)
00119 {
00120
00121
00122 ACE_ERROR_RETURN ((LM_ERROR,
00123 ACE_TEXT("(%P|%t) ")
00124 ACE_TEXT("%CDataWriterImpl::")
00125 ACE_TEXT("unregister_instance_w_timestamp, ")
00126 ACE_TEXT("The instance is not registered.\n"),
00127 TraitsType::type_name()),
00128 DDS::RETCODE_ERROR);
00129 }
00130 else if (handle != DDS::HANDLE_NIL && handle != registered_handle)
00131 {
00132 ACE_ERROR_RETURN ((LM_ERROR,
00133 ACE_TEXT("(%P|%t) ")
00134 ACE_TEXT("%CDataWriterImpl::")
00135 ACE_TEXT("unregister_w_timestamp, ")
00136 ACE_TEXT("The given handle=%X is different from ")
00137 ACE_TEXT("registered handle=%X.\n"),
00138 TraitsType::type_name(),
00139 handle, registered_handle),
00140 DDS::RETCODE_ERROR);
00141 }
00142
00143
00144
00145
00146
00147 return OpenDDS::DCPS::DataWriterImpl::unregister_instance_i(handle, timestamp);
00148 }
00149
00150
00151
00152
00153 virtual DDS::ReturnCode_t write (
00154 const MessageType & instance_data,
00155 DDS::InstanceHandle_t handle)
00156 {
00157 DDS::Time_t const source_timestamp =
00158 ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00159 return write_w_timestamp (instance_data,
00160 handle,
00161 source_timestamp);
00162 }
00163
00164
00165
00166
00167
00168 virtual DDS::ReturnCode_t write_w_timestamp (
00169 const MessageType & instance_data,
00170 DDS::InstanceHandle_t handle,
00171 const DDS::Time_t & source_timestamp)
00172 {
00173
00174
00175
00176 if (handle == DDS::HANDLE_NIL) {
00177 DDS::InstanceHandle_t registered_handle = DDS::HANDLE_NIL;
00178 DDS::ReturnCode_t ret
00179 = this->get_or_create_instance_handle(registered_handle,
00180 instance_data,
00181 source_timestamp);
00182 if (ret != DDS::RETCODE_OK) {
00183 ACE_ERROR_RETURN((LM_ERROR,
00184 ACE_TEXT("(%P|%t) ")
00185 ACE_TEXT("%CDataWriterImpl::write_w_timestamp, ")
00186 ACE_TEXT("register failed err=%d.\n"),
00187 TraitsType::type_name(),
00188 ret),
00189 ret);
00190 }
00191
00192 handle = registered_handle;
00193 }
00194
00195
00196 OpenDDS::DCPS::GUIDSeq_var filter_out;
00197 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00198 if (TheServiceParticipant->publisher_content_filter()) {
00199 ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, DDS::RETCODE_ERROR);
00200 for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
00201 end = reader_info_.end(); iter != end; ++iter) {
00202 const ReaderInfo& ri = iter->second;
00203 if (!ri.eval_.is_nil()) {
00204 if (!filter_out.ptr()) {
00205 filter_out = new OpenDDS::DCPS::GUIDSeq;
00206 }
00207 if (!ri.eval_->eval(instance_data, ri.expression_params_)) {
00208 push_back(filter_out.inout(), iter->first);
00209 }
00210 }
00211 }
00212 }
00213 #endif
00214
00215 Message_Block_Ptr marshalled(
00216 dds_marshal (instance_data, OpenDDS::DCPS::FULL_MARSHALING));
00217
00218 return OpenDDS::DCPS::DataWriterImpl::write(move(marshalled), handle,
00219 source_timestamp,
00220 filter_out._retn());
00221 }
00222
00223 virtual DDS::ReturnCode_t dispose (
00224 const MessageType & instance_data,
00225 DDS::InstanceHandle_t instance_handle)
00226 {
00227 DDS::Time_t const source_timestamp =
00228 ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00229 return dispose_w_timestamp (instance_data,
00230 instance_handle,
00231 source_timestamp);
00232 }
00233
00234 virtual DDS::ReturnCode_t dispose_w_timestamp (
00235 const MessageType & instance_data,
00236 DDS::InstanceHandle_t instance_handle,
00237 const DDS::Time_t & source_timestamp)
00238 {
00239 if(instance_handle == DDS::HANDLE_NIL)
00240 {
00241 instance_handle = this->lookup_instance(instance_data);
00242 if (instance_handle == DDS::HANDLE_NIL)
00243 {
00244 ACE_ERROR_RETURN ((LM_ERROR,
00245 ACE_TEXT("(%P|%t) ")
00246 ACE_TEXT("%CDataWriterImpl::dispose_w_timestamp, ")
00247 ACE_TEXT("The instance sample is not registered.\n"),
00248 TraitsType::type_name()),
00249 DDS::RETCODE_ERROR);
00250 }
00251 }
00252
00253 return OpenDDS::DCPS::DataWriterImpl::dispose(instance_handle,
00254 source_timestamp);
00255 }
00256
00257 virtual DDS::ReturnCode_t get_key_value (
00258 MessageType & key_holder,
00259 DDS::InstanceHandle_t handle)
00260 {
00261 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00262 guard,
00263 get_lock (),
00264 DDS::RETCODE_ERROR);
00265
00266 typename InstanceMap::iterator const the_end = instance_map_.end ();
00267 for (typename InstanceMap::iterator it = instance_map_.begin ();
00268 it != the_end;
00269 ++it)
00270 {
00271 if (it->second == handle)
00272 {
00273 key_holder = it->first;
00274 return DDS::RETCODE_OK;
00275 }
00276 }
00277
00278 return DDS::RETCODE_BAD_PARAMETER;
00279 }
00280
00281 virtual DDS::InstanceHandle_t lookup_instance (
00282 const MessageType & instance_data)
00283 {
00284 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00285 guard,
00286 get_lock (),
00287 DDS::RETCODE_ERROR);
00288
00289 typename InstanceMap::const_iterator const it = instance_map_.find(instance_data);
00290
00291 if (it == instance_map_.end())
00292 {
00293 return DDS::HANDLE_NIL;
00294 }
00295 else
00296 {
00297 return it->second;
00298 }
00299 }
00300
00301
00302
00303
00304
00305
00306 virtual DDS::ReturnCode_t enable_specific ()
00307 {
00308 if (MarshalTraitsType::gen_is_bounded_size ())
00309 {
00310 data_allocator_.reset(new DataAllocator (n_chunks_, marshaled_size_));
00311 if (::OpenDDS::DCPS::DCPS_debug_level >= 2)
00312 ACE_DEBUG((LM_DEBUG,
00313 ACE_TEXT("(%P|%t) %CDataWriterImpl::")
00314 ACE_TEXT("enable_specific-data")
00315 ACE_TEXT(" Dynamic_Cached_Allocator_With_Overflow %x ")
00316 ACE_TEXT("with %d chunks\n"),
00317 TraitsType::type_name(),
00318 data_allocator_.get(),
00319 n_chunks_));
00320 }
00321 else
00322 {
00323 if (::OpenDDS::DCPS::DCPS_debug_level >= 2)
00324 ACE_DEBUG((LM_DEBUG,
00325 ACE_TEXT("(%P|%t) %CDataWriterImpl::enable_specific")
00326 ACE_TEXT(" is unbounded data - allocate from heap\n"), TraitsType::type_name()));
00327 }
00328
00329 mb_allocator_.reset(
00330 new ::OpenDDS::DCPS::MessageBlockAllocator (
00331 n_chunks_ * association_chunk_multiplier_));
00332 db_allocator_.reset(new ::OpenDDS::DCPS::DataBlockAllocator (n_chunks_));
00333
00334 if (::OpenDDS::DCPS::DCPS_debug_level >= 2)
00335 {
00336 ACE_DEBUG((LM_DEBUG,
00337 ACE_TEXT("(%P|%t) %CDataWriterImpl::")
00338 ACE_TEXT("enable_specific-mb ")
00339 ACE_TEXT("Cached_Allocator_With_Overflow ")
00340 ACE_TEXT("%x with %d chunks\n"),
00341 TraitsType::type_name(),
00342 mb_allocator_.get(),
00343 n_chunks_ * association_chunk_multiplier_));
00344 ACE_DEBUG((LM_DEBUG,
00345 ACE_TEXT("(%P|%t) %CDataWriterImpl::")
00346 ACE_TEXT("enable_specific-db ")
00347 ACE_TEXT("Cached_Allocator_With_Overflow ")
00348 ACE_TEXT("%x with %d chunks\n"),
00349 TraitsType::type_name(),
00350 db_allocator_.get(),
00351 n_chunks_));
00352 }
00353
00354 return DDS::RETCODE_OK;
00355 }
00356
00357
00358
00359
00360 ACE_INLINE
00361 DataAllocator* data_allocator () const {
00362 return data_allocator_.get();
00363 };
00364
00365 private:
00366
00367
00368
00369
00370
00371
00372
00373
00374
00375 ACE_Message_Block* dds_marshal(
00376 const MessageType& instance_data,
00377 OpenDDS::DCPS::MarshalingType marshaling_type)
00378 {
00379 const bool cdr = this->cdr_encapsulation(), swap = this->swap_bytes();
00380
00381 Message_Block_Ptr mb;
00382 ACE_Message_Block* tmp_mb;
00383
00384 if (marshaling_type == OpenDDS::DCPS::KEY_ONLY_MARSHALING) {
00385
00386
00387
00388 OpenDDS::DCPS::KeyOnly<const MessageType > ko_instance_data(instance_data);
00389 size_t effective_size = 0, padding = 0;
00390 if (key_marshaled_size_) {
00391 effective_size = key_marshaled_size_;
00392 } else {
00393 if (cdr && !Serializer::use_rti_serialization()) {
00394 effective_size = cdr_header_size;
00395 }
00396 TraitsType::gen_find_size(ko_instance_data, effective_size, padding);
00397 if (cdr && Serializer::use_rti_serialization()) {
00398 effective_size += (cdr_header_size);
00399 }
00400 }
00401 if (cdr) {
00402 effective_size += padding;
00403 }
00404
00405 ACE_NEW_RETURN(tmp_mb, ACE_Message_Block(effective_size,
00406 ACE_Message_Block::MB_DATA,
00407 0,
00408 0,
00409 0,
00410 get_db_lock()), 0);
00411 mb.reset(tmp_mb);
00412 OpenDDS::DCPS::Serializer serializer(mb.get(), swap, cdr
00413 ? OpenDDS::DCPS::Serializer::ALIGN_CDR
00414 : OpenDDS::DCPS::Serializer::ALIGN_NONE);
00415 if (cdr) {
00416 serializer << ACE_OutputCDR::from_octet(0);
00417 serializer << ACE_OutputCDR::from_octet(swap ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER);
00418 serializer << ACE_CDR::UShort(0);
00419 }
00420
00421
00422 if (cdr && Serializer::use_rti_serialization()) {
00423
00424 serializer.reset_alignment();
00425 }
00426 serializer << ko_instance_data;
00427 } else {
00428 size_t effective_size = 0, padding = 0;
00429 if (marshaled_size_) {
00430 effective_size = marshaled_size_;
00431 } else {
00432 if (cdr && !Serializer::use_rti_serialization()) {
00433 effective_size = cdr_header_size;
00434 }
00435 TraitsType::gen_find_size(instance_data, effective_size, padding);
00436 if (cdr && Serializer::use_rti_serialization()) {
00437 effective_size += (cdr_header_size);
00438 }
00439 }
00440 if (cdr) {
00441 effective_size += padding;
00442 }
00443
00444
00445 ACE_NEW_MALLOC_RETURN(tmp_mb,
00446 static_cast<ACE_Message_Block*>(
00447 mb_allocator_->malloc(
00448 sizeof(ACE_Message_Block))),
00449 ACE_Message_Block(
00450 effective_size,
00451 ACE_Message_Block::MB_DATA,
00452 0,
00453 0,
00454 data_allocator_.get(),
00455 get_db_lock(),
00456 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00457 ACE_Time_Value::zero,
00458 ACE_Time_Value::max_time,
00459 db_allocator_.get(),
00460 mb_allocator_.get()),
00461 0);
00462 mb.reset(tmp_mb);
00463 OpenDDS::DCPS::Serializer serializer(mb.get(), swap, cdr
00464 ? OpenDDS::DCPS::Serializer::ALIGN_CDR
00465 : OpenDDS::DCPS::Serializer::ALIGN_NONE);
00466 if (cdr) {
00467 serializer << ACE_OutputCDR::from_octet(0);
00468 serializer << ACE_OutputCDR::from_octet(swap ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER);
00469 serializer << ACE_CDR::UShort(0);
00470 }
00471
00472
00473 if (cdr && Serializer::use_rti_serialization()) {
00474
00475 serializer.reset_alignment();
00476 }
00477
00478 if (! (serializer << instance_data)) {
00479 ACE_ERROR_RETURN((LM_ERROR,
00480 ACE_TEXT("(%P|%t) OpenDDS::DCPS::DataWriterImpl::dds_marshal(), ")
00481 ACE_TEXT("instance_data serialization error.\n")),
00482 0);
00483 }
00484 }
00485
00486 return mb.release();
00487 }
00488
00489
00490
00491
00492
00493
00494 DDS::ReturnCode_t get_or_create_instance_handle(
00495 DDS::InstanceHandle_t& handle,
00496 const MessageType& instance_data,
00497 const DDS::Time_t & source_timestamp)
00498 {
00499 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00500 guard,
00501 get_lock(),
00502 DDS::RETCODE_ERROR);
00503
00504 handle = DDS::HANDLE_NIL;
00505 typename InstanceMap::const_iterator it = instance_map_.find(instance_data);
00506
00507 bool needs_creation = true;
00508 bool needs_registration = true;
00509
00510 if (it != instance_map_.end())
00511 {
00512 needs_creation = false;
00513
00514 handle = it->second;
00515 OpenDDS::DCPS::PublicationInstance_rch instance = get_handle_instance(handle);
00516
00517 if (instance->unregistered_ == false)
00518 {
00519 needs_registration = false;
00520 }
00521
00522 }
00523
00524 if (needs_registration)
00525 {
00526
00527 Message_Block_Ptr marshalled(
00528 this->dds_marshal(instance_data,
00529 OpenDDS::DCPS::KEY_ONLY_MARSHALING));
00530
00531
00532 DDS::ReturnCode_t ret = register_instance_i(handle, move(marshalled), source_timestamp);
00533
00534
00535
00536 if (ret != DDS::RETCODE_OK)
00537 {
00538 handle = DDS::HANDLE_NIL;
00539 return ret;
00540 }
00541
00542 if (needs_creation)
00543 {
00544 std::pair<typename InstanceMap::iterator, bool> pair =
00545 instance_map_.insert(typename InstanceMap::value_type(instance_data, handle));
00546
00547 if (pair.second == false)
00548 {
00549 handle = DDS::HANDLE_NIL;
00550 ACE_ERROR_RETURN ((LM_ERROR,
00551 ACE_TEXT("(%P|%t) ")
00552 ACE_TEXT("%CDataWriterImpl::")
00553 ACE_TEXT("get_or_create_instance_handle, ")
00554 ACE_TEXT("insert %C failed. \n"),
00555 TraitsType::type_name(), TraitsType::type_name()),
00556 DDS::RETCODE_ERROR);
00557 }
00558 }
00559
00560 send_all_to_flush_control(guard);
00561
00562 }
00563
00564 return DDS::RETCODE_OK;
00565 }
00566
00567 InstanceMap instance_map_;
00568 size_t marshaled_size_;
00569 size_t key_marshaled_size_;
00570 unique_ptr<DataAllocator> data_allocator_;
00571 unique_ptr<MessageBlockAllocator> mb_allocator_;
00572 unique_ptr<DataBlockAllocator> db_allocator_;
00573
00574
00575
00576 friend class ::DDS_TEST;
00577 };
00578
00579 }
00580 }
00581
00582 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00583
00584 #endif