DataWriterImpl_T.h

Go to the documentation of this file.
00001 #ifndef dds_DCPS_DataWriterImpl_T_h
00002 #define dds_DCPS_DataWriterImpl_T_h
00003 
00004 #include "dds/DCPS/PublicationInstance.h"
00005 #include "dds/DCPS/DataWriterImpl.h"
00006 #include "dds/DCPS/DataReaderImpl.h"
00007 #include "dds/DCPS/Util.h"
00008 #include "dds/DCPS/TypeSupportImpl.h"
00009 #include "dcps_export.h"
00010 
00011 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00012 
00013 namespace OpenDDS {
00014   namespace DCPS {
00015 
00016 /** Servant for DataWriter interface of the Traits::MessageType data type.
00017  *
00018  * See the DDS specification, OMG formal/04-12-02, for a description of
00019  * this interface.
00020  */
00021   template <typename MessageType>
00022   class
00023 #if ( __GNUC__ == 4 && __GNUC_MINOR__ == 1)
00024     OpenDDS_Dcps_Export
00025 #endif
00026     DataWriterImpl_T
00027     : public virtual OpenDDS::DCPS::LocalObject<typename DDSTraits<MessageType>::DataWriterType>,
00028       public virtual OpenDDS::DCPS::DataWriterImpl
00029   {
00030   public:
00031     typedef DDSTraits<MessageType> TraitsType;
00032     typedef MarshalTraits<MessageType> MarshalTraitsType;
00033 
00034     typedef OPENDDS_MAP_CMP_T(MessageType, DDS::InstanceHandle_t,
00035                               typename TraitsType::LessThanType) InstanceMap;
00036     typedef ::OpenDDS::DCPS::Dynamic_Cached_Allocator_With_Overflow<ACE_Thread_Mutex>  DataAllocator;
00037 
00038     enum {
00039       cdr_header_size = 4
00040     };
00041 
00042     DataWriterImpl_T (void)
00043       : marshaled_size_ (0)
00044       , key_marshaled_size_ (0)
00045     {
00046       MessageType data;
00047       if (MarshalTraitsType::gen_is_bounded_size()) {
00048         marshaled_size_ = 8 + TraitsType::gen_max_marshaled_size(data, true);
00049         // worst case: CDR encapsulation (4 bytes) + Padding for alignment (4 bytes)
00050       } else {
00051         marshaled_size_ = 0; // should use gen_find_size when marshaling
00052       }
00053       if (MarshalTraitsType::gen_is_bounded_key_size()) {
00054         OpenDDS::DCPS::KeyOnly<const MessageType > ko(data);
00055         key_marshaled_size_ = 8 + TraitsType::gen_max_marshaled_size(ko, true);
00056         // worst case: CDR Encapsulation (4 bytes) + Padding for alignment (4 bytes)
00057       } else {
00058         key_marshaled_size_ = 0; // should use gen_find_size when marshaling
00059       }
00060     }
00061 
00062     virtual ~DataWriterImpl_T (void)
00063     {
00064     }
00065 
00066   virtual DDS::InstanceHandle_t register_instance (
00067       const MessageType & instance)
00068     {
00069       DDS::Time_t const timestamp =
00070         ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00071       return register_instance_w_timestamp (instance, timestamp);
00072     }
00073 
00074   virtual DDS::InstanceHandle_t register_instance_w_timestamp (
00075       const MessageType & instance,
00076       const DDS::Time_t & timestamp)
00077     {
00078       DDS::InstanceHandle_t registered_handle = DDS::HANDLE_NIL;
00079 
00080       DDS::ReturnCode_t const ret
00081           = this->get_or_create_instance_handle(registered_handle,
00082                                                 instance,
00083                                                 timestamp);
00084       if (ret != DDS::RETCODE_OK)
00085         {
00086           ACE_ERROR ((LM_ERROR,
00087                       ACE_TEXT("(%P|%t) ")
00088                       ACE_TEXT("%CDataWriterImpl::")
00089                       ACE_TEXT("register_instance_w_timestamp, ")
00090                       ACE_TEXT("register failed error=%d.\n"),
00091                       TraitsType::type_name(),
00092                       ret));
00093         }
00094 
00095       return registered_handle;
00096     }
00097 
00098   virtual DDS::ReturnCode_t unregister_instance (
00099       const MessageType & instance,
00100       DDS::InstanceHandle_t handle)
00101     {
00102       DDS::Time_t const timestamp =
00103         ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00104 
00105       return unregister_instance_w_timestamp (instance,
00106                                               handle,
00107                                               timestamp);
00108     }
00109 
00110   virtual DDS::ReturnCode_t unregister_instance_w_timestamp (
00111       const MessageType & instance,
00112       DDS::InstanceHandle_t handle,
00113       const DDS::Time_t & timestamp)
00114     {
00115       DDS::InstanceHandle_t const registered_handle =
00116         this->lookup_instance(instance);
00117 
00118       if (registered_handle == DDS::HANDLE_NIL)
00119         {
00120           // This case could be the instance is not registered yet or
00121           // already unregistered.
00122           ACE_ERROR_RETURN ((LM_ERROR,
00123                              ACE_TEXT("(%P|%t) ")
00124                              ACE_TEXT("%CDataWriterImpl::")
00125                              ACE_TEXT("unregister_instance_w_timestamp, ")
00126                              ACE_TEXT("The instance is not registered.\n"),
00127                              TraitsType::type_name()),
00128                             DDS::RETCODE_ERROR);
00129         }
00130       else if (handle != DDS::HANDLE_NIL && handle != registered_handle)
00131         {
00132           ACE_ERROR_RETURN ((LM_ERROR,
00133                              ACE_TEXT("(%P|%t) ")
00134                              ACE_TEXT("%CDataWriterImpl::")
00135                              ACE_TEXT("unregister_w_timestamp, ")
00136                              ACE_TEXT("The given handle=%X is different from ")
00137                              ACE_TEXT("registered handle=%X.\n"),
00138                              TraitsType::type_name(),
00139                              handle, registered_handle),
00140                             DDS::RETCODE_ERROR);
00141         }
00142 
00143       // DataWriterImpl::unregister_instance_i will call back to inform the
00144       // DataWriter.
00145       // That the instance handle is removed from there and hence
00146       // DataWriter can remove the instance here.
00147       return OpenDDS::DCPS::DataWriterImpl::unregister_instance_i(handle, timestamp);
00148     }
00149 
00150   //WARNING: If the handle is non-nil and the instance is not registered
00151   //         then this operation may cause an access violation.
00152   //         This lack of safety helps performance.
00153   virtual DDS::ReturnCode_t write (
00154       const MessageType & instance_data,
00155       DDS::InstanceHandle_t handle)
00156     {
00157       DDS::Time_t const source_timestamp =
00158         ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00159       return write_w_timestamp (instance_data,
00160                                 handle,
00161                                 source_timestamp);
00162     }
00163 
00164 
00165   //WARNING: If the handle is non-nil and the instance is not registered
00166   //         then this operation may cause an access violation.
00167   //         This lack of safety helps performance.
00168   virtual DDS::ReturnCode_t write_w_timestamp (
00169       const MessageType & instance_data,
00170       DDS::InstanceHandle_t handle,
00171       const DDS::Time_t & source_timestamp)
00172     {
00173       //  This operation assumes the provided handle is valid. The handle
00174       //  provided will not be verified.
00175 
00176       if (handle == DDS::HANDLE_NIL) {
00177         DDS::InstanceHandle_t registered_handle = DDS::HANDLE_NIL;
00178         DDS::ReturnCode_t ret
00179             = this->get_or_create_instance_handle(registered_handle,
00180                                                   instance_data,
00181                                                   source_timestamp);
00182         if (ret != DDS::RETCODE_OK) {
00183           ACE_ERROR_RETURN((LM_ERROR,
00184                             ACE_TEXT("(%P|%t) ")
00185                             ACE_TEXT("%CDataWriterImpl::write_w_timestamp, ")
00186                             ACE_TEXT("register failed err=%d.\n"),
00187                             TraitsType::type_name(),
00188                             ret),
00189                            ret);
00190         }
00191 
00192         handle = registered_handle;
00193       }
00194 
00195       // list of reader RepoIds that should not get data
00196       OpenDDS::DCPS::GUIDSeq_var filter_out;
00197 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00198       if (TheServiceParticipant->publisher_content_filter()) {
00199         ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, DDS::RETCODE_ERROR);
00200         for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
00201                end = reader_info_.end(); iter != end; ++iter) {
00202           const ReaderInfo& ri = iter->second;
00203           if (!ri.eval_.is_nil()) {
00204             if (!filter_out.ptr()) {
00205               filter_out = new OpenDDS::DCPS::GUIDSeq;
00206             }
00207             if (!ri.eval_->eval(instance_data, ri.expression_params_)) {
00208               push_back(filter_out.inout(), iter->first);
00209             }
00210           }
00211         }
00212       }
00213 #endif
00214 
00215       Message_Block_Ptr marshalled(
00216         dds_marshal (instance_data, OpenDDS::DCPS::FULL_MARSHALING));
00217 
00218       return OpenDDS::DCPS::DataWriterImpl::write(move(marshalled), handle,
00219                                                   source_timestamp,
00220                                                   filter_out._retn());
00221     }
00222 
00223   virtual DDS::ReturnCode_t dispose (
00224       const MessageType & instance_data,
00225       DDS::InstanceHandle_t instance_handle)
00226     {
00227       DDS::Time_t const source_timestamp =
00228         ::OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00229       return dispose_w_timestamp (instance_data,
00230                                   instance_handle,
00231                                   source_timestamp);
00232     }
00233 
00234   virtual DDS::ReturnCode_t dispose_w_timestamp (
00235       const MessageType & instance_data,
00236       DDS::InstanceHandle_t instance_handle,
00237       const DDS::Time_t & source_timestamp)
00238     {
00239       if(instance_handle == DDS::HANDLE_NIL)
00240         {
00241           instance_handle = this->lookup_instance(instance_data);
00242           if (instance_handle == DDS::HANDLE_NIL)
00243             {
00244               ACE_ERROR_RETURN ((LM_ERROR,
00245                                  ACE_TEXT("(%P|%t) ")
00246                                  ACE_TEXT("%CDataWriterImpl::dispose_w_timestamp, ")
00247                                  ACE_TEXT("The instance sample is not registered.\n"),
00248                                  TraitsType::type_name()),
00249                                 DDS::RETCODE_ERROR);
00250             }
00251         }
00252 
00253       return OpenDDS::DCPS::DataWriterImpl::dispose(instance_handle,
00254                                                     source_timestamp);
00255     }
00256 
00257   virtual DDS::ReturnCode_t get_key_value (
00258       MessageType & key_holder,
00259       DDS::InstanceHandle_t handle)
00260     {
00261       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00262                         guard,
00263                         get_lock (),
00264                         DDS::RETCODE_ERROR);
00265 
00266       typename InstanceMap::iterator const the_end = instance_map_.end ();
00267       for (typename InstanceMap::iterator it = instance_map_.begin ();
00268            it != the_end;
00269            ++it)
00270         {
00271           if (it->second == handle)
00272             {
00273               key_holder = it->first;
00274               return DDS::RETCODE_OK;
00275             }
00276         }
00277 
00278       return DDS::RETCODE_BAD_PARAMETER;
00279     }
00280 
00281   virtual DDS::InstanceHandle_t lookup_instance (
00282       const MessageType & instance_data)
00283     {
00284       ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
00285                         guard,
00286                         get_lock (),
00287                         DDS::RETCODE_ERROR);
00288 
00289       typename InstanceMap::const_iterator const it = instance_map_.find(instance_data);
00290 
00291       if (it == instance_map_.end())
00292         {
00293           return DDS::HANDLE_NIL;
00294         }
00295       else
00296         {
00297           return it->second;
00298         }
00299     }
00300 
00301 
00302   /**
00303    * Do parts of enable specific to the datatype.
00304    * Called by DataWriterImpl::enable().
00305    */
00306     virtual DDS::ReturnCode_t enable_specific ()
00307     {
00308       if (MarshalTraitsType::gen_is_bounded_size ())
00309         {
00310           data_allocator_.reset(new DataAllocator (n_chunks_, marshaled_size_));
00311           if (::OpenDDS::DCPS::DCPS_debug_level >= 2)
00312             ACE_DEBUG((LM_DEBUG,
00313                        ACE_TEXT("(%P|%t) %CDataWriterImpl::")
00314                        ACE_TEXT("enable_specific-data")
00315                        ACE_TEXT(" Dynamic_Cached_Allocator_With_Overflow %x ")
00316                        ACE_TEXT("with %d chunks\n"),
00317                        TraitsType::type_name(),
00318                        data_allocator_.get(),
00319                        n_chunks_));
00320         }
00321       else
00322         {
00323           if (::OpenDDS::DCPS::DCPS_debug_level >= 2)
00324             ACE_DEBUG((LM_DEBUG,
00325                        ACE_TEXT("(%P|%t) %CDataWriterImpl::enable_specific")
00326                        ACE_TEXT(" is unbounded data - allocate from heap\n"), TraitsType::type_name()));
00327         }
00328 
00329       mb_allocator_.reset(
00330         new ::OpenDDS::DCPS::MessageBlockAllocator (
00331                                                     n_chunks_ * association_chunk_multiplier_));
00332       db_allocator_.reset(new ::OpenDDS::DCPS::DataBlockAllocator (n_chunks_));
00333 
00334       if (::OpenDDS::DCPS::DCPS_debug_level >= 2)
00335         {
00336           ACE_DEBUG((LM_DEBUG,
00337                      ACE_TEXT("(%P|%t) %CDataWriterImpl::")
00338                      ACE_TEXT("enable_specific-mb ")
00339                      ACE_TEXT("Cached_Allocator_With_Overflow ")
00340                      ACE_TEXT("%x with %d chunks\n"),
00341                      TraitsType::type_name(),
00342                      mb_allocator_.get(),
00343                      n_chunks_ * association_chunk_multiplier_));
00344           ACE_DEBUG((LM_DEBUG,
00345                      ACE_TEXT("(%P|%t) %CDataWriterImpl::")
00346                      ACE_TEXT("enable_specific-db ")
00347                      ACE_TEXT("Cached_Allocator_With_Overflow ")
00348                      ACE_TEXT("%x with %d chunks\n"),
00349                      TraitsType::type_name(),
00350                      db_allocator_.get(),
00351                      n_chunks_));
00352         }
00353 
00354       return DDS::RETCODE_OK;
00355     }
00356 
00357   /**
00358    * Accessor to the marshalled data sample allocator.
00359    */
00360   ACE_INLINE
00361   DataAllocator* data_allocator () const  {
00362     return data_allocator_.get();
00363   };
00364 
00365 private:
00366 
00367   /**
00368    * Serialize the instance data.
00369    *
00370    * @param instance_data The data to serialize.
00371    * @param marshaling_type Enumerated type specifying whether to marshal
00372    *        just the keys or the entire message.
00373    * @return returns the serialized data.
00374    */
00375     ACE_Message_Block* dds_marshal(
00376                                    const MessageType& instance_data,
00377                                    OpenDDS::DCPS::MarshalingType marshaling_type)
00378     {
00379       const bool cdr = this->cdr_encapsulation(), swap = this->swap_bytes();
00380 
00381       Message_Block_Ptr mb;
00382       ACE_Message_Block* tmp_mb;
00383 
00384       if (marshaling_type == OpenDDS::DCPS::KEY_ONLY_MARSHALING) {
00385         // Don't use the cached allocator for the registered sample message
00386         // block.
00387 
00388         OpenDDS::DCPS::KeyOnly<const MessageType > ko_instance_data(instance_data);
00389         size_t effective_size = 0, padding = 0;
00390         if (key_marshaled_size_) {
00391           effective_size = key_marshaled_size_;
00392         } else {
00393           if (cdr && !Serializer::use_rti_serialization()) {
00394             effective_size = cdr_header_size; // CDR encapsulation
00395           }
00396           TraitsType::gen_find_size(ko_instance_data, effective_size, padding);
00397           if (cdr && Serializer::use_rti_serialization()) {
00398             effective_size += (cdr_header_size);
00399           }
00400         }
00401         if (cdr) {
00402           effective_size += padding;
00403         }
00404 
00405         ACE_NEW_RETURN(tmp_mb, ACE_Message_Block(effective_size,
00406                                              ACE_Message_Block::MB_DATA,
00407                                              0, //cont
00408                                              0, //data
00409                                              0, //alloc_strategy
00410                                              get_db_lock()), 0);
00411         mb.reset(tmp_mb);
00412         OpenDDS::DCPS::Serializer serializer(mb.get(), swap, cdr
00413                                              ? OpenDDS::DCPS::Serializer::ALIGN_CDR
00414                                              : OpenDDS::DCPS::Serializer::ALIGN_NONE);
00415         if (cdr) {
00416           serializer << ACE_OutputCDR::from_octet(0);
00417           serializer << ACE_OutputCDR::from_octet(swap ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER);
00418           serializer << ACE_CDR::UShort(0);
00419         }
00420         // If this is RTI serialization, start counting byte offset AFTER
00421         // the header
00422         if (cdr && Serializer::use_rti_serialization()) {
00423           // Start counting byte-offset AFTER header
00424           serializer.reset_alignment();
00425         }
00426         serializer << ko_instance_data;
00427       } else { // OpenDDS::DCPS::FULL_MARSHALING
00428         size_t effective_size = 0, padding = 0;
00429         if (marshaled_size_) {
00430           effective_size = marshaled_size_;
00431         } else {
00432           if (cdr && !Serializer::use_rti_serialization()) {
00433             effective_size = cdr_header_size; // CDR encapsulation
00434           }
00435           TraitsType::gen_find_size(instance_data, effective_size, padding);
00436           if (cdr && Serializer::use_rti_serialization()) {
00437             effective_size += (cdr_header_size);
00438           }
00439         }
00440         if (cdr) {
00441           effective_size += padding;
00442         }
00443 
00444 
00445         ACE_NEW_MALLOC_RETURN(tmp_mb,
00446                               static_cast<ACE_Message_Block*>(
00447                                                               mb_allocator_->malloc(
00448                                                                                     sizeof(ACE_Message_Block))),
00449                               ACE_Message_Block(
00450                                                 effective_size,
00451                                                 ACE_Message_Block::MB_DATA,
00452                                                 0, //cont
00453                                                 0, //data
00454                                                 data_allocator_.get(), //allocator_strategy
00455                                                 get_db_lock(), //data block locking_strategy
00456                                                 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00457                                                 ACE_Time_Value::zero,
00458                                                 ACE_Time_Value::max_time,
00459                                                 db_allocator_.get(),
00460                                                 mb_allocator_.get()),
00461                               0);
00462         mb.reset(tmp_mb);
00463         OpenDDS::DCPS::Serializer serializer(mb.get(), swap, cdr
00464                                              ? OpenDDS::DCPS::Serializer::ALIGN_CDR
00465                                              : OpenDDS::DCPS::Serializer::ALIGN_NONE);
00466         if (cdr) {
00467           serializer << ACE_OutputCDR::from_octet(0);
00468           serializer << ACE_OutputCDR::from_octet(swap ? !ACE_CDR_BYTE_ORDER : ACE_CDR_BYTE_ORDER);
00469           serializer << ACE_CDR::UShort(0);
00470         }
00471         // If this is RTI serialization, start counting byte offset AFTER
00472         // the header
00473         if (cdr && Serializer::use_rti_serialization()) {
00474           // Start counting byte-offset AFTER header
00475           serializer.reset_alignment();
00476         }
00477 
00478         if (! (serializer << instance_data)) {
00479           ACE_ERROR_RETURN((LM_ERROR,
00480             ACE_TEXT("(%P|%t) OpenDDS::DCPS::DataWriterImpl::dds_marshal(), ")
00481             ACE_TEXT("instance_data serialization error.\n")),
00482             0);
00483         }
00484       }
00485 
00486       return mb.release();
00487     }
00488 
00489   /**
00490    * Find the instance handle for the given instance_data using
00491    * the data type's key(s).  If the instance does not already exist
00492    * create a new instance handle for it.
00493    */
00494   DDS::ReturnCode_t get_or_create_instance_handle(
00495     DDS::InstanceHandle_t& handle,
00496     const MessageType& instance_data,
00497     const DDS::Time_t & source_timestamp)
00498     {
00499       ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00500                        guard,
00501                        get_lock(),
00502                        DDS::RETCODE_ERROR);
00503 
00504       handle = DDS::HANDLE_NIL;
00505       typename InstanceMap::const_iterator it = instance_map_.find(instance_data);
00506 
00507       bool needs_creation = true;
00508       bool needs_registration = true;
00509 
00510       if (it != instance_map_.end())
00511         {
00512           needs_creation = false;
00513 
00514           handle = it->second;
00515           OpenDDS::DCPS::PublicationInstance_rch instance = get_handle_instance(handle);
00516 
00517           if (instance->unregistered_ == false)
00518             {
00519               needs_registration = false;
00520             }
00521           // else: The instance is unregistered and now register again.
00522         }
00523 
00524       if (needs_registration)
00525         {
00526           // don't use fast allocator for registration.
00527           Message_Block_Ptr marshalled(
00528             this->dds_marshal(instance_data,
00529                               OpenDDS::DCPS::KEY_ONLY_MARSHALING));
00530 
00531           // tell DataWriterLocal and Publisher about the instance.
00532           DDS::ReturnCode_t ret = register_instance_i(handle, move(marshalled), source_timestamp);
00533           // note: the WriteDataContainer/PublicationInstance maintains ownership
00534           // of the marshalled sample.
00535 
00536           if (ret != DDS::RETCODE_OK)
00537             {
00538               handle = DDS::HANDLE_NIL;
00539               return ret;
00540             }
00541 
00542           if (needs_creation)
00543             {
00544               std::pair<typename InstanceMap::iterator, bool> pair =
00545                 instance_map_.insert(typename InstanceMap::value_type(instance_data, handle));
00546 
00547               if (pair.second == false)
00548                 {
00549                   handle = DDS::HANDLE_NIL;
00550                   ACE_ERROR_RETURN ((LM_ERROR,
00551                                      ACE_TEXT("(%P|%t) ")
00552                                      ACE_TEXT("%CDataWriterImpl::")
00553                                      ACE_TEXT("get_or_create_instance_handle, ")
00554                                      ACE_TEXT("insert %C failed. \n"),
00555                                      TraitsType::type_name(), TraitsType::type_name()),
00556                                     DDS::RETCODE_ERROR);
00557                 }
00558             } // end of if (needs_creation)
00559 
00560           send_all_to_flush_control(guard);
00561 
00562         } // end of if (needs_registration)
00563 
00564       return DDS::RETCODE_OK;
00565     }
00566 
00567     InstanceMap  instance_map_;
00568     size_t       marshaled_size_;
00569     size_t       key_marshaled_size_;
00570     unique_ptr<DataAllocator> data_allocator_;
00571     unique_ptr<MessageBlockAllocator> mb_allocator_;
00572     unique_ptr<DataBlockAllocator>    db_allocator_;
00573 
00574     // A class, normally provided by an unit test, that needs access to
00575     // private methods/members.
00576     friend class ::DDS_TEST;
00577   };
00578 
00579   }
00580 }
00581 
00582 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00583 
00584 #endif /* dds_DCPS_DataWriterImpl_T_h */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1