Update::PersistenceUpdater Class Reference

#include <PersistenceUpdater.h>

Inheritance diagram for Update::PersistenceUpdater:
Inheritance graph
[legend]
Collaboration diagram for Update::PersistenceUpdater:
Collaboration graph
[legend]

List of all members.

Classes

class  IdType_ExtId

Public Types

typedef ACE_Allocator_Adapter
< ACE_Malloc
< ACE_MMAP_MEMORY_POOL,
TAO_SYNCH_MUTEX > > 
ALLOCATOR
typedef struct TopicStrt
< QosSeq, ACE_CString
Topic
 Persisted topic data structure.
typedef struct ParticipantStrt
< QosSeq
Participant
 Persisted participant data structure.
typedef struct ActorStrt
< QosSeq, QosSeq, ACE_CString,
BinSeq, ContentSubscriptionBin
RWActor
 Persisted actor data structure.
typedef
ACE_Hash_Map_With_Allocator
< IdType_ExtId, Topic * > 
TopicIndex
typedef
ACE_Hash_Map_With_Allocator
< IdType_ExtId, Participant * > 
ParticipantIndex
typedef
ACE_Hash_Map_With_Allocator
< IdType_ExtId, RWActor * > 
ActorIndex

Public Member Functions

 PersistenceUpdater ()
virtual ~PersistenceUpdater ()
virtual int init (int argc, ACE_TCHAR *argv[])
 Service object initialization.
virtual int fini ()
 ACE_Task_Base finish method.
virtual int svc ()
 ACE_Task_Base start method.
virtual void requestImage ()
virtual void create (const UTopic &topic)
 Add topic to be persisted.
virtual void create (const UParticipant &participant)
 Add participant to be persisted.
virtual void create (const URActor &actor)
 Add DataReader to be persisted.
virtual void create (const UWActor &actor)
 Add DataWriter to be persisted.
virtual void create (const OwnershipData &data)
 Add ownership data to be persisted.
virtual void update (const IdPath &id, const DDS::DomainParticipantQos &qos)
 Persist updated Qos parameters for a Participant.
virtual void update (const IdPath &id, const DDS::TopicQos &qos)
 Persist updated Qos parameters for a Topic.
virtual void update (const IdPath &id, const DDS::DataWriterQos &qos)
 Persist updated Qos parameters for a DataWriter.
virtual void update (const IdPath &id, const DDS::PublisherQos &qos)
 Persist updated Qos parameters for a Publisher.
virtual void update (const IdPath &id, const DDS::DataReaderQos &qos)
 Persist updated Qos parameters for a DataReader.
virtual void update (const IdPath &id, const DDS::SubscriberQos &qos)
 Persist updated Qos parameters for a Subscriber.
virtual void update (const IdPath &id, const DDS::StringSeq &exprParams)
 Persist updated subscription exprParams.
virtual void destroy (const IdPath &id, ItemType type, ActorType actor)
 Remove an entity (but not children) from persistence.
virtual void updateLastPartId (PartIdType partId)
 Update Last Participant Id for repo.

Private Member Functions

int parse (int argc, ACE_TCHAR *argv[])
void storeUpdate (const ACE_Message_Block &data, BinSeq &storage)

Private Attributes

ACE_TString persistence_file_
bool reset_
Managerum_
OpenDDS::DCPS::unique_ptr
< ALLOCATOR
allocator_
TopicIndextopic_index_
 Persisted Topics.
ParticipantIndexparticipant_index_
 Persisted Participants.
ActorIndexactor_index_
 Persisted Readers and Writers.
PartIdTypelast_part_id_
 What the last participant id is/was.

Detailed Description

Definition at line 33 of file PersistenceUpdater.h.


Member Typedef Documentation

Definition at line 69 of file PersistenceUpdater.h.

typedef ACE_Allocator_Adapter<ACE_Malloc <ACE_MMAP_MEMORY_POOL , TAO_SYNCH_MUTEX> > Update::PersistenceUpdater::ALLOCATOR

Definition at line 55 of file PersistenceUpdater.h.

Persisted participant data structure.

Definition at line 61 of file PersistenceUpdater.h.

Definition at line 68 of file PersistenceUpdater.h.

Persisted actor data structure.

Definition at line 64 of file PersistenceUpdater.h.

Persisted topic data structure.

Definition at line 58 of file PersistenceUpdater.h.

Definition at line 67 of file PersistenceUpdater.h.


Constructor & Destructor Documentation

Update::PersistenceUpdater::PersistenceUpdater (  ) 

Definition at line 214 of file PersistenceUpdater.cpp.

References ACE_TEXT().

00215   : persistence_file_(ACE_TEXT("InforepoPersist"))
00216   , reset_(false)
00217   , um_(0)
00218   , topic_index_(0)
00219   , participant_index_(0)
00220   , actor_index_(0)
00221   , last_part_id_(0)
00222 {}

Here is the call graph for this function:

Update::PersistenceUpdater::~PersistenceUpdater (  )  [virtual]

Definition at line 224 of file PersistenceUpdater.cpp.

00225 {}


Member Function Documentation

void Update::PersistenceUpdater::create ( const OwnershipData data  )  [virtual]

Add ownership data to be persisted.

Implements Update::Updater.

Definition at line 828 of file PersistenceUpdater.cpp.

00829 {
00830   /* This method intentionally left unimplemented. */
00831 }

virtual void Update::PersistenceUpdater::create ( const UWActor actor  )  [virtual]

Add DataWriter to be persisted.

Implements Update::Updater.

void Update::PersistenceUpdater::create ( const URActor actor  )  [virtual]

Add DataReader to be persisted.

Implements Update::Updater.

Definition at line 640 of file PersistenceUpdater.cpp.

References actor_index_, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::actorId, allocator_, ACE_Message_Block::base(), ACE_OutputCDR::begin(), ACE_Hash_Map_With_Allocator< class, class >::bind(), Update::ActorStrt< PSQ, RWQ, C, T, CSP >::callback, ACE_CDR::consolidate(), Update::ActorStrt< PSQ, RWQ, C, T, CSP >::contentSubscriptionProfile, Update::DataReader, Update::DataReaderQos, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::domainId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::drdwQos, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), len, ACE_Message_Block::length(), LM_ERROR, ACE_OS::memcpy(), Update::ActorStrt< PSQ, RWQ, C, T, CSP >::participantId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::pubsubQos, ACE_OutputCDR::reset(), Update::SubscriberQos, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::topicId, and Update::ActorStrt< PSQ, RWQ, C, T, CSP >::transportInterfaceInfo.

00641 {
00642   TAO_OutputCDR outCdr;
00643   outCdr << actor.pubsubQos;
00644   ACE_Message_Block dst;
00645   ACE_CDR::consolidate(&dst, outCdr.begin());
00646 
00647   size_t len = dst.length();
00648   char *buf = new (std::nothrow) char[len];
00649 
00650   if (buf == 0) {
00651     ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( subscription): allocation failed.\n"));
00652     return;
00653   }
00654 
00655   ArrDelAdapter<char> guard(buf);
00656 
00657   ACE_OS::memcpy(buf, dst.base(), len);
00658 
00659   BinSeq pubsub_qos_bin(len, buf);
00660   QosSeq pubsub_qos(SubscriberQos, pubsub_qos_bin);
00661 
00662   outCdr.reset();
00663   outCdr << actor.drdwQos;
00664   ACE_Message_Block dst2;
00665   ACE_CDR::consolidate(&dst2, outCdr.begin());
00666 
00667   len = dst2.length();
00668   char *buf2 = new (std::nothrow) char[len];
00669 
00670   if (buf2 == 0) {
00671     ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( subscription): allocation failed.\n"));
00672     return;
00673   }
00674 
00675   ArrDelAdapter<char> guard2(buf2);
00676 
00677   ACE_OS::memcpy(buf2, dst2.base(), len);
00678 
00679   BinSeq dwdr_qos_bin(len, buf2);
00680   QosSeq dwdr_qos(DataReaderQos, dwdr_qos_bin);
00681 
00682   outCdr.reset();
00683   outCdr << actor.transportInterfaceInfo;
00684   ACE_Message_Block dst3;
00685   ACE_CDR::consolidate(&dst3, outCdr.begin());
00686 
00687   len = dst3.length();
00688   char *buf3 = new (std::nothrow) char[len];
00689 
00690   if (buf3 == 0) {
00691     ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( subscription) allocation failed.\n"));
00692     return;
00693   }
00694 
00695   ArrDelAdapter<char> guard3(buf3);
00696 
00697   ACE_OS::memcpy(buf3, dst3.base(), len);
00698   BinSeq tr_bin(len, buf3);
00699 
00700   outCdr.reset();
00701   outCdr << actor.contentSubscriptionProfile.exprParams;
00702   ACE_Message_Block dst4;
00703   ACE_CDR::consolidate(&dst4, outCdr.begin());
00704   len = dst4.length();
00705   char* buf4 = new (std::nothrow) char[len];
00706   if (buf4 == 0) {
00707     ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( subscription) allocation failed.\n"));
00708     return;
00709   }
00710   ArrDelAdapter<char> guard4(buf4);
00711 
00712   ACE_OS::memcpy(buf4, dst4.base(), len);
00713   ContentSubscriptionBin csp_bin;
00714   csp_bin.filterClassName = actor.contentSubscriptionProfile.filterClassName;
00715   csp_bin.filterExpr = actor.contentSubscriptionProfile.filterExpr;
00716   csp_bin.exprParams = std::make_pair(len, buf4);
00717 
00718   DActor actor_data(actor.domainId, actor.actorId, actor.topicId
00719                     , actor.participantId
00720                     , DataReader, actor.callback.c_str(), pubsub_qos
00721                     , dwdr_qos, tr_bin, csp_bin);
00722 
00723   // allocate memory for ActorData
00724   void* buffer;
00725   ACE_ALLOCATOR(buffer, allocator_->malloc
00726                 (sizeof(PersistenceUpdater::RWActor)));
00727 
00728   // Initialize ActorData
00729   PersistenceUpdater::RWActor* persistent_data =
00730     new(buffer) PersistenceUpdater::RWActor(actor_data
00731                                             , allocator_.get());
00732 
00733   IdType_ExtId ext(actor.actorId);
00734 
00735   // bind ActorData with the actorId
00736   if (actor_index_->bind(ext, persistent_data, allocator_.get()) != 0) {
00737     allocator_->free((void *) buffer);
00738     return;
00739   }
00740 }

Here is the call graph for this function:

void Update::PersistenceUpdater::create ( const UParticipant participant  )  [virtual]

Add participant to be persisted.

Implements Update::Updater.

Definition at line 593 of file PersistenceUpdater.cpp.

References ACE_TEXT(), allocator_, ACE_Message_Block::base(), ACE_OutputCDR::begin(), ACE_Hash_Map_With_Allocator< class, class >::bind(), ACE_CDR::consolidate(), Update::ParticipantStrt< Q >::domainId, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), len, ACE_Message_Block::length(), LM_ERROR, ACE_OS::memcpy(), Update::ParticipantStrt< Q >::owner, participant_index_, Update::ParticipantStrt< QosSeq >::participantId, Update::ParticipantStrt< Q >::participantId, Update::ParticipantQos, and Update::ParticipantStrt< Q >::participantQos.

00594 {
00595   // serialize the Topic QOS
00596   TAO_OutputCDR outCdr;
00597   outCdr << participant.participantQos;
00598   ACE_Message_Block dst;
00599   ACE_CDR::consolidate(&dst, outCdr.begin());
00600 
00601   size_t len = dst.length();
00602   char *buf;
00603   ACE_NEW_NORETURN(buf, char[len]);
00604   ArrDelAdapter<char> guard(buf);
00605 
00606   if (buf == 0) {
00607     ACE_ERROR((LM_ERROR,
00608                ACE_TEXT("(%P|%t) PersistenceUpdater::create( UParticipant): allocation failed.\n")));
00609     return;
00610   }
00611 
00612   ACE_OS::memcpy(buf, dst.base(), len);
00613 
00614   BinSeq qos_bin(len, buf);
00615 
00616   QosSeq p(ParticipantQos, qos_bin);
00617   DParticipant participant_data
00618   (participant.domainId, participant.owner, participant.participantId, p);
00619 
00620   // allocate memory for ParticipantData
00621   void* buffer;
00622   ACE_ALLOCATOR(buffer, allocator_->malloc
00623                 (sizeof(PersistenceUpdater::Participant)));
00624 
00625   // Initialize ParticipantData
00626   PersistenceUpdater::Participant* persistent_data
00627   = new(buffer) PersistenceUpdater::Participant(participant_data
00628                                                 , allocator_.get());
00629 
00630   IdType_ExtId ext(participant_data.participantId);
00631 
00632   // bind ParticipantData with the participantId
00633   if (participant_index_->bind(ext, persistent_data, allocator_.get()) != 0) {
00634     allocator_->free((void *) buffer);
00635     return;
00636   }
00637 }

Here is the call graph for this function:

void Update::PersistenceUpdater::create ( const UTopic topic  )  [virtual]

Add topic to be persisted.

Implements Update::Updater.

Definition at line 547 of file PersistenceUpdater.cpp.

References ACE_TEXT(), allocator_, ACE_Message_Block::base(), ACE_OutputCDR::begin(), ACE_Hash_Map_With_Allocator< class, class >::bind(), ACE_CDR::consolidate(), Update::TopicStrt< Q, S >::dataType, Update::TopicStrt< Q, S >::domainId, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), len, ACE_Message_Block::length(), LM_ERROR, ACE_OS::memcpy(), Update::TopicStrt< Q, S >::name, Update::TopicStrt< Q, S >::participantId, topic_index_, Update::TopicStrt< Q, S >::topicId, Update::TopicQos, and Update::TopicStrt< Q, S >::topicQos.

00548 {
00549   // serialize the Topic QOS
00550   TAO_OutputCDR outCdr;
00551   outCdr << topic.topicQos;
00552   ACE_Message_Block dst;
00553   ACE_CDR::consolidate(&dst, outCdr.begin());
00554 
00555   size_t len = dst.length();
00556   char *buf;
00557   ACE_NEW_NORETURN(buf, char[len]);
00558   ArrDelAdapter<char> guard(buf);
00559 
00560   if (buf == 0) {
00561     ACE_ERROR((LM_ERROR,
00562                ACE_TEXT("(%P|%t) PersistenceUpdater::create( UTopic): allocation failed.\n")));
00563     return;
00564   }
00565 
00566   ACE_OS::memcpy(buf, dst.base(), len);
00567 
00568   BinSeq qos_bin(len, buf);
00569 
00570   QosSeq p(TopicQos, qos_bin);
00571   DTopic topic_data(topic.domainId, topic.topicId, topic.participantId
00572                     , topic.name.c_str(), topic.dataType.c_str(), p);
00573 
00574   // allocate memory for TopicData
00575   void* buffer;
00576   ACE_ALLOCATOR(buffer, allocator_->malloc
00577                 (sizeof(PersistenceUpdater::Topic)));
00578 
00579   // Initialize TopicData
00580   PersistenceUpdater::Topic* persistent_data
00581   = new(buffer) PersistenceUpdater::Topic(topic_data, allocator_.get());
00582 
00583   IdType_ExtId ext(topic_data.topicId);
00584 
00585   // bind TopicData with the topicId
00586   if (topic_index_->bind(ext, persistent_data, allocator_.get()) != 0) {
00587     allocator_->free((void *) buffer);
00588     return;
00589   }
00590 }

Here is the call graph for this function:

void Update::PersistenceUpdater::destroy ( const IdPath id,
ItemType  type,
ActorType  actor 
) [virtual]

Remove an entity (but not children) from persistence.

Implements Update::Updater.

Definition at line 995 of file PersistenceUpdater.cpp.

References ACE_TEXT(), Update::Actor, actor_index_, allocator_, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::cleanup(), Update::ParticipantStrt< QosSeq >::cleanup(), Update::TopicStrt< QosSeq, ACE_CString >::cleanup(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), LM_ERROR, Update::Participant, participant_index_, Update::Topic, topic_index_, and ACE_Hash_Map_With_Allocator< class, class >::unbind().

00996 {
00997   IdType_ExtId ext(id.id);
00998   PersistenceUpdater::Topic* topic = 0;
00999   PersistenceUpdater::Participant* participant = 0;
01000   PersistenceUpdater::RWActor* actor = 0;
01001 
01002   switch (type) {
01003   case Update::Topic:
01004 
01005     if (topic_index_->unbind(ext, topic, allocator_.get()) == 0) {
01006       topic->cleanup(allocator_.get());
01007       allocator_->free((void *) topic);
01008     }
01009 
01010     break;
01011   case Update::Participant:
01012 
01013     if (participant_index_->unbind(ext, participant, allocator_.get()) == 0) {
01014       participant->cleanup(allocator_.get());
01015       allocator_->free((void *) participant);
01016     }
01017 
01018     break;
01019   case Update::Actor:
01020 
01021     if (actor_index_->unbind(ext, actor, allocator_.get()) == 0) {
01022       actor->cleanup(allocator_.get());
01023       allocator_->free((void *) actor);
01024     }
01025 
01026     break;
01027   default: {
01028     OpenDDS::DCPS::RepoIdConverter converter(id.id);
01029     ACE_ERROR((LM_ERROR,
01030                ACE_TEXT("(%P|%t) PersistenceUpdater::destroy: ")
01031                ACE_TEXT("unknown entity - %C.\n"),
01032                std::string(converter).c_str()));
01033   }
01034   }
01035 }

Here is the call graph for this function:

int Update::PersistenceUpdater::fini ( void   )  [virtual]

ACE_Task_Base finish method.

Reimplemented from ACE_Shared_Object.

Definition at line 387 of file PersistenceUpdater.cpp.

00388 {
00389   return 0;
00390 }

int Update::PersistenceUpdater::init ( int  argc,
ACE_TCHAR argv[] 
) [virtual]

Service object initialization.

Reimplemented from ACE_Shared_Object.

Definition at line 274 of file PersistenceUpdater.cpp.

References ACE_NEW_RETURN(), ACE_TEXT(), actor_index_, Update::Manager::add(), allocator_, ACE_String_Base< ACE_CHAR_T >::c_str(), Update::createIndex(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), Update::index_cleanup(), last_part_id_, LM_ERROR, parse(), participant_index_, persistence_file_, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), reset_, topic_index_, and um_.

00275 {
00276   // discover the UpdateManager
00277   um_ = ACE_Dynamic_Service<Update::Manager>::instance("UpdateManagerSvc");
00278 
00279   if (um_ == 0) {
00280     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: PersistenceUpdater::init ")
00281                ACE_TEXT("No UpdateManager discovered.\n")));
00282     return -1;
00283   }
00284 
00285   this->parse(argc, argv);
00286 
00287 #if defined ACE_HAS_MAC_OSX && defined __x86_64__ && __x86_64__
00288   ACE_MMAP_Memory_Pool::OPTIONS options((void*)0x200000000);
00289 #else
00290   ACE_MMAP_Memory_Pool::OPTIONS options(ACE_DEFAULT_BASE_ADDR);
00291 #endif
00292 
00293   // Create the allocator with the appropriate options.  The name used
00294   // for  the lock is the same as one used for the file.
00295   ALLOCATOR* allocator;
00296   ACE_NEW_RETURN(allocator,
00297                  ALLOCATOR(persistence_file_.c_str(),
00298                            persistence_file_.c_str(),
00299                            &options),
00300                  -1);
00301   allocator_.reset(allocator);
00302 
00303   bool exists = false;
00304 
00305   char* topic_index = (char*)createIndex("TopicIndex", *allocator_
00306                                          , sizeof(TopicIndex), exists);
00307   if (!topic_index) {
00308     return -1;
00309   }
00310 
00311   char* participant_index = (char*)createIndex("ParticipantIndex", *allocator_
00312                                                , sizeof(ParticipantIndex), exists);
00313   if (!participant_index) {
00314     return -1;
00315   }
00316 
00317   char* actor_index = (char*)createIndex("ActorIndex", *allocator_
00318                                          , sizeof(ActorIndex), exists);
00319   if (!actor_index) {
00320     return -1;
00321   }
00322 
00323   void* last_part_id = createIndex(
00324     "LastParticipantId", *allocator_, sizeof(PartIdType), exists);
00325   if (!last_part_id) {
00326     return -1;
00327   }
00328   last_part_id_ = reinterpret_cast<PartIdType*>(last_part_id);
00329 
00330   if (exists) {
00331     topic_index_ = reinterpret_cast<TopicIndex*>(topic_index);
00332     participant_index_ = reinterpret_cast<ParticipantIndex*>(participant_index);
00333     actor_index_ = reinterpret_cast<ActorIndex*>(actor_index);
00334   } else {
00335     topic_index_ = new(topic_index) TopicIndex(allocator_.get());
00336     participant_index_ = new(participant_index) ParticipantIndex(allocator_.get());
00337     actor_index_ = new(actor_index) ActorIndex(allocator_.get());
00338     *last_part_id_ = 0;
00339   }
00340 
00341   if (reset_) {
00342     index_cleanup(topic_index_, allocator_.get());
00343     index_cleanup(participant_index_, allocator_.get());
00344     index_cleanup(actor_index_, allocator_.get());
00345     *last_part_id_ = 0;
00346   }
00347 
00348   // lastly register the callback
00349   um_->add(this);
00350 
00351   return 0;
00352 }

Here is the call graph for this function:

int Update::PersistenceUpdater::parse ( int  argc,
ACE_TCHAR argv[] 
) [private]

Definition at line 355 of file PersistenceUpdater.cpp.

References ACE_TEXT(), ACE_OS::atoi(), LM_DEBUG, persistence_file_, reset_, and ACE_OS::strcasecmp().

Referenced by init().

00356 {
00357   for (ssize_t count = 0; count < argc; count++) {
00358     if (ACE_OS::strcasecmp(argv[count], ACE_TEXT("-file")) == 0) {
00359       if ((count + 1) < argc) {
00360         persistence_file_ = argv[count+1];
00361         count++;
00362       }
00363 
00364     } else if (ACE_OS::strcasecmp(argv[count], ACE_TEXT("-reset")) == 0) {
00365       if ((count + 1) < argc) {
00366         int val = ACE_OS::atoi(argv[count+1]);
00367         reset_ = true;
00368 
00369         if (val == 0) {
00370           reset_ = false;
00371         }
00372 
00373         count++;
00374       }
00375 
00376     } else {
00377       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) PersistenceUpdater::parse: Unknown option %s\n")
00378                  , argv[count]));
00379       return -1;
00380     }
00381   }
00382 
00383   return 0;
00384 }

Here is the call graph for this function:

Here is the caller graph for this function:

void Update::PersistenceUpdater::requestImage (  )  [virtual]

Request an image refresh to be sent upstream. This is currently done synchronously. TBD: Move to an asynchronous model

Implements Update::Updater.

Definition at line 399 of file PersistenceUpdater.cpp.

References ACE_TEXT(), actor_index_, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::actorId, Update::ImageData< T, P, A, W >::actors, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), ACE_String_Base< ACE_CHAR_T >::c_str(), Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::callback, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::contentSubscriptionProfile, Update::DataReader, Update::TopicStrt< QosSeq, ACE_CString >::dataType, OpenDDS::DCPS::DCPS_debug_level, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::domainId, Update::TopicStrt< QosSeq, ACE_CString >::domainId, Update::ParticipantStrt< QosSeq >::domainId, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::drdwQos, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::end(), Update::ContentSubscriptionBin::exprParams, Update::ContentSubscriptionBin::filterClassName, Update::ContentSubscriptionBin::filterExpr, last_part_id_, Update::ImageData< T, P, A, W >::lastPartId, LM_DEBUG, LM_ERROR, ACE_OS::memcpy(), Update::TopicStrt< QosSeq, ACE_CString >::name, OPENDDS_STRING, Update::ParticipantStrt< QosSeq >::owner, participant_index_, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::participantId, Update::TopicStrt< QosSeq, ACE_CString >::participantId, Update::ParticipantStrt< QosSeq >::participantId, Update::ParticipantQos, Update::ParticipantStrt< QosSeq >::participantQos, Update::ImageData< T, P, A, W >::participants, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::pubsubQos, Update::Manager::pushImage(), topic_index_, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::topicId, Update::TopicStrt< QosSeq, ACE_CString >::topicId, Update::TopicQos, Update::TopicStrt< QosSeq, ACE_CString >::topicQos, Update::ImageData< T, P, A, W >::topics, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::transportInterfaceInfo, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::type, and um_.

00400 {
00401   if (um_ == NULL) {
00402     return;
00403   }
00404 
00405   DImage image;
00406 
00407   // Allocate space to hold the QOS sequences.
00408   std::vector<ArrDelAdapter<char> > qos_sequences;
00409 
00410   for (ParticipantIndex::ITERATOR iter = participant_index_->begin();
00411        iter != participant_index_->end(); iter++) {
00412     const PersistenceUpdater::Participant* participant
00413     = (*iter).int_id_;
00414 
00415     size_t qos_len = participant->participantQos.second.first;
00416     char *buf;
00417     ACE_NEW_NORETURN(buf, char[qos_len]);
00418     qos_sequences.push_back(ArrDelAdapter<char>(buf));
00419 
00420     if (buf == 0) {
00421       ACE_ERROR((LM_ERROR,
00422                  ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation failed.\n")));
00423       return;
00424     }
00425 
00426     ACE_OS::memcpy(buf, participant->participantQos.second.second, qos_len);
00427 
00428     BinSeq in_seq(qos_len, buf);
00429     QosSeq qos(ParticipantQos, in_seq);
00430     DParticipant dparticipant(participant->domainId
00431                               , participant->owner
00432                               , participant->participantId
00433                               , qos);
00434     image.participants.push_back(dparticipant);
00435     if (OpenDDS::DCPS::DCPS_debug_level >= 2)  {
00436       OpenDDS::DCPS::RepoIdConverter conv(participant->participantId);
00437       ACE_DEBUG((LM_DEBUG,
00438         "(%P|%t) PersistenceUpdater::requestImage(): loaded participant %C\n",
00439         OPENDDS_STRING(conv).c_str()));
00440     }
00441   }
00442 
00443   for (TopicIndex::ITERATOR iter = topic_index_->begin();
00444        iter != topic_index_->end(); iter++) {
00445     const PersistenceUpdater::Topic* topic = (*iter).int_id_;
00446 
00447     size_t qos_len = topic->topicQos.second.first;
00448     char *buf;
00449     ACE_NEW_NORETURN(buf, char[qos_len]);
00450     qos_sequences.push_back(ArrDelAdapter<char>(buf));
00451 
00452     if (buf == 0) {
00453       ACE_ERROR((LM_ERROR,
00454                  ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation failed.\n")));
00455       return;
00456     }
00457 
00458     ACE_OS::memcpy(buf, topic->topicQos.second.second, qos_len);
00459 
00460     BinSeq in_seq(qos_len, buf);
00461     QosSeq qos(TopicQos, in_seq);
00462     DTopic dTopic(topic->domainId, topic->topicId
00463                   , topic->participantId, topic->name.c_str()
00464                   , topic->dataType.c_str(), qos);
00465     image.topics.push_back(dTopic);
00466   }
00467 
00468   for (ActorIndex::ITERATOR iter = actor_index_->begin();
00469        iter != actor_index_->end(); iter++) {
00470     const PersistenceUpdater::RWActor* actor = (*iter).int_id_;
00471 
00472     size_t qos_len = actor->pubsubQos.second.first;
00473     char *buf;
00474     ACE_NEW_NORETURN(buf, char[qos_len]);
00475     qos_sequences.push_back(ArrDelAdapter<char>(buf));
00476 
00477     if (buf == 0) {
00478       ACE_ERROR((LM_ERROR,
00479                  ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation failed.\n")));
00480       return;
00481     }
00482 
00483     ACE_OS::memcpy(buf, actor->pubsubQos.second.second, qos_len);
00484 
00485     BinSeq in_pubsub_seq(qos_len, buf);
00486     QosSeq pubsub_qos(actor->pubsubQos.first, in_pubsub_seq);
00487 
00488     qos_len = actor->drdwQos.second.first;
00489     ACE_NEW_NORETURN(buf, char[qos_len]);
00490     qos_sequences.push_back(ArrDelAdapter<char>(buf));
00491 
00492     if (buf == 0) {
00493       ACE_ERROR((LM_ERROR,
00494                  ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation failed.\n")));
00495       return;
00496     }
00497 
00498     ACE_OS::memcpy(buf, actor->drdwQos.second.second, qos_len);
00499 
00500     BinSeq in_drdw_seq(qos_len, buf);
00501     QosSeq drdw_qos(actor->drdwQos.first, in_drdw_seq);
00502 
00503     size_t transport_len = actor->transportInterfaceInfo.first;
00504     ACE_NEW_NORETURN(buf, char[transport_len]);
00505     qos_sequences.push_back(ArrDelAdapter<char>(buf));
00506 
00507     if (buf == 0) {
00508       ACE_ERROR((LM_ERROR,
00509                  ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation failed.\n")));
00510       return;
00511     }
00512 
00513     ACE_OS::memcpy(buf, actor->transportInterfaceInfo.second, transport_len);
00514 
00515     BinSeq in_transport_seq(transport_len, buf);
00516 
00517     ContentSubscriptionBin in_csp_bin;
00518     if (actor->type == DataReader) {
00519       in_csp_bin.filterClassName = actor->contentSubscriptionProfile.filterClassName;
00520       in_csp_bin.filterExpr = actor->contentSubscriptionProfile.filterExpr;
00521       BinSeq& params = in_csp_bin.exprParams;
00522       ACE_NEW_NORETURN(params.second, char[params.first]);
00523       if (params.second == 0) {
00524         ACE_ERROR((LM_ERROR,
00525                    ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation ")
00526                    ACE_TEXT("failed.\n")));
00527         return;
00528       }
00529       qos_sequences.push_back(ArrDelAdapter<char>(params.second));
00530       ACE_OS::memcpy(params.second,
00531         actor->contentSubscriptionProfile.exprParams.second, params.first);
00532     }
00533 
00534     DActor dActor(actor->domainId, actor->actorId, actor->topicId
00535                   , actor->participantId
00536                   , actor->type, actor->callback.c_str()
00537                   , pubsub_qos, drdw_qos, in_transport_seq, in_csp_bin);
00538     image.actors.push_back(dActor);
00539   }
00540 
00541   image.lastPartId = *last_part_id_;
00542 
00543   um_->pushImage(image);
00544 }

Here is the call graph for this function:

void Update::PersistenceUpdater::storeUpdate ( const ACE_Message_Block data,
BinSeq storage 
) [private]

Definition at line 1038 of file PersistenceUpdater.cpp.

References allocator_, ACE_Message_Block::base(), ACE_Message_Block::length(), and ACE_OS::memcpy().

Referenced by update().

01039 {
01040   size_t len = data.length();
01041 
01042   void* buffer;
01043   ACE_ALLOCATOR(buffer, this->allocator_->malloc(len));
01044   ACE_OS::memcpy(buffer, data.base(), len);
01045 
01046   storage.first  = len;
01047   storage.second = static_cast<char*>(buffer);
01048 }

Here is the call graph for this function:

Here is the caller graph for this function:

int Update::PersistenceUpdater::svc ( void   )  [virtual]

ACE_Task_Base start method.

Reimplemented from ACE_Task_Base.

Definition at line 393 of file PersistenceUpdater.cpp.

00394 {
00395   return 0;
00396 }

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::StringSeq exprParams 
) [virtual]

Persist updated subscription exprParams.

Implements Update::Updater.

Definition at line 972 of file PersistenceUpdater.cpp.

References ACE_TEXT(), actor_index_, allocator_, ACE_OutputCDR::begin(), ACE_CDR::consolidate(), Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::contentSubscriptionProfile, Update::ContentSubscriptionBin::exprParams, ACE_Hash_Map_With_Allocator< class, class >::find(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), LM_ERROR, and storeUpdate().

00973 {
00974   IdType_ExtId ext(id.id);
00975   PersistenceUpdater::RWActor* actor_data = 0;
00976 
00977   if (actor_index_->find(ext, actor_data, allocator_.get()) ==  0) {
00978     TAO_OutputCDR outCdr;
00979     outCdr << exprParams;
00980     ACE_Message_Block dst;
00981     ACE_CDR::consolidate(&dst, outCdr.begin());
00982 
00983     storeUpdate(dst, actor_data->contentSubscriptionProfile.exprParams);
00984 
00985   } else {
00986     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00987     ACE_ERROR((LM_ERROR,
00988                ACE_TEXT("(%P|%t) PersistenceUpdater::update(readerQos): ")
00989                ACE_TEXT("subscription %C not found\n"),
00990                std::string(converter).c_str()));
00991   }
00992 }

Here is the call graph for this function:

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::SubscriberQos qos 
) [virtual]

Persist updated Qos parameters for a Subscriber.

Implements Update::Updater.

Definition at line 949 of file PersistenceUpdater.cpp.

References ACE_TEXT(), actor_index_, ACE_OutputCDR::begin(), ACE_CDR::consolidate(), ACE_Hash_Map_With_Allocator< class, class >::find(), LM_ERROR, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::pubsubQos, and storeUpdate().

00950 {
00951   IdType_ExtId ext(id.id);
00952   PersistenceUpdater::RWActor* actor_data = 0;
00953 
00954   if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) ==  0) {
00955     TAO_OutputCDR outCdr;
00956     outCdr << qos;
00957     ACE_Message_Block dst;
00958     ACE_CDR::consolidate(&dst, outCdr.begin());
00959 
00960     this->storeUpdate(dst, actor_data->pubsubQos.second);
00961 
00962   } else {
00963     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00964     ACE_ERROR((LM_ERROR,
00965                ACE_TEXT("(%P|%t) PersistenceUpdater::update(subscriberQos): ")
00966                ACE_TEXT("subscription %C not found\n"),
00967                std::string(converter).c_str()));
00968   }
00969 }

Here is the call graph for this function:

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::DataReaderQos qos 
) [virtual]

Persist updated Qos parameters for a DataReader.

Implements Update::Updater.

Definition at line 926 of file PersistenceUpdater.cpp.

References ACE_TEXT(), actor_index_, ACE_OutputCDR::begin(), ACE_CDR::consolidate(), Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::drdwQos, ACE_Hash_Map_With_Allocator< class, class >::find(), LM_ERROR, and storeUpdate().

00927 {
00928   IdType_ExtId ext(id.id);
00929   PersistenceUpdater::RWActor* actor_data = 0;
00930 
00931   if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) ==  0) {
00932     TAO_OutputCDR outCdr;
00933     outCdr << qos;
00934     ACE_Message_Block dst;
00935     ACE_CDR::consolidate(&dst, outCdr.begin());
00936 
00937     this->storeUpdate(dst, actor_data->drdwQos.second);
00938 
00939   } else {
00940     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00941     ACE_ERROR((LM_ERROR,
00942                ACE_TEXT("(%P|%t) PersistenceUpdater::update(readerQos): ")
00943                ACE_TEXT("subscription %C not found\n"),
00944                std::string(converter).c_str()));
00945   }
00946 }

Here is the call graph for this function:

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::PublisherQos qos 
) [virtual]

Persist updated Qos parameters for a Publisher.

Implements Update::Updater.

Definition at line 903 of file PersistenceUpdater.cpp.

References ACE_TEXT(), actor_index_, ACE_OutputCDR::begin(), ACE_CDR::consolidate(), ACE_Hash_Map_With_Allocator< class, class >::find(), LM_ERROR, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::pubsubQos, and storeUpdate().

00904 {
00905   IdType_ExtId ext(id.id);
00906   PersistenceUpdater::RWActor* actor_data = 0;
00907 
00908   if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) ==  0) {
00909     TAO_OutputCDR outCdr;
00910     outCdr << qos;
00911     ACE_Message_Block dst;
00912     ACE_CDR::consolidate(&dst, outCdr.begin());
00913 
00914     this->storeUpdate(dst, actor_data->pubsubQos.second);
00915 
00916   } else {
00917     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00918     ACE_ERROR((LM_ERROR,
00919                ACE_TEXT("(%P|%t) PersistenceUpdater::update(publisherQos): ")
00920                ACE_TEXT("publication %C not found\n"),
00921                std::string(converter).c_str()));
00922   }
00923 }

Here is the call graph for this function:

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::DataWriterQos qos 
) [virtual]

Persist updated Qos parameters for a DataWriter.

Implements Update::Updater.

Definition at line 880 of file PersistenceUpdater.cpp.

References ACE_TEXT(), actor_index_, ACE_OutputCDR::begin(), ACE_CDR::consolidate(), Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::drdwQos, ACE_Hash_Map_With_Allocator< class, class >::find(), LM_ERROR, and storeUpdate().

00881 {
00882   IdType_ExtId ext(id.id);
00883   PersistenceUpdater::RWActor* actor_data = 0;
00884 
00885   if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) ==  0) {
00886     TAO_OutputCDR outCdr;
00887     outCdr << qos;
00888     ACE_Message_Block dst;
00889     ACE_CDR::consolidate(&dst, outCdr.begin());
00890 
00891     this->storeUpdate(dst, actor_data->drdwQos.second);
00892 
00893   } else {
00894     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00895     ACE_ERROR((LM_ERROR,
00896                ACE_TEXT("(%P|%t) PersistenceUpdater::update(writerQos): ")
00897                ACE_TEXT("publication %C not found\n"),
00898                std::string(converter).c_str()));
00899   }
00900 }

Here is the call graph for this function:

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::TopicQos qos 
) [virtual]

Persist updated Qos parameters for a Topic.

Implements Update::Updater.

Definition at line 857 of file PersistenceUpdater.cpp.

References ACE_TEXT(), ACE_OutputCDR::begin(), ACE_CDR::consolidate(), ACE_Hash_Map_With_Allocator< class, class >::find(), LM_ERROR, storeUpdate(), topic_index_, and Update::TopicStrt< QosSeq, ACE_CString >::topicQos.

00858 {
00859   IdType_ExtId ext(id.id);
00860   PersistenceUpdater::Topic* topic_data = 0;
00861 
00862   if (this->topic_index_->find(ext, topic_data, this->allocator_.get()) ==  0) {
00863     TAO_OutputCDR outCdr;
00864     outCdr << qos;
00865     ACE_Message_Block dst;
00866     ACE_CDR::consolidate(&dst, outCdr.begin());
00867 
00868     this->storeUpdate(dst, topic_data->topicQos.second);
00869 
00870   } else {
00871     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00872     ACE_ERROR((LM_ERROR,
00873                ACE_TEXT("(%P|%t) PersistenceUpdater::update: ")
00874                ACE_TEXT("topic %C not found\n"),
00875                std::string(converter).c_str()));
00876   }
00877 }

Here is the call graph for this function:

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::DomainParticipantQos qos 
) [virtual]

Persist updated Qos parameters for a Participant.

Implements Update::Updater.

Definition at line 834 of file PersistenceUpdater.cpp.

References ACE_TEXT(), ACE_OutputCDR::begin(), ACE_CDR::consolidate(), ACE_Hash_Map_With_Allocator< class, class >::find(), LM_ERROR, participant_index_, Update::ParticipantStrt< QosSeq >::participantQos, and storeUpdate().

00835 {
00836   IdType_ExtId ext(id.id);
00837   PersistenceUpdater::Participant* part_data = 0;
00838 
00839   if (this->participant_index_->find(ext, part_data, this->allocator_.get()) ==  0) {
00840     TAO_OutputCDR outCdr;
00841     outCdr << qos;
00842     ACE_Message_Block dst;
00843     ACE_CDR::consolidate(&dst, outCdr.begin());
00844 
00845     this->storeUpdate(dst, part_data->participantQos.second);
00846 
00847   } else {
00848     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00849     ACE_ERROR((LM_ERROR,
00850                ACE_TEXT("(%P|%t) PersistenceUpdater::update: ")
00851                ACE_TEXT("participant %C not found\n"),
00852                std::string(converter).c_str()));
00853   }
00854 }

Here is the call graph for this function:

void Update::PersistenceUpdater::updateLastPartId ( PartIdType  partId  )  [virtual]

Update Last Participant Id for repo.

Reimplemented from Update::Updater.

Definition at line 1050 of file PersistenceUpdater.cpp.

References last_part_id_.

01051 {
01052   *last_part_id_ = partId;
01053 }


Member Data Documentation

Persisted Readers and Writers.

Definition at line 149 of file PersistenceUpdater.h.

Referenced by create(), destroy(), init(), requestImage(), and update().

Definition at line 140 of file PersistenceUpdater.h.

Referenced by create(), destroy(), init(), storeUpdate(), and update().

What the last participant id is/was.

Definition at line 152 of file PersistenceUpdater.h.

Referenced by init(), requestImage(), and updateLastPartId().

Persisted Participants.

Definition at line 146 of file PersistenceUpdater.h.

Referenced by create(), destroy(), init(), requestImage(), and update().

Definition at line 135 of file PersistenceUpdater.h.

Referenced by init(), and parse().

Definition at line 136 of file PersistenceUpdater.h.

Referenced by init(), and parse().

Persisted Topics.

Definition at line 143 of file PersistenceUpdater.h.

Referenced by create(), destroy(), init(), requestImage(), and update().

Definition at line 138 of file PersistenceUpdater.h.

Referenced by init(), and requestImage().


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1