Update::PersistenceUpdater Class Reference

#include <PersistenceUpdater.h>

Inheritance diagram for Update::PersistenceUpdater:

Inheritance graph
[legend]
Collaboration diagram for Update::PersistenceUpdater:

Collaboration graph
[legend]
List of all members.

Public Types

typedef ACE_Allocator_Adapter<
ACE_Malloc< ACE_MMAP_MEMORY_POOL,
TAO_SYNCH_MUTEX > > 
ALLOCATOR
typedef TopicStrt< QosSeq,
ACE_CString > 
Topic
 Persisted entity data structures.
typedef ParticipantStrt< QosSeqParticipant
typedef ActorStrt< QosSeq,
QosSeq, ACE_CString, BinSeq,
ContentSubscriptionBin
RWActor
typedef ACE_Hash_Map_With_Allocator<
IdType_ExtId, Topic * > 
TopicIndex
typedef ACE_Hash_Map_With_Allocator<
IdType_ExtId, Participant * > 
ParticipantIndex
typedef ACE_Hash_Map_With_Allocator<
IdType_ExtId, RWActor * > 
ActorIndex

Public Member Functions

 PersistenceUpdater ()
virtual ~PersistenceUpdater ()
virtual int init (int argc, ACE_TCHAR *argv[])
 Service object initialization.
virtual int fini ()
 pure ACE_Task_Base methods
virtual int svc ()
virtual void requestImage ()
virtual void create (const UTopic &topic)
 Add entities to be persisted.
virtual void create (const UParticipant &participant)
virtual void create (const URActor &actor)
virtual void create (const UWActor &actor)
virtual void create (const OwnershipData &data)
virtual void update (const IdPath &id, const DDS::DomainParticipantQos &qos)
 Persist updated Qos parameters for an entity.
virtual void update (const IdPath &id, const DDS::TopicQos &qos)
virtual void update (const IdPath &id, const DDS::DataWriterQos &qos)
virtual void update (const IdPath &id, const DDS::PublisherQos &qos)
virtual void update (const IdPath &id, const DDS::DataReaderQos &qos)
virtual void update (const IdPath &id, const DDS::SubscriberQos &qos)
virtual void update (const IdPath &id, const DDS::StringSeq &exprParams)
virtual void destroy (const IdPath &id, ItemType type, ActorType actor)
 Remove an entity (but not children) from persistence.

Private Member Functions

int parse (int argc, ACE_TCHAR *argv[])
void storeUpdate (const ACE_Message_Block &data, BinSeq &storage)

Private Attributes

ACE_TString persistence_file_
bool reset_
Managerum_
ALLOCATORallocator_
TopicIndextopic_index_
ParticipantIndexparticipant_index_
ActorIndexactor_index_

Classes

class  IdType_ExtId

Detailed Description

Definition at line 30 of file PersistenceUpdater.h.


Member Typedef Documentation

typedef ACE_Hash_Map_With_Allocator<IdType_ExtId, RWActor*> Update::PersistenceUpdater::ActorIndex

Definition at line 62 of file PersistenceUpdater.h.

typedef ACE_Allocator_Adapter<ACE_Malloc <ACE_MMAP_MEMORY_POOL , TAO_SYNCH_MUTEX> > Update::PersistenceUpdater::ALLOCATOR

Definition at line 52 of file PersistenceUpdater.h.

typedef struct ParticipantStrt< QosSeq > Update::PersistenceUpdater::Participant

Definition at line 56 of file PersistenceUpdater.h.

typedef ACE_Hash_Map_With_Allocator<IdType_ExtId, Participant*> Update::PersistenceUpdater::ParticipantIndex

Definition at line 61 of file PersistenceUpdater.h.

typedef struct ActorStrt< QosSeq, QosSeq, ACE_CString,BinSeq, ContentSubscriptionBin > Update::PersistenceUpdater::RWActor

Definition at line 57 of file PersistenceUpdater.h.

typedef struct TopicStrt< QosSeq, ACE_CString > Update::PersistenceUpdater::Topic

Persisted entity data structures.

Definition at line 55 of file PersistenceUpdater.h.

typedef ACE_Hash_Map_With_Allocator<IdType_ExtId, Topic*> Update::PersistenceUpdater::TopicIndex

Definition at line 60 of file PersistenceUpdater.h.


Constructor & Destructor Documentation

Update::PersistenceUpdater::PersistenceUpdater (  ) 

Definition at line 211 of file PersistenceUpdater.cpp.

00212   : persistence_file_(ACE_TEXT("InforepoPersist"))
00213   , reset_(false)
00214   , um_(0)
00215   , allocator_(0)
00216   , topic_index_(0)
00217   , participant_index_(0)
00218   , actor_index_(0)
00219 
00220 {}

Update::PersistenceUpdater::~PersistenceUpdater (  )  [virtual]

Definition at line 222 of file PersistenceUpdater.cpp.

00223 {}


Member Function Documentation

void Update::PersistenceUpdater::create ( const OwnershipData data  )  [virtual]

Implements Update::Updater.

Definition at line 825 of file PersistenceUpdater.cpp.

00826 {
00827   /* This method intentionally left unimplemented. */
00828 }

virtual void Update::PersistenceUpdater::create ( const UWActor actor  )  [virtual]

Implements Update::Updater.

void Update::PersistenceUpdater::create ( const URActor actor  )  [virtual]

Implements Update::Updater.

Definition at line 637 of file PersistenceUpdater.cpp.

References actor_index_, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::actorId, allocator_, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::callback, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::contentSubscriptionProfile, Update::DataReader, Update::DataReaderQos, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::domainId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::drdwQos, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::participantId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::pubsubQos, Update::SubscriberQos, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::topicId, and Update::ActorStrt< PSQ, RWQ, C, T, CSP >::transportInterfaceInfo.

00638 {
00639   TAO_OutputCDR outCdr;
00640   outCdr << actor.pubsubQos;
00641   ACE_Message_Block dst;
00642   ACE_CDR::consolidate(&dst, outCdr.begin());
00643 
00644   size_t len = dst.length();
00645   char *buf = new (std::nothrow) char[len];
00646 
00647   if (buf == 0) {
00648     ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( subscription): allocation failed.\n"));
00649     return;
00650   }
00651 
00652   ArrDelAdapter<char> guard(buf);
00653 
00654   ACE_OS::memcpy(buf, dst.base(), len);
00655 
00656   BinSeq pubsub_qos_bin(len, buf);
00657   QosSeq pubsub_qos(SubscriberQos, pubsub_qos_bin);
00658 
00659   outCdr.reset();
00660   outCdr << actor.drdwQos;
00661   ACE_Message_Block dst2;
00662   ACE_CDR::consolidate(&dst2, outCdr.begin());
00663 
00664   len = dst2.length();
00665   char *buf2 = new (std::nothrow) char[len];
00666 
00667   if (buf2 == 0) {
00668     ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( subscription): allocation failed.\n"));
00669     return;
00670   }
00671 
00672   ArrDelAdapter<char> guard2(buf2);
00673 
00674   ACE_OS::memcpy(buf2, dst2.base(), len);
00675 
00676   BinSeq dwdr_qos_bin(len, buf2);
00677   QosSeq dwdr_qos(DataReaderQos, dwdr_qos_bin);
00678 
00679   outCdr.reset();
00680   outCdr << actor.transportInterfaceInfo;
00681   ACE_Message_Block dst3;
00682   ACE_CDR::consolidate(&dst3, outCdr.begin());
00683 
00684   len = dst3.length();
00685   char *buf3 = new (std::nothrow) char[len];
00686 
00687   if (buf3 == 0) {
00688     ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( subscription) allocation failed.\n"));
00689     return;
00690   }
00691 
00692   ArrDelAdapter<char> guard3(buf3);
00693 
00694   ACE_OS::memcpy(buf3, dst3.base(), len);
00695   BinSeq tr_bin(len, buf3);
00696 
00697   outCdr.reset();
00698   outCdr << actor.contentSubscriptionProfile.exprParams;
00699   ACE_Message_Block dst4;
00700   ACE_CDR::consolidate(&dst4, outCdr.begin());
00701   len = dst4.length();
00702   char* buf4 = new (std::nothrow) char[len];
00703   if (buf4 == 0) {
00704     ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( subscription) allocation failed.\n"));
00705     return;
00706   }
00707   ArrDelAdapter<char> guard4(buf4);
00708 
00709   ACE_OS::memcpy(buf4, dst4.base(), len);
00710   ContentSubscriptionBin csp_bin;
00711   csp_bin.filterClassName = actor.contentSubscriptionProfile.filterClassName;
00712   csp_bin.filterExpr = actor.contentSubscriptionProfile.filterExpr;
00713   csp_bin.exprParams = std::make_pair(len, buf4);
00714 
00715   DActor actor_data(actor.domainId, actor.actorId, actor.topicId
00716                     , actor.participantId
00717                     , DataReader, actor.callback.c_str(), pubsub_qos
00718                     , dwdr_qos, tr_bin, csp_bin);
00719 
00720   // allocate memory for ActorData
00721   void* buffer;
00722   ACE_ALLOCATOR(buffer, allocator_->malloc
00723                 (sizeof(PersistenceUpdater::RWActor)));
00724 
00725   // Initialize ActorData
00726   PersistenceUpdater::RWActor* persistent_data =
00727     new(buffer) PersistenceUpdater::RWActor(actor_data
00728                                             , allocator_);
00729 
00730   IdType_ExtId ext(actor.actorId);
00731 
00732   // bind ActorData with the actorId
00733   if (actor_index_->bind(ext, persistent_data, allocator_) != 0) {
00734     allocator_->free((void *) buffer);
00735     return;
00736   }
00737 }

void Update::PersistenceUpdater::create ( const UParticipant participant  )  [virtual]

Implements Update::Updater.

Definition at line 590 of file PersistenceUpdater.cpp.

References allocator_, Update::ParticipantStrt< Q >::domainId, Update::ParticipantStrt< Q >::owner, participant_index_, Update::ParticipantStrt< QosSeq >::participantId, Update::ParticipantStrt< Q >::participantId, Update::ParticipantQos, and Update::ParticipantStrt< Q >::participantQos.

00591 {
00592   // serialize the Topic QOS
00593   TAO_OutputCDR outCdr;
00594   outCdr << participant.participantQos;
00595   ACE_Message_Block dst;
00596   ACE_CDR::consolidate(&dst, outCdr.begin());
00597 
00598   size_t len = dst.length();
00599   char *buf;
00600   ACE_NEW_NORETURN(buf, char[len]);
00601   ArrDelAdapter<char> guard(buf);
00602 
00603   if (buf == 0) {
00604     ACE_ERROR((LM_ERROR,
00605                ACE_TEXT("PersistenceUpdater::create( UParticipant): allocation failed.\n")));
00606     return;
00607   }
00608 
00609   ACE_OS::memcpy(buf, dst.base(), len);
00610 
00611   BinSeq qos_bin(len, buf);
00612 
00613   QosSeq p(ParticipantQos, qos_bin);
00614   DParticipant participant_data
00615   (participant.domainId, participant.owner, participant.participantId, p);
00616 
00617   // allocate memory for ParticipantData
00618   void* buffer;
00619   ACE_ALLOCATOR(buffer, allocator_->malloc
00620                 (sizeof(PersistenceUpdater::Participant)));
00621 
00622   // Initialize ParticipantData
00623   PersistenceUpdater::Participant* persistent_data
00624   = new(buffer) PersistenceUpdater::Participant(participant_data
00625                                                 , allocator_);
00626 
00627   IdType_ExtId ext(participant_data.participantId);
00628 
00629   // bind ParticipantData with the participantId
00630   if (participant_index_->bind(ext, persistent_data, allocator_) != 0) {
00631     allocator_->free((void *) buffer);
00632     return;
00633   }
00634 }

void Update::PersistenceUpdater::create ( const UTopic topic  )  [virtual]

Add entities to be persisted.

Implements Update::Updater.

Definition at line 544 of file PersistenceUpdater.cpp.

References allocator_, Update::TopicStrt< Q, S >::dataType, Update::TopicStrt< Q, S >::domainId, Update::TopicStrt< Q, S >::name, Update::TopicStrt< Q, S >::participantId, topic_index_, Update::TopicStrt< QosSeq, ACE_CString >::topicId, Update::TopicStrt< Q, S >::topicId, Update::TopicQos, and Update::TopicStrt< Q, S >::topicQos.

00545 {
00546   // serialize the Topic QOS
00547   TAO_OutputCDR outCdr;
00548   outCdr << topic.topicQos;
00549   ACE_Message_Block dst;
00550   ACE_CDR::consolidate(&dst, outCdr.begin());
00551 
00552   size_t len = dst.length();
00553   char *buf;
00554   ACE_NEW_NORETURN(buf, char[len]);
00555   ArrDelAdapter<char> guard(buf);
00556 
00557   if (buf == 0) {
00558     ACE_ERROR((LM_ERROR,
00559                ACE_TEXT("PersistenceUpdater::create( UTopic): allocation failed.\n")));
00560     return;
00561   }
00562 
00563   ACE_OS::memcpy(buf, dst.base(), len);
00564 
00565   BinSeq qos_bin(len, buf);
00566 
00567   QosSeq p(TopicQos, qos_bin);
00568   DTopic topic_data(topic.domainId, topic.topicId, topic.participantId
00569                     , topic.name.c_str(), topic.dataType.c_str(), p);
00570 
00571   // allocate memory for TopicData
00572   void* buffer;
00573   ACE_ALLOCATOR(buffer, allocator_->malloc
00574                 (sizeof(PersistenceUpdater::Topic)));
00575 
00576   // Initialize TopicData
00577   PersistenceUpdater::Topic* persistent_data
00578   = new(buffer) PersistenceUpdater::Topic(topic_data, allocator_);
00579 
00580   IdType_ExtId ext(topic_data.topicId);
00581 
00582   // bind TopicData with the topicId
00583   if (topic_index_->bind(ext, persistent_data, allocator_) != 0) {
00584     allocator_->free((void *) buffer);
00585     return;
00586   }
00587 }

void Update::PersistenceUpdater::destroy ( const IdPath id,
ItemType  type,
ActorType  actor 
) [virtual]

Remove an entity (but not children) from persistence.

Implements Update::Updater.

Definition at line 992 of file PersistenceUpdater.cpp.

References Update::Actor, actor_index_, allocator_, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::cleanup(), Update::ParticipantStrt< QosSeq >::cleanup(), Update::TopicStrt< QosSeq, ACE_CString >::cleanup(), Update::Participant, participant_index_, Update::Topic, and topic_index_.

00993 {
00994   IdType_ExtId ext(id.id);
00995   PersistenceUpdater::Topic* topic = 0;
00996   PersistenceUpdater::Participant* participant = 0;
00997   PersistenceUpdater::RWActor* actor = 0;
00998 
00999   switch (type) {
01000   case Update::Topic:
01001 
01002     if (topic_index_->unbind(ext, topic, allocator_) == 0) {
01003       topic->cleanup(allocator_);
01004       allocator_->free((void *) topic);
01005     }
01006 
01007     break;
01008   case Update::Participant:
01009 
01010     if (participant_index_->unbind(ext, participant, allocator_) == 0) {
01011       participant->cleanup(allocator_);
01012       allocator_->free((void *) participant);
01013     }
01014 
01015     break;
01016   case Update::Actor:
01017 
01018     if (actor_index_->unbind(ext, actor, allocator_) == 0) {
01019       actor->cleanup(allocator_);
01020       allocator_->free((void *) actor);
01021     }
01022 
01023     break;
01024   default: {
01025     OpenDDS::DCPS::RepoIdConverter converter(id.id);
01026     ACE_ERROR((LM_ERROR,
01027                ACE_TEXT("(%P | %t) PersistenceUpdater::destroy: ")
01028                ACE_TEXT("unknown entity - %C.\n"),
01029                std::string(converter).c_str()));
01030   }
01031   }
01032 }

int Update::PersistenceUpdater::fini (  )  [virtual]

pure ACE_Task_Base methods

Definition at line 392 of file PersistenceUpdater.cpp.

00393 {
00394   return 0;
00395 }

int Update::PersistenceUpdater::init ( int  argc,
ACE_TCHAR *  argv[] 
) [virtual]

Service object initialization.

Definition at line 268 of file PersistenceUpdater.cpp.

References actor_index_, Update::Manager::add(), allocator_, Update::createIndex(), Update::index_cleanup(), parse(), participant_index_, persistence_file_, reset_, topic_index_, and um_.

00269 {
00270   // discover the UpdateManager
00271   um_ = ACE_Dynamic_Service<Update::Manager>::instance
00272         ("UpdateManagerSvc");
00273 
00274   if (um_ == 0) {
00275     ACE_ERROR((LM_ERROR, ACE_TEXT("PersistenceUpdater initialization failed. ")
00276                ACE_TEXT("No UpdateManager discovered.\n")));
00277     return -1;
00278   }
00279 
00280   this->parse(argc, argv);
00281 
00282 #if defined ACE_HAS_MAC_OSX && defined __x86_64__ && __x86_64__
00283   ACE_MMAP_Memory_Pool::OPTIONS options((void*)0x200000000);
00284 #else
00285   ACE_MMAP_Memory_Pool::OPTIONS options(ACE_DEFAULT_BASE_ADDR);
00286 #endif
00287 
00288   // Create the allocator with the appropriate options.  The name used
00289   // for  the lock is the same as one used for the file.
00290   ACE_NEW_RETURN(allocator_,
00291                  ALLOCATOR(persistence_file_.c_str(),
00292                            persistence_file_.c_str(),
00293                            &options),
00294                  -1);
00295 
00296   std::string topic_tag("TopicIndex");
00297   std::string participant_tag("ParticipantIndex");
00298   std::string actor_tag("ActorIndex");
00299   bool exists = false, ex = false;
00300 
00301   char* topic_index = (char*)createIndex(topic_tag, *allocator_
00302                                          , sizeof(TopicIndex), ex);
00303 
00304   if (topic_index == 0) {
00305     ACE_DEBUG((LM_DEBUG, ACE_TEXT("Initial allocation/Bind failed 1.\n")));
00306     return -1;
00307   }
00308 
00309   exists = exists || ex;
00310 
00311   char* participant_index = (char*)createIndex(participant_tag, *allocator_
00312                                                , sizeof(ParticipantIndex), ex);
00313 
00314   if (participant_index == 0) {
00315     ACE_DEBUG((LM_DEBUG, ACE_TEXT("Initial allocation/Bind failed 2.\n")));
00316     return -1;
00317   }
00318 
00319   exists = exists || ex;
00320 
00321   char* actor_index = (char*)createIndex(actor_tag, *allocator_
00322                                          , sizeof(ActorIndex), ex);
00323 
00324   if (actor_index == 0) {
00325     ACE_DEBUG((LM_DEBUG, ACE_TEXT("Initial allocation/Bind failed 2.\n")));
00326     return -1;
00327   }
00328 
00329   exists = exists || ex;
00330 
00331   if (exists) {
00332     topic_index_ = reinterpret_cast<TopicIndex*>(topic_index);
00333     participant_index_ = reinterpret_cast<ParticipantIndex*>(participant_index);
00334     actor_index_ = reinterpret_cast<ActorIndex*>(actor_index);
00335 
00336     if (!(topic_index_ && participant_index_ && actor_index_)) {
00337       ACE_ERROR((LM_DEBUG, ACE_TEXT("Unable to narrow persistent indexes.\n")));
00338       return -1;
00339     }
00340 
00341   } else {
00342     topic_index_ = new(topic_index) TopicIndex(allocator_);
00343     participant_index_ = new(participant_index) ParticipantIndex(allocator_);
00344     actor_index_ = new(actor_index) ActorIndex(allocator_);
00345   }
00346 
00347   if (reset_) {
00348     index_cleanup(topic_index_, allocator_);
00349     index_cleanup(participant_index_, allocator_);
00350     index_cleanup(actor_index_, allocator_);
00351   }
00352 
00353   // lastly register the callback
00354   um_->add(this);
00355 
00356   return 0;
00357 }

int Update::PersistenceUpdater::parse ( int  argc,
ACE_TCHAR *  argv[] 
) [private]

Definition at line 360 of file PersistenceUpdater.cpp.

References persistence_file_, and reset_.

Referenced by init().

00361 {
00362   for (ssize_t count = 0; count < argc; count++) {
00363     if (ACE_OS::strcasecmp(argv[count], ACE_TEXT("-file")) == 0) {
00364       if ((count + 1) < argc) {
00365         persistence_file_ = argv[count+1];
00366         count++;
00367       }
00368 
00369     } else if (ACE_OS::strcasecmp(argv[count], ACE_TEXT("-reset")) == 0) {
00370       if ((count + 1) < argc) {
00371         int val = ACE_OS::atoi(argv[count+1]);
00372         reset_ = true;
00373 
00374         if (val == 0) {
00375           reset_ = false;
00376         }
00377 
00378         count++;
00379       }
00380 
00381     } else {
00382       ACE_DEBUG((LM_DEBUG, ACE_TEXT("Unknown option %s\n")
00383                  , argv[count]));
00384       return -1;
00385     }
00386   }
00387 
00388   return 0;
00389 }

void Update::PersistenceUpdater::requestImage (  )  [virtual]

Request an image refresh to be sent upstream. This is currently done synchronously. TBD: Move to an asynchronous model

Implements Update::Updater.

Definition at line 404 of file PersistenceUpdater.cpp.

References actor_index_, Update::ImageData< T, P, A, W >::actors, Update::DataReader, Update::ContentSubscriptionBin::exprParams, Update::ContentSubscriptionBin::filterClassName, Update::ContentSubscriptionBin::filterExpr, participant_index_, Update::ParticipantQos, Update::ImageData< T, P, A, W >::participants, Update::Manager::pushImage(), topic_index_, Update::TopicQos, Update::ImageData< T, P, A, W >::topics, and um_.

00405 {
00406   if (um_ == NULL) {
00407     return;
00408   }
00409 
00410   DImage image;
00411 
00412   // Allocate space to hold the QOS sequences.
00413   std::vector<ArrDelAdapter<char> > qos_sequences;
00414 
00415   for (ParticipantIndex::ITERATOR iter = participant_index_->begin();
00416        iter != participant_index_->end(); iter++) {
00417     const PersistenceUpdater::Participant* participant
00418     = (*iter).int_id_;
00419 
00420     size_t qos_len = participant->participantQos.second.first;
00421     char *buf;
00422     ACE_NEW_NORETURN(buf, char[qos_len]);
00423     qos_sequences.push_back(ArrDelAdapter<char>(buf));
00424 
00425     if (buf == 0) {
00426       ACE_ERROR((LM_ERROR,
00427                  ACE_TEXT("PersistenceUpdater::requestImage(): allocation failed.\n")));
00428       return;
00429     }
00430 
00431     ACE_OS::memcpy(buf, participant->participantQos.second.second, qos_len);
00432 
00433     BinSeq in_seq(qos_len, buf);
00434     QosSeq qos(ParticipantQos, in_seq);
00435     DParticipant dparticipant(participant->domainId
00436                               , participant->owner
00437                               , participant->participantId
00438                               , qos);
00439     image.participants.push_back(dparticipant);
00440   }
00441 
00442   for (TopicIndex::ITERATOR iter = topic_index_->begin();
00443        iter != topic_index_->end(); iter++) {
00444     const PersistenceUpdater::Topic* topic = (*iter).int_id_;
00445 
00446     size_t qos_len = topic->topicQos.second.first;
00447     char *buf;
00448     ACE_NEW_NORETURN(buf, char[qos_len]);
00449     qos_sequences.push_back(ArrDelAdapter<char>(buf));
00450 
00451     if (buf == 0) {
00452       ACE_ERROR((LM_ERROR,
00453                  ACE_TEXT("PersistenceUpdater::requestImage(): allocation failed.\n")));
00454       return;
00455     }
00456 
00457     ACE_OS::memcpy(buf, topic->topicQos.second.second, qos_len);
00458 
00459     BinSeq in_seq(qos_len, buf);
00460     QosSeq qos(TopicQos, in_seq);
00461     DTopic dTopic(topic->domainId, topic->topicId
00462                   , topic->participantId, topic->name.c_str()
00463                   , topic->dataType.c_str(), qos);
00464     image.topics.push_back(dTopic);
00465   }
00466 
00467   for (ActorIndex::ITERATOR iter = actor_index_->begin();
00468        iter != actor_index_->end(); iter++) {
00469     const PersistenceUpdater::RWActor* actor = (*iter).int_id_;
00470 
00471     size_t qos_len = actor->pubsubQos.second.first;
00472     char *buf;
00473     ACE_NEW_NORETURN(buf, char[qos_len]);
00474     qos_sequences.push_back(ArrDelAdapter<char>(buf));
00475 
00476     if (buf == 0) {
00477       ACE_ERROR((LM_ERROR,
00478                  ACE_TEXT("PersistenceUpdater::requestImage(): allocation failed.\n")));
00479       return;
00480     }
00481 
00482     ACE_OS::memcpy(buf, actor->pubsubQos.second.second, qos_len);
00483 
00484     BinSeq in_pubsub_seq(qos_len, buf);
00485     QosSeq pubsub_qos(actor->pubsubQos.first, in_pubsub_seq);
00486 
00487     qos_len = actor->drdwQos.second.first;
00488     ACE_NEW_NORETURN(buf, char[qos_len]);
00489     qos_sequences.push_back(ArrDelAdapter<char>(buf));
00490 
00491     if (buf == 0) {
00492       ACE_ERROR((LM_ERROR,
00493                  ACE_TEXT("PersistenceUpdater::requestImage(): allocation failed.\n")));
00494       return;
00495     }
00496 
00497     ACE_OS::memcpy(buf, actor->drdwQos.second.second, qos_len);
00498 
00499     BinSeq in_drdw_seq(qos_len, buf);
00500     QosSeq drdw_qos(actor->drdwQos.first, in_drdw_seq);
00501 
00502     qos_len = actor->transportInterfaceInfo.first;
00503     ACE_NEW_NORETURN(buf, char[qos_len]);
00504     qos_sequences.push_back(ArrDelAdapter<char>(buf));
00505 
00506     if (buf == 0) {
00507       ACE_ERROR((LM_ERROR,
00508                  ACE_TEXT("PersistenceUpdater::requestImage(): allocation failed.\n")));
00509       return;
00510     }
00511 
00512     ACE_OS::memcpy(buf, actor->transportInterfaceInfo.second, qos_len);
00513 
00514     BinSeq in_transport_seq(qos_len, buf);
00515 
00516     ContentSubscriptionBin in_csp_bin;
00517     if (actor->type == DataReader) {
00518       in_csp_bin.filterClassName = actor->contentSubscriptionProfile.filterClassName;
00519       in_csp_bin.filterExpr = actor->contentSubscriptionProfile.filterExpr;
00520       BinSeq& params = in_csp_bin.exprParams;
00521       ACE_NEW_NORETURN(params.second, char[params.first]);
00522       if (params.second == 0) {
00523         ACE_ERROR((LM_ERROR,
00524                    ACE_TEXT("PersistenceUpdater::requestImage(): allocation ")
00525                    ACE_TEXT("failed.\n")));
00526         return;
00527       }
00528       qos_sequences.push_back(ArrDelAdapter<char>(params.second));
00529       ACE_OS::memcpy(params.second,
00530         actor->contentSubscriptionProfile.exprParams.second, params.first);
00531     }
00532 
00533     DActor dActor(actor->domainId, actor->actorId, actor->topicId
00534                   , actor->participantId
00535                   , actor->type, actor->callback.c_str()
00536                   , pubsub_qos, drdw_qos, in_transport_seq, in_csp_bin);
00537     image.actors.push_back(dActor);
00538   }
00539 
00540   um_->pushImage(image);
00541 }

void Update::PersistenceUpdater::storeUpdate ( const ACE_Message_Block &  data,
BinSeq storage 
) [private]

Definition at line 1035 of file PersistenceUpdater.cpp.

Referenced by update().

01036 {
01037   size_t len = data.length();
01038 
01039   void* buffer;
01040   ACE_ALLOCATOR(buffer, this->allocator_->malloc(len));
01041   ACE_OS::memcpy(buffer, data.base(), len);
01042 
01043   storage.first  = len;
01044   storage.second = static_cast<char*>(buffer);
01045 }

int Update::PersistenceUpdater::svc (  )  [virtual]

Definition at line 398 of file PersistenceUpdater.cpp.

00399 {
00400   return 0;
00401 }

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::StringSeq exprParams 
) [virtual]

Implements Update::Updater.

Definition at line 969 of file PersistenceUpdater.cpp.

References actor_index_, allocator_, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::contentSubscriptionProfile, Update::ContentSubscriptionBin::exprParams, and storeUpdate().

00970 {
00971   IdType_ExtId ext(id.id);
00972   PersistenceUpdater::RWActor* actor_data = 0;
00973 
00974   if (actor_index_->find(ext, actor_data, allocator_) ==  0) {
00975     TAO_OutputCDR outCdr;
00976     outCdr << exprParams;
00977     ACE_Message_Block dst;
00978     ACE_CDR::consolidate(&dst, outCdr.begin());
00979 
00980     storeUpdate(dst, actor_data->contentSubscriptionProfile.exprParams);
00981 
00982   } else {
00983     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00984     ACE_ERROR((LM_ERROR,
00985                ACE_TEXT("(%P|%t) PersistenceUpdater::update(readerQos): ")
00986                ACE_TEXT("subscription %C not found\n"),
00987                std::string(converter).c_str()));
00988   }
00989 }

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::SubscriberQos qos 
) [virtual]

Implements Update::Updater.

Definition at line 946 of file PersistenceUpdater.cpp.

References Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::pubsubQos, and storeUpdate().

00947 {
00948   IdType_ExtId ext(id.id);
00949   PersistenceUpdater::RWActor* actor_data = 0;
00950 
00951   if (this->actor_index_->find(ext, actor_data, this->allocator_) ==  0) {
00952     TAO_OutputCDR outCdr;
00953     outCdr << qos;
00954     ACE_Message_Block dst;
00955     ACE_CDR::consolidate(&dst, outCdr.begin());
00956 
00957     this->storeUpdate(dst, actor_data->pubsubQos.second);
00958 
00959   } else {
00960     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00961     ACE_ERROR((LM_ERROR,
00962                ACE_TEXT("(%P|%t) PersistenceUpdater::update(subscriberQos): ")
00963                ACE_TEXT("subscription %C not found\n"),
00964                std::string(converter).c_str()));
00965   }
00966 }

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::DataReaderQos qos 
) [virtual]

Implements Update::Updater.

Definition at line 923 of file PersistenceUpdater.cpp.

References Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::drdwQos, and storeUpdate().

00924 {
00925   IdType_ExtId ext(id.id);
00926   PersistenceUpdater::RWActor* actor_data = 0;
00927 
00928   if (this->actor_index_->find(ext, actor_data, this->allocator_) ==  0) {
00929     TAO_OutputCDR outCdr;
00930     outCdr << qos;
00931     ACE_Message_Block dst;
00932     ACE_CDR::consolidate(&dst, outCdr.begin());
00933 
00934     this->storeUpdate(dst, actor_data->drdwQos.second);
00935 
00936   } else {
00937     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00938     ACE_ERROR((LM_ERROR,
00939                ACE_TEXT("(%P|%t) PersistenceUpdater::update(readerQos): ")
00940                ACE_TEXT("subscription %C not found\n"),
00941                std::string(converter).c_str()));
00942   }
00943 }

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::PublisherQos qos 
) [virtual]

Implements Update::Updater.

Definition at line 900 of file PersistenceUpdater.cpp.

References Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::pubsubQos, and storeUpdate().

00901 {
00902   IdType_ExtId ext(id.id);
00903   PersistenceUpdater::RWActor* actor_data = 0;
00904 
00905   if (this->actor_index_->find(ext, actor_data, this->allocator_) ==  0) {
00906     TAO_OutputCDR outCdr;
00907     outCdr << qos;
00908     ACE_Message_Block dst;
00909     ACE_CDR::consolidate(&dst, outCdr.begin());
00910 
00911     this->storeUpdate(dst, actor_data->pubsubQos.second);
00912 
00913   } else {
00914     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00915     ACE_ERROR((LM_ERROR,
00916                ACE_TEXT("(%P|%t) PersistenceUpdater::update(publisherQos): ")
00917                ACE_TEXT("publication %C not found\n"),
00918                std::string(converter).c_str()));
00919   }
00920 }

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::DataWriterQos qos 
) [virtual]

Implements Update::Updater.

Definition at line 877 of file PersistenceUpdater.cpp.

References Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::drdwQos, and storeUpdate().

00878 {
00879   IdType_ExtId ext(id.id);
00880   PersistenceUpdater::RWActor* actor_data = 0;
00881 
00882   if (this->actor_index_->find(ext, actor_data, this->allocator_) ==  0) {
00883     TAO_OutputCDR outCdr;
00884     outCdr << qos;
00885     ACE_Message_Block dst;
00886     ACE_CDR::consolidate(&dst, outCdr.begin());
00887 
00888     this->storeUpdate(dst, actor_data->drdwQos.second);
00889 
00890   } else {
00891     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00892     ACE_ERROR((LM_ERROR,
00893                ACE_TEXT("(%P|%t) PersistenceUpdater::update(writerQos): ")
00894                ACE_TEXT("publication %C not found\n"),
00895                std::string(converter).c_str()));
00896   }
00897 }

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::TopicQos qos 
) [virtual]

Implements Update::Updater.

Definition at line 854 of file PersistenceUpdater.cpp.

References storeUpdate(), and Update::TopicStrt< QosSeq, ACE_CString >::topicQos.

00855 {
00856   IdType_ExtId ext(id.id);
00857   PersistenceUpdater::Topic* topic_data = 0;
00858 
00859   if (this->topic_index_->find(ext, topic_data, this->allocator_) ==  0) {
00860     TAO_OutputCDR outCdr;
00861     outCdr << qos;
00862     ACE_Message_Block dst;
00863     ACE_CDR::consolidate(&dst, outCdr.begin());
00864 
00865     this->storeUpdate(dst, topic_data->topicQos.second);
00866 
00867   } else {
00868     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00869     ACE_ERROR((LM_ERROR,
00870                ACE_TEXT("(%P|%t) PersistenceUpdater::update: ")
00871                ACE_TEXT("topic %C not found\n"),
00872                std::string(converter).c_str()));
00873   }
00874 }

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::DomainParticipantQos qos 
) [virtual]

Persist updated Qos parameters for an entity.

Implements Update::Updater.

Definition at line 831 of file PersistenceUpdater.cpp.

References Update::ParticipantStrt< QosSeq >::participantQos, and storeUpdate().

00832 {
00833   IdType_ExtId ext(id.id);
00834   PersistenceUpdater::Participant* part_data = 0;
00835 
00836   if (this->participant_index_->find(ext, part_data, this->allocator_) ==  0) {
00837     TAO_OutputCDR outCdr;
00838     outCdr << qos;
00839     ACE_Message_Block dst;
00840     ACE_CDR::consolidate(&dst, outCdr.begin());
00841 
00842     this->storeUpdate(dst, part_data->participantQos.second);
00843 
00844   } else {
00845     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00846     ACE_ERROR((LM_ERROR,
00847                ACE_TEXT("(%P|%t) PersistenceUpdater::update: ")
00848                ACE_TEXT("participant %C not found\n"),
00849                std::string(converter).c_str()));
00850   }
00851 }


Member Data Documentation

ActorIndex* Update::PersistenceUpdater::actor_index_ [private]

Definition at line 112 of file PersistenceUpdater.h.

Referenced by create(), destroy(), init(), requestImage(), and update().

ALLOCATOR* Update::PersistenceUpdater::allocator_ [private]

Definition at line 108 of file PersistenceUpdater.h.

Referenced by create(), destroy(), init(), and update().

ParticipantIndex* Update::PersistenceUpdater::participant_index_ [private]

Definition at line 111 of file PersistenceUpdater.h.

Referenced by create(), destroy(), init(), and requestImage().

ACE_TString Update::PersistenceUpdater::persistence_file_ [private]

Definition at line 103 of file PersistenceUpdater.h.

Referenced by init(), and parse().

bool Update::PersistenceUpdater::reset_ [private]

Definition at line 104 of file PersistenceUpdater.h.

Referenced by init(), and parse().

TopicIndex* Update::PersistenceUpdater::topic_index_ [private]

Definition at line 110 of file PersistenceUpdater.h.

Referenced by create(), destroy(), init(), and requestImage().

Manager* Update::PersistenceUpdater::um_ [private]

Definition at line 106 of file PersistenceUpdater.h.

Referenced by init(), and requestImage().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:52 2016 for OpenDDS by  doxygen 1.4.7