OpenDDS::DCPS::DataWriterImpl_T< MessageType > Class Template Reference

#include <DataWriterImpl_T.h>

Inheritance diagram for OpenDDS::DCPS::DataWriterImpl_T< MessageType >:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::DataWriterImpl_T< MessageType >:

Collaboration graph
[legend]
List of all members.

Public Types

typedef DDSTraits< MessageType > TraitsType
typedef ::OpenDDS::DCPS::Dynamic_Cached_Allocator_With_Overflow<
ACE_Thread_Mutex > 
DataAllocator
 cdr_header_size = 4
enum  { cdr_header_size = 4 }

Public Member Functions

typedef OPENDDS_MAP_CMP (MessageType,::DDS::InstanceHandle_t, typename TraitsType::LessThanType) InstanceMap
 DataWriterImpl_T (void)
 Constructor.
virtual ~DataWriterImpl_T (void)
 Destructor.
virtual ::DDS::InstanceHandle_t register_instance (const MessageType &instance)
virtual ::DDS::InstanceHandle_t register_instance_w_timestamp (const MessageType &instance, const ::DDS::Time_t &timestamp)
virtual ::DDS::ReturnCode_t unregister_instance (const MessageType &instance,::DDS::InstanceHandle_t handle)
virtual ::DDS::ReturnCode_t unregister_instance_w_timestamp (const MessageType &instance,::DDS::InstanceHandle_t handle, const ::DDS::Time_t &timestamp)
virtual ::DDS::ReturnCode_t write (const MessageType &instance_data,::DDS::InstanceHandle_t handle)
virtual ::DDS::ReturnCode_t write_w_timestamp (const MessageType &instance_data,::DDS::InstanceHandle_t handle, const ::DDS::Time_t &source_timestamp)
virtual ::DDS::ReturnCode_t dispose (const MessageType &instance_data,::DDS::InstanceHandle_t instance_handle)
virtual ::DDS::ReturnCode_t dispose_w_timestamp (const MessageType &instance_data,::DDS::InstanceHandle_t instance_handle, const ::DDS::Time_t &source_timestamp)
virtual ::DDS::ReturnCode_t get_key_value (MessageType &key_holder,::DDS::InstanceHandle_t handle)
virtual ::DDS::InstanceHandle_t lookup_instance (const MessageType &instance_data)
virtual void init (::DDS::Topic_ptr topic, OpenDDS::DCPS::TopicImpl *topic_servant, const ::DDS::DataWriterQos &qos,::DDS::DataWriterListener_ptr a_listener, const ::DDS::StatusMask &mask, OpenDDS::DCPS::DomainParticipantImpl *participant_servant, OpenDDS::DCPS::PublisherImpl *publisher_servant,::DDS::DataWriter_ptr dw_objref)
virtual ::DDS::ReturnCode_t enable_specific ()
virtual void unregistered (::DDS::InstanceHandle_t instance_handle)
ACE_INLINE DataAllocator * data_allocator () const

Private Member Functions

ACE_Message_Block * dds_marshal (const MessageType &instance_data, OpenDDS::DCPS::MarshalingType marshaling_type)
::DDS::ReturnCode_t get_or_create_instance_handle (::DDS::InstanceHandle_t &handle, const MessageType &instance_data, const ::DDS::Time_t &source_timestamp)

Private Attributes

InstanceMap instance_map_
size_t marshaled_size_
size_t key_marshaled_size_
DataAllocator * data_allocator_
::OpenDDS::DCPS::MessageBlockAllocator * mb_allocator_
 The message block allocator.
::OpenDDS::DCPS::DataBlockAllocator * db_allocator_
 The data block allocator.

Friends

class ::DDS_TEST

Detailed Description

template<typename MessageType>
class OpenDDS::DCPS::DataWriterImpl_T< MessageType >

Servant for DataWriter interface of the Traits::MessageType data type.

See the DDS specification, OMG formal/04-12-02, for a description of this interface.

Definition at line 20 of file DataWriterImpl_T.h.


Member Typedef Documentation

template<typename MessageType>
typedef ::OpenDDS::DCPS::Dynamic_Cached_Allocator_With_Overflow<ACE_Thread_Mutex> OpenDDS::DCPS::DataWriterImpl_T< MessageType >::DataAllocator

Definition at line 33 of file DataWriterImpl_T.h.

template<typename MessageType>
typedef DDSTraits<MessageType> OpenDDS::DCPS::DataWriterImpl_T< MessageType >::TraitsType

Definition at line 29 of file DataWriterImpl_T.h.


Member Enumeration Documentation

template<typename MessageType>
anonymous enum

Enumerator:
cdr_header_size 

Definition at line 35 of file DataWriterImpl_T.h.

00035          {
00036       cdr_header_size = 4
00037     };


Constructor & Destructor Documentation

template<typename MessageType>
OpenDDS::DCPS::DataWriterImpl_T< MessageType >::DataWriterImpl_T ( void   )  [inline]

Constructor.

Definition at line 40 of file DataWriterImpl_T.h.

00041       : marshaled_size_ (0)
00042       , key_marshaled_size_ (0)
00043       , data_allocator_ (0)
00044       , mb_allocator_ (0)
00045       , db_allocator_ (0)
00046     {
00047     }

template<typename MessageType>
virtual OpenDDS::DCPS::DataWriterImpl_T< MessageType >::~DataWriterImpl_T ( void   )  [inline, virtual]

Destructor.

Definition at line 50 of file DataWriterImpl_T.h.

00051     {
00052       delete data_allocator_;
00053       delete mb_allocator_;
00054       delete db_allocator_;
00055     }


Member Function Documentation

template<typename MessageType>
ACE_INLINE DataAllocator* OpenDDS::DCPS::DataWriterImpl_T< MessageType >::data_allocator (  )  const [inline]

Accessor to the marshalled data sample allocator.

Definition at line 404 of file DataWriterImpl_T.h.

00404                                           {
00405     return data_allocator_;
00406   };

template<typename MessageType>
ACE_Message_Block* OpenDDS::DCPS::DataWriterImpl_T< MessageType >::dds_marshal ( const MessageType &  instance_data,
OpenDDS::DCPS::MarshalingType  marshaling_type 
) [inline, private]

Serialize the instance data.

Parameters:
instance_data The data to serialize.
marshaling_type Enumerated type specifying whether to marshal just the keys or the entire message.
Returns:
returns the serialized data.

Definition at line 418 of file DataWriterImpl_T.h.

References OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::DCPS::Serializer::ALIGN_NONE, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::KEY_ONLY_MARSHALING, OpenDDS::DCPS::Serializer::reset_alignment(), OpenDDS::DCPS::swap(), and OpenDDS::DCPS::Serializer::use_rti_serialization().

00421     {
00422       const bool cdr = this->cdr_encapsulation(), swap = this->swap_bytes();
00423 
00424       ACE_Message_Block* mb;
00425       if (marshaling_type == OpenDDS::DCPS::KEY_ONLY_MARSHALING) {
00426         // Don't use the cached allocator for the registered sample message
00427         // block.
00428 
00429         OpenDDS::DCPS::KeyOnly<const MessageType > ko_instance_data(instance_data);
00430         size_t effective_size = 0, padding = 0;
00431         if (key_marshaled_size_) {
00432           effective_size = key_marshaled_size_;
00433         } else {
00434           if (cdr && !Serializer::use_rti_serialization()) {
00435             effective_size = cdr_header_size; // CDR encapsulation
00436           }
00437           TraitsType::gen_find_size(ko_instance_data, effective_size, padding);
00438           if (cdr && Serializer::use_rti_serialization()) {
00439             effective_size += (cdr_header_size);
00440           }
00441         }
00442         if (cdr) {
00443           effective_size += padding;
00444         }
00445         ACE_NEW_RETURN(mb, ACE_Message_Block(effective_size,
00446                                              ACE_Message_Block::MB_DATA,
00447                                              0, //cont
00448                                              0, //data
00449                                              0, //alloc_strategy
00450                                              get_db_lock()), 0);
00451         OpenDDS::DCPS::Serializer serializer(mb, swap, cdr
00452                                              ? OpenDDS::DCPS::Serializer::ALIGN_CDR
00453                                              : OpenDDS::DCPS::Serializer::ALIGN_NONE);
00454         if (cdr) {
00455           serializer << ACE_OutputCDR::from_octet(0);
00456           serializer << ACE_OutputCDR::from_octet(swap ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER);
00457           serializer << ACE_CDR::UShort(0);
00458         }
00459         // If this is RTI serialization, start counting byte offset AFTER
00460         // the header
00461         if (cdr && Serializer::use_rti_serialization()) {
00462           // Start counting byte-offset AFTER header
00463           serializer.reset_alignment();
00464         }
00465         serializer << ko_instance_data;
00466       } else { // OpenDDS::DCPS::FULL_MARSHALING
00467         size_t effective_size = 0, padding = 0;
00468         if (marshaled_size_) {
00469           effective_size = marshaled_size_;
00470         } else {
00471           if (cdr && !Serializer::use_rti_serialization()) {
00472             effective_size = cdr_header_size; // CDR encapsulation
00473           }
00474           TraitsType::gen_find_size(instance_data, effective_size, padding);
00475           if (cdr && Serializer::use_rti_serialization()) {
00476             effective_size += (cdr_header_size);
00477           }
00478         }
00479         if (cdr) {
00480           effective_size += padding;
00481         }
00482         ACE_NEW_MALLOC_RETURN(mb,
00483                               static_cast<ACE_Message_Block*>(
00484                                                               mb_allocator_->malloc(
00485                                                                                     sizeof(ACE_Message_Block))),
00486                               ACE_Message_Block(
00487                                                 effective_size,
00488                                                 ACE_Message_Block::MB_DATA,
00489                                                 0, //cont
00490                                                 0, //data
00491                                                 data_allocator_, //allocator_strategy
00492                                                 get_db_lock(), //data block locking_strategy
00493                                                 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00494                                                 ACE_Time_Value::zero,
00495                                                 ACE_Time_Value::max_time,
00496                                                 db_allocator_,
00497                                                 mb_allocator_),
00498                               0);
00499         OpenDDS::DCPS::Serializer serializer(mb, swap, cdr
00500                                              ? OpenDDS::DCPS::Serializer::ALIGN_CDR
00501                                              : OpenDDS::DCPS::Serializer::ALIGN_NONE);
00502         if (cdr) {
00503           serializer << ACE_OutputCDR::from_octet(0);
00504           serializer << ACE_OutputCDR::from_octet(swap ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER);
00505           serializer << ACE_CDR::UShort(0);
00506         }
00507         // If this is RTI serialization, start counting byte offset AFTER
00508         // the header
00509         if (cdr && Serializer::use_rti_serialization()) {
00510           // Start counting byte-offset AFTER header
00511           serializer.reset_alignment();
00512         }
00513         serializer << instance_data;
00514       }
00515 
00516       return mb;
00517     }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::dispose ( const MessageType &  instance_data,
::DDS::InstanceHandle_t  instance_handle 
) [inline]

Definition at line 214 of file DataWriterImpl_T.h.

References dispose_w_timestamp(), and OpenDDS::DCPS::time_value_to_time().

00217     {
00218       ::DDS::Time_t const source_timestamp =
00219         ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00220       return dispose_w_timestamp (instance_data,
00221                                   instance_handle,
00222                                   source_timestamp);
00223     }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::dispose_w_timestamp ( const MessageType &  instance_data,
::DDS::InstanceHandle_t  instance_handle,
const ::DDS::Time_t &  source_timestamp 
) [inline]

Definition at line 225 of file DataWriterImpl_T.h.

References OpenDDS::DCPS::DataWriterImpl::dispose(), DDS::HANDLE_NIL, lookup_instance(), and DDS::RETCODE_ERROR.

00229     {
00230       if(instance_handle == ::DDS::HANDLE_NIL)
00231         {
00232           instance_handle = this->lookup_instance(instance_data);
00233           if (instance_handle == ::DDS::HANDLE_NIL)
00234             {
00235               ACE_ERROR_RETURN ((LM_ERROR,
00236                                  ACE_TEXT("(%P|%t) ")
00237                                  ACE_TEXT("%CDataWriterImpl::dispose, ")
00238                                  ACE_TEXT("The instance sample is not registered.\n"),
00239                                  TraitsType::type_name()),
00240                                 ::DDS::RETCODE_ERROR);
00241             }
00242         }
00243 
00244       return OpenDDS::DCPS::DataWriterImpl::dispose(instance_handle,
00245                                                     source_timestamp);
00246     }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::enable_specific (  )  [inline, virtual]

Do parts of enable specific to the datatype. Called by DataWriterImpl::enable().

Implements OpenDDS::DCPS::DataWriterImpl.

Definition at line 335 of file DataWriterImpl_T.h.

References OpenDDS::DCPS::DCPS_debug_level, and DDS::RETCODE_OK.

00336     {
00337       MessageType data;
00338       if (TraitsType::gen_is_bounded_size (data))
00339         {
00340           data_allocator_ = new DataAllocator (n_chunks_, marshaled_size_);
00341           if (::OpenDDS::DCPS::DCPS_debug_level >= 2)
00342             ACE_DEBUG((LM_DEBUG,
00343                        ACE_TEXT("(%P|%t) %CDataWriterImpl::")
00344                        ACE_TEXT("enable_specific-data")
00345                        ACE_TEXT(" Dynamic_Cached_Allocator_With_Overflow %x ")
00346                        ACE_TEXT("with %d chunks\n"),
00347                        TraitsType::type_name(),
00348                        data_allocator_,
00349                        n_chunks_));
00350         }
00351       else
00352         {
00353           if (::OpenDDS::DCPS::DCPS_debug_level >= 2)
00354             ACE_DEBUG((LM_DEBUG,
00355                        ACE_TEXT("(%P|%t) %CDataWriterImpl::enable_specific")
00356                        ACE_TEXT(" is unbounded data - allocate from heap\n"), TraitsType::type_name()));
00357         }
00358 
00359       mb_allocator_ =
00360         new ::OpenDDS::DCPS::MessageBlockAllocator (
00361                                                     n_chunks_ * association_chunk_multiplier_);
00362       db_allocator_ = new ::OpenDDS::DCPS::DataBlockAllocator (n_chunks_);
00363 
00364       if (::OpenDDS::DCPS::DCPS_debug_level >= 2)
00365         {
00366           ACE_DEBUG((LM_DEBUG,
00367                      ACE_TEXT("(%P|%t) %CDataWriterImpl::")
00368                      ACE_TEXT("enable_specific-mb ")
00369                      ACE_TEXT("Cached_Allocator_With_Overflow ")
00370                      ACE_TEXT("%x with %d chunks\n"),
00371                      TraitsType::type_name(),
00372                      mb_allocator_,
00373                      n_chunks_ * association_chunk_multiplier_));
00374           ACE_DEBUG((LM_DEBUG,
00375                      ACE_TEXT("(%P|%t) %CDataWriterImpl::")
00376                      ACE_TEXT("enable_specific-db ")
00377                      ACE_TEXT("Cached_Allocator_With_Overflow ")
00378                      ACE_TEXT("%x with %d chunks\n"),
00379                      TraitsType::type_name(),
00380                      db_allocator_,
00381                      n_chunks_));
00382         }
00383 
00384       return ::DDS::RETCODE_OK;
00385     }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::get_key_value ( MessageType &  key_holder,
::DDS::InstanceHandle_t  handle 
) [inline]

Definition at line 248 of file DataWriterImpl_T.h.

References DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

00251     {
00252       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00253                         guard,
00254                         get_lock (),
00255                         ::DDS::RETCODE_ERROR);
00256 
00257       typename InstanceMap::iterator const the_end = instance_map_.end ();
00258       for (typename InstanceMap::iterator it = instance_map_.begin ();
00259            it != the_end;
00260            ++it)
00261         {
00262           if (it->second == handle)
00263             {
00264               key_holder = it->first;
00265               return ::DDS::RETCODE_OK;
00266             }
00267         }
00268 
00269       return ::DDS::RETCODE_ERROR;
00270     }

template<typename MessageType>
::DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::get_or_create_instance_handle ( ::DDS::InstanceHandle_t &  handle,
const MessageType &  instance_data,
const ::DDS::Time_t &  source_timestamp 
) [inline, private]

Find the instance handle for the given instance_data using the data type's key(s). If the instance does not already exist create a new instance handle for it.

Definition at line 524 of file DataWriterImpl_T.h.

References DDS::HANDLE_NIL, OpenDDS::DCPS::KEY_ONLY_MARSHALING, DDS::RETCODE_ERROR, DDS::RETCODE_OK, and OpenDDS::DCPS::PublicationInstance::unregistered_.

00528     {
00529       ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00530                        guard,
00531                        get_lock(),
00532                        ::DDS::RETCODE_ERROR);
00533 
00534       handle = ::DDS::HANDLE_NIL;
00535       typename InstanceMap::const_iterator it = instance_map_.find(instance_data);
00536 
00537       bool needs_creation = true;
00538       bool needs_registration = true;
00539 
00540       if (it != instance_map_.end())
00541         {
00542           needs_creation = false;
00543 
00544           handle = it->second;
00545           OpenDDS::DCPS::PublicationInstance* instance = get_handle_instance(handle);
00546 
00547           if (instance->unregistered_ == false)
00548             {
00549               needs_registration = false;
00550             }
00551           // else: The instance is unregistered and now register again.
00552         }
00553 
00554       if (needs_registration)
00555         {
00556           // don't use fast allocator for registration.
00557           ACE_Message_Block* const marshalled =
00558             this->dds_marshal(instance_data,
00559                               OpenDDS::DCPS::KEY_ONLY_MARSHALING);
00560 
00561           // tell DataWriterLocal and Publisher about the instance.
00562           ::DDS::ReturnCode_t ret = register_instance_i(handle, marshalled, source_timestamp);
00563           // note: the WriteDataContainer/PublicationInstance maintains ownership
00564           // of the marshalled sample.
00565 
00566           if (ret != ::DDS::RETCODE_OK)
00567             {
00568               marshalled->release ();
00569               handle = ::DDS::HANDLE_NIL;
00570               return ret;
00571             }
00572 
00573           if (needs_creation)
00574             {
00575               std::pair<typename InstanceMap::iterator, bool> pair =
00576                 instance_map_.insert(typename InstanceMap::value_type(instance_data, handle));
00577 
00578               if (pair.second == false)
00579                 {
00580                   handle = ::DDS::HANDLE_NIL;
00581                   ACE_ERROR_RETURN ((LM_ERROR,
00582                                      ACE_TEXT("(%P|%t) ")
00583                                      ACE_TEXT("%CDataWriterImpl::")
00584                                      ACE_TEXT("get_or_create_instance_handle, ")
00585                                      ACE_TEXT("insert %s failed. \n"),
00586                                      TraitsType::type_name(), TraitsType::type_name()),
00587                                     ::DDS::RETCODE_ERROR);
00588                 }
00589             } // end of if (needs_creation)
00590 
00591           send_all_to_flush_control(guard);
00592 
00593         } // end of if (needs_registration)
00594 
00595       return ::DDS::RETCODE_OK;
00596     }

template<typename MessageType>
virtual void OpenDDS::DCPS::DataWriterImpl_T< MessageType >::init ( ::DDS::Topic_ptr  topic,
OpenDDS::DCPS::TopicImpl *  topic_servant,
const ::DDS::DataWriterQos &  qos,
::DDS::DataWriterListener_ptr  a_listener,
const ::DDS::StatusMask &  mask,
OpenDDS::DCPS::DomainParticipantImpl *  participant_servant,
OpenDDS::DCPS::PublisherImpl *  publisher_servant,
::DDS::DataWriter_ptr  dw_objref 
) [inline, virtual]

Initialize the DataWriter object. Called as part of create_datawriter.

Definition at line 297 of file DataWriterImpl_T.h.

References OpenDDS::DCPS::gen_max_marshaled_size(), and OpenDDS::DCPS::DataWriterImpl::init().

00305     {
00306       OpenDDS::DCPS::DataWriterImpl::init (topic,
00307                                            topic_servant,
00308                                            qos,
00309                                            a_listener,
00310                                            mask,
00311                                            participant_servant,
00312                                            publisher_servant,
00313                                            dw_objref);
00314 
00315       MessageType data;
00316       if (TraitsType::gen_is_bounded_size(data)) {
00317         marshaled_size_ = 8 + TraitsType::gen_max_marshaled_size(data, true);
00318         // worst case: CDR encapsulation (4 bytes) + Padding for alignment (4 bytes)
00319       } else {
00320         marshaled_size_ = 0; // should use gen_find_size when marshaling
00321       }
00322       OpenDDS::DCPS::KeyOnly<const MessageType > ko(data);
00323       if (TraitsType::gen_is_bounded_size(ko)) {
00324         key_marshaled_size_ = 8 + TraitsType::gen_max_marshaled_size(ko, true);
00325         // worst case: CDR Encapsulation (4 bytes) + Padding for alignment (4 bytes)
00326       } else {
00327         key_marshaled_size_ = 0; // should use gen_find_size when marshaling
00328       }
00329     }

template<typename MessageType>
virtual ::DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::lookup_instance ( const MessageType &  instance_data  )  [inline]

Definition at line 272 of file DataWriterImpl_T.h.

References DDS::HANDLE_NIL, and DDS::RETCODE_ERROR.

00274     {
00275       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00276                         guard,
00277                         get_lock (),
00278                         ::DDS::RETCODE_ERROR);
00279 
00280       typename InstanceMap::const_iterator const it = instance_map_.find(instance_data);
00281 
00282       if (it == instance_map_.end())
00283         {
00284           return ::DDS::HANDLE_NIL;
00285         }
00286       else
00287         {
00288           return it->second;
00289         }
00290     }

template<typename MessageType>
typedef OpenDDS::DCPS::DataWriterImpl_T< MessageType >::OPENDDS_MAP_CMP ( MessageType  ,
::DDS::InstanceHandle_t  ,
typename TraitsType::LessThanType   
)

template<typename MessageType>
virtual ::DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::register_instance ( const MessageType &  instance  )  [inline]

Definition at line 57 of file DataWriterImpl_T.h.

References register_instance_w_timestamp(), and OpenDDS::DCPS::time_value_to_time().

00059     {
00060       ::DDS::Time_t const timestamp =
00061         ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00062       return register_instance_w_timestamp (instance, timestamp);
00063     }

template<typename MessageType>
virtual ::DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::register_instance_w_timestamp ( const MessageType &  instance,
const ::DDS::Time_t &  timestamp 
) [inline]

Definition at line 65 of file DataWriterImpl_T.h.

References DDS::HANDLE_NIL, and DDS::RETCODE_OK.

00068     {
00069       ::DDS::InstanceHandle_t registered_handle = ::DDS::HANDLE_NIL;
00070 
00071       ::DDS::ReturnCode_t const ret
00072           = this->get_or_create_instance_handle(registered_handle,
00073                                                 instance,
00074                                                 timestamp);
00075       if (ret != ::DDS::RETCODE_OK)
00076         {
00077           ACE_ERROR ((LM_ERROR,
00078                       ACE_TEXT("(%P|%t) ")
00079                       ACE_TEXT("%CDataWriterImpl::")
00080                       ACE_TEXT("register_instance_w_timestamp, ")
00081                       ACE_TEXT("register failed error=%d.\n"),
00082                       TraitsType::type_name(),
00083                       ret));
00084         }
00085 
00086       return registered_handle;
00087     }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::unregister_instance ( const MessageType &  instance,
::DDS::InstanceHandle_t  handle 
) [inline]

Definition at line 89 of file DataWriterImpl_T.h.

References OpenDDS::DCPS::time_value_to_time(), and unregister_instance_w_timestamp().

00092     {
00093       ::DDS::Time_t const timestamp =
00094         ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00095 
00096       return unregister_instance_w_timestamp (instance,
00097                                               handle,
00098                                               timestamp);
00099     }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::unregister_instance_w_timestamp ( const MessageType &  instance,
::DDS::InstanceHandle_t  handle,
const ::DDS::Time_t &  timestamp 
) [inline]

Definition at line 101 of file DataWriterImpl_T.h.

References DDS::HANDLE_NIL, lookup_instance(), DDS::RETCODE_ERROR, and OpenDDS::DCPS::DataWriterImpl::unregister_instance_i().

00105     {
00106       ::DDS::InstanceHandle_t const registered_handle =
00107         this->lookup_instance(instance);
00108 
00109       if (registered_handle == ::DDS::HANDLE_NIL)
00110         {
00111           // This case could be the instance is not registered yet or
00112           // already unregistered.
00113           ACE_ERROR_RETURN ((LM_ERROR,
00114                              ACE_TEXT("(%P|%t) ")
00115                              ACE_TEXT("%CDataWriterImpl::")
00116                              ACE_TEXT("unregister_instance_w_timestamp, ")
00117                              ACE_TEXT("The instance is not registered.\n"),
00118                              TraitsType::type_name()),
00119                             ::DDS::RETCODE_ERROR);
00120         }
00121       else if (handle != ::DDS::HANDLE_NIL && handle != registered_handle)
00122         {
00123           ACE_ERROR_RETURN ((LM_ERROR,
00124                              ACE_TEXT("(%P|%t) ")
00125                              ACE_TEXT("%CDataWriterImpl::")
00126                              ACE_TEXT("unregister_w_timestamp, ")
00127                              ACE_TEXT("The given handle=%X is different from ")
00128                              ACE_TEXT("registered handle=%X.\n"),
00129                              TraitsType::type_name(),
00130                              handle, registered_handle),
00131                             ::DDS::RETCODE_ERROR);
00132         }
00133 
00134       // DataWriterImpl::unregister_instance_i will call back to inform the
00135       // DataWriter.
00136       // That the instance handle is removed from there and hence
00137       // DataWriter can remove the instance here.
00138       return OpenDDS::DCPS::DataWriterImpl::unregister_instance_i(handle, timestamp);
00139     }

template<typename MessageType>
virtual void OpenDDS::DCPS::DataWriterImpl_T< MessageType >::unregistered ( ::DDS::InstanceHandle_t  instance_handle  )  [inline, virtual]

The framework has completed its part of unregistering the given instance.

Definition at line 391 of file DataWriterImpl_T.h.

00392   {
00393     ACE_UNUSED_ARG(instance_handle);
00394     // Previously this method removed the instance from the instance_map_.
00395     // The instance handle will not be removed from the
00396     // map so the instance for re-registration after unregistered
00397     // will use the old handle.
00398   }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::write ( const MessageType &  instance_data,
::DDS::InstanceHandle_t  handle 
) [inline]

Definition at line 144 of file DataWriterImpl_T.h.

References OpenDDS::DCPS::time_value_to_time(), and write_w_timestamp().

00147     {
00148       ::DDS::Time_t const source_timestamp =
00149         ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00150       return write_w_timestamp (instance_data,
00151                                 handle,
00152                                 source_timestamp);
00153     }

template<typename MessageType>
virtual ::DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::write_w_timestamp ( const MessageType &  instance_data,
::DDS::InstanceHandle_t  handle,
const ::DDS::Time_t &  source_timestamp 
) [inline]

Definition at line 159 of file DataWriterImpl_T.h.

References OpenDDS::DCPS::FULL_MARSHALING, DDS::HANDLE_NIL, OpenDDS::DCPS::push_back(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, TheServiceParticipant, and OpenDDS::DCPS::DataWriterImpl::write().

00163     {
00164       //  This operation assumes the provided handle is valid. The handle
00165       //  provided will not be verified.
00166 
00167       if (handle == ::DDS::HANDLE_NIL) {
00168         ::DDS::InstanceHandle_t registered_handle = ::DDS::HANDLE_NIL;
00169         ::DDS::ReturnCode_t ret
00170             = this->get_or_create_instance_handle(registered_handle,
00171                                                   instance_data,
00172                                                   source_timestamp);
00173         if (ret != ::DDS::RETCODE_OK) {
00174           ACE_ERROR_RETURN((LM_ERROR,
00175                             ACE_TEXT("(%P|%t) ")
00176                             ACE_TEXT("%CDataWriterImpl::write, ")
00177                             ACE_TEXT("register failed err=%d.\n"),
00178                             TraitsType::type_name(),
00179                             ret),
00180                            ret);
00181         }
00182 
00183         handle = registered_handle;
00184       }
00185 
00186       // list of reader RepoIds that should not get data
00187       OpenDDS::DCPS::GUIDSeq_var filter_out;
00188 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00189       if (TheServiceParticipant->publisher_content_filter()) {
00190         ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, ::DDS::RETCODE_ERROR);
00191         for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
00192                end = reader_info_.end(); iter != end; ++iter) {
00193           const ReaderInfo& ri = iter->second;
00194           if (!ri.eval_.is_nil()) {
00195             if (!filter_out.ptr()) {
00196               filter_out = new OpenDDS::DCPS::GUIDSeq;
00197             }
00198             if (!ri.eval_->eval(instance_data, ri.expression_params_)) {
00199               push_back(filter_out.inout(), iter->first);
00200             }
00201           }
00202         }
00203       }
00204 #endif
00205 
00206       ACE_Message_Block* const marshalled =
00207         dds_marshal (instance_data, OpenDDS::DCPS::FULL_MARSHALING);
00208 
00209       return OpenDDS::DCPS::DataWriterImpl::write(marshalled, handle,
00210                                                   source_timestamp,
00211                                                   filter_out._retn());
00212     }


Friends And Related Function Documentation

template<typename MessageType>
friend class ::DDS_TEST [friend]

Reimplemented from OpenDDS::DCPS::DataWriterImpl.

Definition at line 607 of file DataWriterImpl_T.h.


Member Data Documentation

template<typename MessageType>
DataAllocator* OpenDDS::DCPS::DataWriterImpl_T< MessageType >::data_allocator_ [private]

Definition at line 601 of file DataWriterImpl_T.h.

template<typename MessageType>
::OpenDDS::DCPS::DataBlockAllocator* OpenDDS::DCPS::DataWriterImpl_T< MessageType >::db_allocator_ [private]

The data block allocator.

Reimplemented from OpenDDS::DCPS::DataWriterImpl.

Definition at line 603 of file DataWriterImpl_T.h.

template<typename MessageType>
InstanceMap OpenDDS::DCPS::DataWriterImpl_T< MessageType >::instance_map_ [private]

Definition at line 598 of file DataWriterImpl_T.h.

template<typename MessageType>
size_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::key_marshaled_size_ [private]

Definition at line 600 of file DataWriterImpl_T.h.

template<typename MessageType>
size_t OpenDDS::DCPS::DataWriterImpl_T< MessageType >::marshaled_size_ [private]

Definition at line 599 of file DataWriterImpl_T.h.

template<typename MessageType>
::OpenDDS::DCPS::MessageBlockAllocator* OpenDDS::DCPS::DataWriterImpl_T< MessageType >::mb_allocator_ [private]

The message block allocator.

Todo:
The publication_lost_status_ and publication_reconnecting_status_ are left here for future use when we add get_publication_lost_status() and get_publication_reconnecting_status() methods.

Reimplemented from OpenDDS::DCPS::DataWriterImpl.

Definition at line 602 of file DataWriterImpl_T.h.


The documentation for this class was generated from the following file:
DataWriterImpl_T.h
Generated on Fri Feb 12 20:06:14 2016 for OpenDDS by  doxygen 1.4.7