OpenDDS::DCPS::DataWriterImpl_T< MessageType > Class Template Reference

#include <DataWriterImpl_T.h>

Inheritance diagram for OpenDDS::DCPS::DataWriterImpl_T< MessageType >:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::DataWriterImpl_T< MessageType >:
Collaboration graph
[legend]

List of all members.

Public Types

enum  { cdr_header_size = 4 }
typedef DDSTraits< MessageType > TraitsType
typedef MarshalTraits
< MessageType > 
MarshalTraitsType
typedef
::OpenDDS::DCPS::Dynamic_Cached_Allocator_With_Overflow
< ACE_Thread_Mutex
DataAllocator

Public Member Functions

typedef OPENDDS_MAP_CMP_T (MessageType, DDS::InstanceHandle_t, typename TraitsType::LessThanType) InstanceMap
 DataWriterImpl_T (void)
virtual ~DataWriterImpl_T (void)
virtual DDS::InstanceHandle_t register_instance (const MessageType &instance)
virtual DDS::InstanceHandle_t register_instance_w_timestamp (const MessageType &instance, const DDS::Time_t &timestamp)
virtual DDS::ReturnCode_t unregister_instance (const MessageType &instance, DDS::InstanceHandle_t handle)
virtual DDS::ReturnCode_t unregister_instance_w_timestamp (const MessageType &instance, DDS::InstanceHandle_t handle, const DDS::Time_t &timestamp)
virtual DDS::ReturnCode_t write (const MessageType &instance_data, DDS::InstanceHandle_t handle)
virtual DDS::ReturnCode_t write_w_timestamp (const MessageType &instance_data, DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp)
virtual DDS::ReturnCode_t dispose (const MessageType &instance_data, DDS::InstanceHandle_t instance_handle)
virtual DDS::ReturnCode_t dispose_w_timestamp (const MessageType &instance_data, DDS::InstanceHandle_t instance_handle, const DDS::Time_t &source_timestamp)
virtual DDS::ReturnCode_t get_key_value (MessageType &key_holder, DDS::InstanceHandle_t handle)
virtual DDS::InstanceHandle_t lookup_instance (const MessageType &instance_data)
virtual DDS::ReturnCode_t enable_specific ()
ACE_INLINE DataAllocatordata_allocator () const

Private Member Functions

ACE_Message_Blockdds_marshal (const MessageType &instance_data, OpenDDS::DCPS::MarshalingType marshaling_type)
DDS::ReturnCode_t get_or_create_instance_handle (DDS::InstanceHandle_t &handle, const MessageType &instance_data, const DDS::Time_t &source_timestamp)

Private Attributes

InstanceMap instance_map_
size_t marshaled_size_
size_t key_marshaled_size_
unique_ptr< DataAllocatordata_allocator_
unique_ptr< MessageBlockAllocatormb_allocator_
 The message block allocator.
unique_ptr< DataBlockAllocatordb_allocator_
 The data block allocator.

Friends

class ::DDS_TEST

Detailed Description

template<typename MessageType>
class OpenDDS::DCPS::DataWriterImpl_T< MessageType >

Servant for DataWriter interface of the Traits::MessageType data type.

See the DDS specification, OMG formal/04-12-02, for a description of this interface.

Definition at line 22 of file DataWriterImpl_T.h.


Member Typedef Documentation

Definition at line 36 of file DataWriterImpl_T.h.

template<typename MessageType >
typedef MarshalTraits<MessageType> OpenDDS::DCPS::DataWriterImpl_T< MessageType >::MarshalTraitsType

Definition at line 32 of file DataWriterImpl_T.h.

template<typename MessageType >
typedef DDSTraits<MessageType> OpenDDS::DCPS::DataWriterImpl_T< MessageType >::TraitsType

Definition at line 31 of file DataWriterImpl_T.h.


Member Enumeration Documentation

template<typename MessageType >
anonymous enum
Enumerator:
cdr_header_size 

Definition at line 38 of file DataWriterImpl_T.h.

00038          {
00039       cdr_header_size = 4
00040     };


Constructor & Destructor Documentation

template<typename MessageType >
OpenDDS::DCPS::DataWriterImpl_T< MessageType >::DataWriterImpl_T ( void   )  [inline]

Definition at line 42 of file DataWriterImpl_T.h.

References OpenDDS::DCPS::gen_max_marshaled_size().

00043       : marshaled_size_ (0)
00044       , key_marshaled_size_ (0)
00045     {
00046       MessageType data;
00047       if (MarshalTraitsType::gen_is_bounded_size()) {
00048         marshaled_size_ = 8 + TraitsType::gen_max_marshaled_size(data, true);
00049         // worst case: CDR encapsulation (4 bytes) + Padding for alignment (4 bytes)
00050       } else {
00051         marshaled_size_ = 0; // should use gen_find_size when marshaling
00052       }
00053       if (MarshalTraitsType::gen_is_bounded_key_size()) {
00054         OpenDDS::DCPS::KeyOnly<const MessageType > ko(data);
00055         key_marshaled_size_ = 8 + TraitsType::gen_max_marshaled_size(ko, true);
00056         // worst case: CDR Encapsulation (4 bytes) + Padding for alignment (4 bytes)
00057       } else {
00058         key_marshaled_size_ = 0; // should use gen_find_size when marshaling
00059       }
00060     }

Here is the call graph for this function:

template<typename MessageType >
virtual OpenDDS::DCPS::DataWriterImpl_T< MessageType >::~DataWriterImpl_T ( void   )  [inline, virtual]

Definition at line 62 of file DataWriterImpl_T.h.

00063     {
00064     }


Member Function Documentation

template<typename MessageType >
ACE_INLINE DataAllocator* OpenDDS::DCPS::DataWriterImpl_T< MessageType >::data_allocator (  )  const [inline]

Accessor to the marshalled data sample allocator.

Definition at line 361 of file DataWriterImpl_T.h.

00361                                           {
00362     return data_allocator_.get();
00363   };

template<typename MessageType >
ACE_Message_Block* OpenDDS::DCPS::DataWriterImpl_T< MessageType >::dds_marshal ( const MessageType &  instance_data,
OpenDDS::DCPS::MarshalingType  marshaling_type 
) [inline, private]

Serialize the instance data.

Parameters:
instance_data The data to serialize.
marshaling_type Enumerated type specifying whether to marshal just the keys or the entire message.
Returns:
returns the serialized data.

Definition at line 375 of file DataWriterImpl_T.h.

References ACE_NEW_RETURN(), ACE_TEXT(), OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::DCPS::Serializer::ALIGN_NONE, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::KEY_ONLY_MARSHALING, LM_ERROR, ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), OpenDDS::DCPS::swap(), OpenDDS::DCPS::Serializer::use_rti_serialization(), and ACE_Time_Value::zero.

00378     {
00379       const bool cdr = this->cdr_encapsulation(), swap = this->swap_bytes();
00380 
00381       Message_Block_Ptr mb;
00382       ACE_Message_Block* tmp_mb;
00383 
00384       if (marshaling_type == OpenDDS::DCPS::KEY_ONLY_MARSHALING) {
00385         // Don't use the cached allocator for the registered sample message
00386         // block.
00387 
00388         OpenDDS::DCPS::KeyOnly<const MessageType > ko_instance_data(instance_data);
00389         size_t effective_size = 0, padding = 0;
00390         if (key_marshaled_size_) {
00391           effective_size = key_marshaled_size_;
00392         } else {
00393           if (cdr && !Serializer::use_rti_serialization()) {
00394             effective_size = cdr_header_size; // CDR encapsulation
00395           }
00396           TraitsType::gen_find_size(ko_instance_data, effective_size, padding);
00397           if (cdr && Serializer::use_rti_serialization()) {
00398             effective_size += (cdr_header_size);
00399           }
00400         }
00401         if (cdr) {
00402           effective_size += padding;
00403         }
00404 
00405         ACE_NEW_RETURN(tmp_mb, ACE_Message_Block(effective_size,
00406                                              ACE_Message_Block::MB_DATA,
00407                                              0, //cont
00408                                              0, //data
00409                                              0, //alloc_strategy
00410                                              get_db_lock()), 0);
00411         mb.reset(tmp_mb);
00412         OpenDDS::DCPS::Serializer serializer(mb.get(), swap, cdr
00413                                              ? OpenDDS::DCPS::Serializer::ALIGN_CDR
00414                                              : OpenDDS::DCPS::Serializer::ALIGN_NONE);
00415         if (cdr) {
00416           serializer << ACE_OutputCDR::from_octet(0);
00417           serializer << ACE_OutputCDR::from_octet(swap ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER);
00418           serializer << ACE_CDR::UShort(0);
00419         }
00420         // If this is RTI serialization, start counting byte offset AFTER
00421         // the header
00422         if (cdr && Serializer::use_rti_serialization()) {
00423           // Start counting byte-offset AFTER header
00424           serializer.reset_alignment();
00425         }
00426         serializer << ko_instance_data;
00427       } else { // OpenDDS::DCPS::FULL_MARSHALING
00428         size_t effective_size = 0, padding = 0;
00429         if (marshaled_size_) {
00430           effective_size = marshaled_size_;
00431         } else {
00432           if (cdr && !Serializer::use_rti_serialization()) {
00433             effective_size = cdr_header_size; // CDR encapsulation
00434           }
00435           TraitsType::gen_find_size(instance_data, effective_size, padding);
00436           if (cdr && Serializer::use_rti_serialization()) {
00437             effective_size += (cdr_header_size);
00438           }
00439         }
00440         if (cdr) {
00441           effective_size += padding;
00442         }
00443 
00444 
00445         ACE_NEW_MALLOC_RETURN(tmp_mb,
00446                               static_cast<ACE_Message_Block*>(
00447                                                               mb_allocator_->malloc(
00448                                                                                     sizeof(ACE_Message_Block))),
00449                               ACE_Message_Block(
00450                                                 effective_size,
00451                                                 ACE_Message_Block::MB_DATA,
00452                                                 0, //cont
00453                                                 0, //data
00454                                                 data_allocator_.get(), //allocator_strategy
00455                                                 get_db_lock(), //data block locking_strategy
00456                                                 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00457                                                 ACE_Time_Value::zero,
00458                                                 ACE_Time_Value::max_time,
00459                                                 db_allocator_.get(),
00460                                                 mb_allocator_.get()),
00461                               0);
00462         mb.reset(tmp_mb);
00463         OpenDDS::DCPS::Serializer serializer(mb.get(), swap, cdr
00464                                              ? OpenDDS::DCPS::Serializer::ALIGN_CDR
00465                                              : OpenDDS::DCPS::Serializer::ALIGN_NONE);
00466         if (cdr) {
00467           serializer << ACE_OutputCDR::from_octet(0);
00468           serializer << ACE_OutputCDR::from_octet(swap ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER);
00469           serializer << ACE_CDR::UShort(0);
00470         }
00471         // If this is RTI serialization, start counting byte offset AFTER
00472         // the header
00473         if (cdr && Serializer::use_rti_serialization()) {
00474           // Start counting byte-offset AFTER header
00475           serializer.reset_alignment();
00476         }
00477 
00478         if (! (serializer << instance_data)) {
00479           ACE_ERROR_RETURN((LM_ERROR,
00480             ACE_TEXT("(%P|%t) OpenDDS::DCPS::DataWriterImpl::dds_marshal(), ")
00481             ACE_TEXT("instance_data serialization error.\n")),
00482             0);
00483         }
00484       }
00485 
00486       return mb.release();
00487     }

Here is the call graph for this function:

template<typename MessageType >
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::dispose ( const MessageType &  instance_data,
DDS::InstanceHandle_t  instance_handle 
) [inline, virtual]

Definition at line 223 of file DataWriterImpl_T.h.

References dispose_w_timestamp(), ACE_OS::gettimeofday(), and OpenDDS::DCPS::time_value_to_time().

00226     {
00227       DDS::Time_t const source_timestamp =
00228         ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00229       return dispose_w_timestamp (instance_data,
00230                                   instance_handle,
00231                                   source_timestamp);
00232     }

Here is the call graph for this function:

template<typename MessageType >
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::dispose_w_timestamp ( const MessageType &  instance_data,
DDS::InstanceHandle_t  instance_handle,
const DDS::Time_t source_timestamp 
) [inline, virtual]

Definition at line 234 of file DataWriterImpl_T.h.

References ACE_TEXT(), dispose(), DDS::HANDLE_NIL, LM_ERROR, lookup_instance(), and DDS::RETCODE_ERROR.

00238     {
00239       if(instance_handle == DDS::HANDLE_NIL)
00240         {
00241           instance_handle = this->lookup_instance(instance_data);
00242           if (instance_handle == DDS::HANDLE_NIL)
00243             {
00244               ACE_ERROR_RETURN ((LM_ERROR,
00245                                  ACE_TEXT("(%P|%t) ")
00246                                  ACE_TEXT("%CDataWriterImpl::dispose_w_timestamp, ")
00247                                  ACE_TEXT("The instance sample is not registered.\n"),
00248                                  TraitsType::type_name()),
00249                                 DDS::RETCODE_ERROR);
00250             }
00251         }
00252 
00253       return OpenDDS::DCPS::DataWriterImpl::dispose(instance_handle,
00254                                                     source_timestamp);
00255     }

Here is the call graph for this function:

template<typename MessageType >
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::enable_specific (  )  [inline, virtual]

Do parts of enable specific to the datatype. Called by DataWriterImpl::enable().

Implements OpenDDS::DCPS::DataWriterImpl.

Definition at line 306 of file DataWriterImpl_T.h.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, and DDS::RETCODE_OK.

00307     {
00308       if (MarshalTraitsType::gen_is_bounded_size ())
00309         {
00310           data_allocator_.reset(new DataAllocator (n_chunks_, marshaled_size_));
00311           if (::OpenDDS::DCPS::DCPS_debug_level >= 2)
00312             ACE_DEBUG((LM_DEBUG,
00313                        ACE_TEXT("(%P|%t) %CDataWriterImpl::")
00314                        ACE_TEXT("enable_specific-data")
00315                        ACE_TEXT(" Dynamic_Cached_Allocator_With_Overflow %x ")
00316                        ACE_TEXT("with %d chunks\n"),
00317                        TraitsType::type_name(),
00318                        data_allocator_.get(),
00319                        n_chunks_));
00320         }
00321       else
00322         {
00323           if (::OpenDDS::DCPS::DCPS_debug_level >= 2)
00324             ACE_DEBUG((LM_DEBUG,
00325                        ACE_TEXT("(%P|%t) %CDataWriterImpl::enable_specific")
00326                        ACE_TEXT(" is unbounded data - allocate from heap\n"), TraitsType::type_name()));
00327         }
00328 
00329       mb_allocator_.reset(
00330         new ::OpenDDS::DCPS::MessageBlockAllocator (
00331                                                     n_chunks_ * association_chunk_multiplier_));
00332       db_allocator_.reset(new ::OpenDDS::DCPS::DataBlockAllocator (n_chunks_));
00333 
00334       if (::OpenDDS::DCPS::DCPS_debug_level >= 2)
00335         {
00336           ACE_DEBUG((LM_DEBUG,
00337                      ACE_TEXT("(%P|%t) %CDataWriterImpl::")
00338                      ACE_TEXT("enable_specific-mb ")
00339                      ACE_TEXT("Cached_Allocator_With_Overflow ")
00340                      ACE_TEXT("%x with %d chunks\n"),
00341                      TraitsType::type_name(),
00342                      mb_allocator_.get(),
00343                      n_chunks_ * association_chunk_multiplier_));
00344           ACE_DEBUG((LM_DEBUG,
00345                      ACE_TEXT("(%P|%t) %CDataWriterImpl::")
00346                      ACE_TEXT("enable_specific-db ")
00347                      ACE_TEXT("Cached_Allocator_With_Overflow ")
00348                      ACE_TEXT("%x with %d chunks\n"),
00349                      TraitsType::type_name(),
00350                      db_allocator_.get(),
00351                      n_chunks_));
00352         }
00353 
00354       return DDS::RETCODE_OK;
00355     }

Here is the call graph for this function:

template<typename MessageType >
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::get_key_value ( MessageType &  key_holder,
DDS::InstanceHandle_t  handle 
) [inline, virtual]

Definition at line 257 of file DataWriterImpl_T.h.

References DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

00260     {
00261       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00262                         guard,
00263                         get_lock (),
00264                         DDS::RETCODE_ERROR);
00265 
00266       typename InstanceMap::iterator const the_end = instance_map_.end ();
00267       for (typename InstanceMap::iterator it = instance_map_.begin ();
00268            it != the_end;
00269            ++it)
00270         {
00271           if (it->second == handle)
00272             {
00273               key_holder = it->first;
00274               return DDS::RETCODE_OK;
00275             }
00276         }
00277 
00278       return DDS::RETCODE_BAD_PARAMETER;
00279     }

template<typename MessageType >
DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::get_or_create_instance_handle ( DDS::InstanceHandle_t handle,
const MessageType &  instance_data,
const DDS::Time_t source_timestamp 
) [inline, private]

Find the instance handle for the given instance_data using the data type's key(s). If the instance does not already exist create a new instance handle for it.

Definition at line 494 of file DataWriterImpl_T.h.

References ACE_TEXT(), DDS::HANDLE_NIL, OpenDDS::DCPS::KEY_ONLY_MARSHALING, LM_ERROR, OpenDDS::DCPS::move(), DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

00498     {
00499       ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00500                        guard,
00501                        get_lock(),
00502                        DDS::RETCODE_ERROR);
00503 
00504       handle = DDS::HANDLE_NIL;
00505       typename InstanceMap::const_iterator it = instance_map_.find(instance_data);
00506 
00507       bool needs_creation = true;
00508       bool needs_registration = true;
00509 
00510       if (it != instance_map_.end())
00511         {
00512           needs_creation = false;
00513 
00514           handle = it->second;
00515           OpenDDS::DCPS::PublicationInstance_rch instance = get_handle_instance(handle);
00516 
00517           if (instance->unregistered_ == false)
00518             {
00519               needs_registration = false;
00520             }
00521           // else: The instance is unregistered and now register again.
00522         }
00523 
00524       if (needs_registration)
00525         {
00526           // don't use fast allocator for registration.
00527           Message_Block_Ptr marshalled(
00528             this->dds_marshal(instance_data,
00529                               OpenDDS::DCPS::KEY_ONLY_MARSHALING));
00530 
00531           // tell DataWriterLocal and Publisher about the instance.
00532           DDS::ReturnCode_t ret = register_instance_i(handle, move(marshalled), source_timestamp);
00533           // note: the WriteDataContainer/PublicationInstance maintains ownership
00534           // of the marshalled sample.
00535 
00536           if (ret != DDS::RETCODE_OK)
00537             {
00538               handle = DDS::HANDLE_NIL;
00539               return ret;
00540             }
00541 
00542           if (needs_creation)
00543             {
00544               std::pair<typename InstanceMap::iterator, bool> pair =
00545                 instance_map_.insert(typename InstanceMap::value_type(instance_data, handle));
00546 
00547               if (pair.second == false)
00548                 {
00549                   handle = DDS::HANDLE_NIL;
00550                   ACE_ERROR_RETURN ((LM_ERROR,
00551                                      ACE_TEXT("(%P|%t) ")
00552                                      ACE_TEXT("%CDataWriterImpl::")
00553                                      ACE_TEXT("get_or_create_instance_handle, ")
00554                                      ACE_TEXT("insert %C failed. \n"),
00555                                      TraitsType::type_name(), TraitsType::type_name()),
00556                                     DDS::RETCODE_ERROR);
00557                 }
00558             } // end of if (needs_creation)
00559 
00560           send_all_to_flush_control(guard);
00561 
00562         } // end of if (needs_registration)
00563 
00564       return DDS::RETCODE_OK;
00565     }

Here is the call graph for this function:

template<typename MessageType >
virtual DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::lookup_instance ( const MessageType &  instance_data  )  [inline, virtual]

Definition at line 281 of file DataWriterImpl_T.h.

References DDS::HANDLE_NIL, and DDS::RETCODE_ERROR.

00283     {
00284       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00285                         guard,
00286                         get_lock (),
00287                         DDS::RETCODE_ERROR);
00288 
00289       typename InstanceMap::const_iterator const it = instance_map_.find(instance_data);
00290 
00291       if (it == instance_map_.end())
00292         {
00293           return DDS::HANDLE_NIL;
00294         }
00295       else
00296         {
00297           return it->second;
00298         }
00299     }

template<typename MessageType >
typedef OpenDDS::DCPS::DataWriterImpl_T< MessageType >::OPENDDS_MAP_CMP_T ( MessageType  ,
DDS::InstanceHandle_t  ,
typename TraitsType::LessThanType   
)
template<typename MessageType >
virtual DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::register_instance ( const MessageType &  instance  )  [inline, virtual]

Definition at line 66 of file DataWriterImpl_T.h.

References ACE_OS::gettimeofday(), register_instance_w_timestamp(), OpenDDS::DCPS::time_value_to_time(), and timestamp().

00068     {
00069       DDS::Time_t const timestamp =
00070         ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00071       return register_instance_w_timestamp (instance, timestamp);
00072     }

Here is the call graph for this function:

template<typename MessageType >
virtual DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::register_instance_w_timestamp ( const MessageType &  instance,
const DDS::Time_t timestamp 
) [inline, virtual]

Definition at line 74 of file DataWriterImpl_T.h.

References ACE_TEXT(), DDS::HANDLE_NIL, LM_ERROR, and DDS::RETCODE_OK.

00077     {
00078       DDS::InstanceHandle_t registered_handle = DDS::HANDLE_NIL;
00079 
00080       DDS::ReturnCode_t const ret
00081           = this->get_or_create_instance_handle(registered_handle,
00082                                                 instance,
00083                                                 timestamp);
00084       if (ret != DDS::RETCODE_OK)
00085         {
00086           ACE_ERROR ((LM_ERROR,
00087                       ACE_TEXT("(%P|%t) ")
00088                       ACE_TEXT("%CDataWriterImpl::")
00089                       ACE_TEXT("register_instance_w_timestamp, ")
00090                       ACE_TEXT("register failed error=%d.\n"),
00091                       TraitsType::type_name(),
00092                       ret));
00093         }
00094 
00095       return registered_handle;
00096     }

Here is the call graph for this function:

template<typename MessageType >
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::unregister_instance ( const MessageType &  instance,
DDS::InstanceHandle_t  handle 
) [inline, virtual]

Definition at line 98 of file DataWriterImpl_T.h.

References ACE_OS::gettimeofday(), OpenDDS::DCPS::time_value_to_time(), timestamp(), and unregister_instance_w_timestamp().

00101     {
00102       DDS::Time_t const timestamp =
00103         ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00104 
00105       return unregister_instance_w_timestamp (instance,
00106                                               handle,
00107                                               timestamp);
00108     }

Here is the call graph for this function:

template<typename MessageType >
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::unregister_instance_w_timestamp ( const MessageType &  instance,
DDS::InstanceHandle_t  handle,
const DDS::Time_t timestamp 
) [inline, virtual]

Definition at line 110 of file DataWriterImpl_T.h.

References ACE_TEXT(), DDS::HANDLE_NIL, LM_ERROR, lookup_instance(), DDS::RETCODE_ERROR, and OpenDDS::DCPS::DataWriterImpl::unregister_instance_i().

00114     {
00115       DDS::InstanceHandle_t const registered_handle =
00116         this->lookup_instance(instance);
00117 
00118       if (registered_handle == DDS::HANDLE_NIL)
00119         {
00120           // This case could be the instance is not registered yet or
00121           // already unregistered.
00122           ACE_ERROR_RETURN ((LM_ERROR,
00123                              ACE_TEXT("(%P|%t) ")
00124                              ACE_TEXT("%CDataWriterImpl::")
00125                              ACE_TEXT("unregister_instance_w_timestamp, ")
00126                              ACE_TEXT("The instance is not registered.\n"),
00127                              TraitsType::type_name()),
00128                             DDS::RETCODE_ERROR);
00129         }
00130       else if (handle != DDS::HANDLE_NIL && handle != registered_handle)
00131         {
00132           ACE_ERROR_RETURN ((LM_ERROR,
00133                              ACE_TEXT("(%P|%t) ")
00134                              ACE_TEXT("%CDataWriterImpl::")
00135                              ACE_TEXT("unregister_w_timestamp, ")
00136                              ACE_TEXT("The given handle=%X is different from ")
00137                              ACE_TEXT("registered handle=%X.\n"),
00138                              TraitsType::type_name(),
00139                              handle, registered_handle),
00140                             DDS::RETCODE_ERROR);
00141         }
00142 
00143       // DataWriterImpl::unregister_instance_i will call back to inform the
00144       // DataWriter.
00145       // That the instance handle is removed from there and hence
00146       // DataWriter can remove the instance here.
00147       return OpenDDS::DCPS::DataWriterImpl::unregister_instance_i(handle, timestamp);
00148     }

Here is the call graph for this function:

template<typename MessageType >
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::write ( const MessageType &  instance_data,
DDS::InstanceHandle_t  handle 
) [inline, virtual]

Definition at line 153 of file DataWriterImpl_T.h.

References ACE_OS::gettimeofday(), OpenDDS::DCPS::time_value_to_time(), and write_w_timestamp().

00156     {
00157       DDS::Time_t const source_timestamp =
00158         ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00159       return write_w_timestamp (instance_data,
00160                                 handle,
00161                                 source_timestamp);
00162     }

Here is the call graph for this function:

template<typename MessageType >
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::write_w_timestamp ( const MessageType &  instance_data,
DDS::InstanceHandle_t  handle,
const DDS::Time_t source_timestamp 
) [inline, virtual]

Definition at line 168 of file DataWriterImpl_T.h.

References ACE_TEXT(), OpenDDS::DCPS::DataWriterImpl::ReaderInfo::eval_, OpenDDS::DCPS::DataWriterImpl::ReaderInfo::expression_params_, OpenDDS::DCPS::FULL_MARSHALING, DDS::HANDLE_NIL, OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_ERROR, OpenDDS::DCPS::move(), OpenDDS::DCPS::push_back(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, TheServiceParticipant, and write().

00172     {
00173       //  This operation assumes the provided handle is valid. The handle
00174       //  provided will not be verified.
00175 
00176       if (handle == DDS::HANDLE_NIL) {
00177         DDS::InstanceHandle_t registered_handle = DDS::HANDLE_NIL;
00178         DDS::ReturnCode_t ret
00179             = this->get_or_create_instance_handle(registered_handle,
00180                                                   instance_data,
00181                                                   source_timestamp);
00182         if (ret != DDS::RETCODE_OK) {
00183           ACE_ERROR_RETURN((LM_ERROR,
00184                             ACE_TEXT("(%P|%t) ")
00185                             ACE_TEXT("%CDataWriterImpl::write_w_timestamp, ")
00186                             ACE_TEXT("register failed err=%d.\n"),
00187                             TraitsType::type_name(),
00188                             ret),
00189                            ret);
00190         }
00191 
00192         handle = registered_handle;
00193       }
00194 
00195       // list of reader RepoIds that should not get data
00196       OpenDDS::DCPS::GUIDSeq_var filter_out;
00197 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00198       if (TheServiceParticipant->publisher_content_filter()) {
00199         ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, DDS::RETCODE_ERROR);
00200         for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
00201                end = reader_info_.end(); iter != end; ++iter) {
00202           const ReaderInfo& ri = iter->second;
00203           if (!ri.eval_.is_nil()) {
00204             if (!filter_out.ptr()) {
00205               filter_out = new OpenDDS::DCPS::GUIDSeq;
00206             }
00207             if (!ri.eval_->eval(instance_data, ri.expression_params_)) {
00208               push_back(filter_out.inout(), iter->first);
00209             }
00210           }
00211         }
00212       }
00213 #endif
00214 
00215       Message_Block_Ptr marshalled(
00216         dds_marshal (instance_data, OpenDDS::DCPS::FULL_MARSHALING));
00217 
00218       return OpenDDS::DCPS::DataWriterImpl::write(move(marshalled), handle,
00219                                                   source_timestamp,
00220                                                   filter_out._retn());
00221     }

Here is the call graph for this function:


Friends And Related Function Documentation

template<typename MessageType >
friend class ::DDS_TEST [friend]

Reimplemented from OpenDDS::DCPS::DataWriterImpl.

Definition at line 576 of file DataWriterImpl_T.h.


Member Data Documentation

template<typename MessageType >
unique_ptr<DataAllocator> OpenDDS::DCPS::DataWriterImpl_T< MessageType >::data_allocator_ [private]

Definition at line 570 of file DataWriterImpl_T.h.

template<typename MessageType >
unique_ptr<DataBlockAllocator> OpenDDS::DCPS::DataWriterImpl_T< MessageType >::db_allocator_ [private]

The data block allocator.

Reimplemented from OpenDDS::DCPS::DataWriterImpl.

Definition at line 572 of file DataWriterImpl_T.h.

template<typename MessageType >
InstanceMap OpenDDS::DCPS::DataWriterImpl_T< MessageType >::instance_map_ [private]

Definition at line 567 of file DataWriterImpl_T.h.

template<typename MessageType >
size_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::key_marshaled_size_ [private]

Definition at line 569 of file DataWriterImpl_T.h.

template<typename MessageType >
size_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::marshaled_size_ [private]

Definition at line 568 of file DataWriterImpl_T.h.

template<typename MessageType >
unique_ptr<MessageBlockAllocator> OpenDDS::DCPS::DataWriterImpl_T< MessageType >::mb_allocator_ [private]

The message block allocator.

Todo:
The publication_lost_status_ and publication_reconnecting_status_ are left here for future use when we add get_publication_lost_status() and get_publication_reconnecting_status() methods.

Reimplemented from OpenDDS::DCPS::DataWriterImpl.

Definition at line 571 of file DataWriterImpl_T.h.


The documentation for this class was generated from the following file:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1