#include <PersistenceUpdater.h>
Classes | |
class | IdType_ExtId |
Public Types | |
typedef ACE_Allocator_Adapter < ACE_Malloc < ACE_MMAP_MEMORY_POOL, TAO_SYNCH_MUTEX > > | ALLOCATOR |
typedef struct TopicStrt < QosSeq, ACE_CString > | Topic |
Persisted topic data structure. | |
typedef struct ParticipantStrt < QosSeq > | Participant |
Persisted participant data structure. | |
typedef struct ActorStrt < QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin > | RWActor |
Persisted actor data structure. | |
typedef ACE_Hash_Map_With_Allocator < IdType_ExtId, Topic * > | TopicIndex |
typedef ACE_Hash_Map_With_Allocator < IdType_ExtId, Participant * > | ParticipantIndex |
typedef ACE_Hash_Map_With_Allocator < IdType_ExtId, RWActor * > | ActorIndex |
Public Member Functions | |
PersistenceUpdater () | |
virtual | ~PersistenceUpdater () |
virtual int | init (int argc, ACE_TCHAR *argv[]) |
Service object initialization. | |
virtual int | fini () |
ACE_Task_Base finish method. | |
virtual int | svc () |
ACE_Task_Base start method. | |
virtual void | requestImage () |
virtual void | create (const UTopic &topic) |
Add topic to be persisted. | |
virtual void | create (const UParticipant &participant) |
Add participant to be persisted. | |
virtual void | create (const URActor &actor) |
Add DataReader to be persisted. | |
virtual void | create (const UWActor &actor) |
Add DataWriter to be persisted. | |
virtual void | create (const OwnershipData &data) |
Add ownership data to be persisted. | |
virtual void | update (const IdPath &id, const DDS::DomainParticipantQos &qos) |
Persist updated Qos parameters for a Participant. | |
virtual void | update (const IdPath &id, const DDS::TopicQos &qos) |
Persist updated Qos parameters for a Topic. | |
virtual void | update (const IdPath &id, const DDS::DataWriterQos &qos) |
Persist updated Qos parameters for a DataWriter. | |
virtual void | update (const IdPath &id, const DDS::PublisherQos &qos) |
Persist updated Qos parameters for a Publisher. | |
virtual void | update (const IdPath &id, const DDS::DataReaderQos &qos) |
Persist updated Qos parameters for a DataReader. | |
virtual void | update (const IdPath &id, const DDS::SubscriberQos &qos) |
Persist updated Qos parameters for a Subscriber. | |
virtual void | update (const IdPath &id, const DDS::StringSeq &exprParams) |
Persist updated subscription exprParams. | |
virtual void | destroy (const IdPath &id, ItemType type, ActorType actor) |
Remove an entity (but not children) from persistence. | |
virtual void | updateLastPartId (PartIdType partId) |
Update Last Participant Id for repo. | |
Private Member Functions | |
int | parse (int argc, ACE_TCHAR *argv[]) |
void | storeUpdate (const ACE_Message_Block &data, BinSeq &storage) |
Private Attributes | |
ACE_TString | persistence_file_ |
bool | reset_ |
Manager * | um_ |
OpenDDS::DCPS::unique_ptr < ALLOCATOR > | allocator_ |
TopicIndex * | topic_index_ |
Persisted Topics. | |
ParticipantIndex * | participant_index_ |
Persisted Participants. | |
ActorIndex * | actor_index_ |
Persisted Readers and Writers. | |
PartIdType * | last_part_id_ |
What the last participant id is/was. |
Definition at line 33 of file PersistenceUpdater.h.
Definition at line 69 of file PersistenceUpdater.h.
typedef ACE_Allocator_Adapter<ACE_Malloc <ACE_MMAP_MEMORY_POOL , TAO_SYNCH_MUTEX> > Update::PersistenceUpdater::ALLOCATOR |
Definition at line 55 of file PersistenceUpdater.h.
typedef struct ParticipantStrt< QosSeq > Update::PersistenceUpdater::Participant [read] |
Persisted participant data structure.
Definition at line 61 of file PersistenceUpdater.h.
typedef ACE_Hash_Map_With_Allocator<IdType_ExtId, Participant*> Update::PersistenceUpdater::ParticipantIndex |
Definition at line 68 of file PersistenceUpdater.h.
typedef struct ActorStrt< QosSeq, QosSeq, ACE_CString,BinSeq, ContentSubscriptionBin > Update::PersistenceUpdater::RWActor [read] |
Persisted actor data structure.
Definition at line 64 of file PersistenceUpdater.h.
typedef struct TopicStrt< QosSeq, ACE_CString > Update::PersistenceUpdater::Topic [read] |
Persisted topic data structure.
Definition at line 58 of file PersistenceUpdater.h.
Definition at line 67 of file PersistenceUpdater.h.
Update::PersistenceUpdater::PersistenceUpdater | ( | ) |
Definition at line 214 of file PersistenceUpdater.cpp.
References ACE_TEXT().
00215 : persistence_file_(ACE_TEXT("InforepoPersist")) 00216 , reset_(false) 00217 , um_(0) 00218 , topic_index_(0) 00219 , participant_index_(0) 00220 , actor_index_(0) 00221 , last_part_id_(0) 00222 {}
Update::PersistenceUpdater::~PersistenceUpdater | ( | ) | [virtual] |
Definition at line 224 of file PersistenceUpdater.cpp.
void Update::PersistenceUpdater::create | ( | const OwnershipData & | data | ) | [virtual] |
Add ownership data to be persisted.
Implements Update::Updater.
Definition at line 828 of file PersistenceUpdater.cpp.
virtual void Update::PersistenceUpdater::create | ( | const UWActor & | actor | ) | [virtual] |
Add DataWriter to be persisted.
Implements Update::Updater.
void Update::PersistenceUpdater::create | ( | const URActor & | actor | ) | [virtual] |
Add DataReader to be persisted.
Implements Update::Updater.
Definition at line 640 of file PersistenceUpdater.cpp.
References actor_index_, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::actorId, allocator_, ACE_Message_Block::base(), ACE_OutputCDR::begin(), ACE_Hash_Map_With_Allocator< class, class >::bind(), Update::ActorStrt< PSQ, RWQ, C, T, CSP >::callback, ACE_CDR::consolidate(), Update::ActorStrt< PSQ, RWQ, C, T, CSP >::contentSubscriptionProfile, Update::DataReader, Update::DataReaderQos, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::domainId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::drdwQos, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), len, ACE_Message_Block::length(), LM_ERROR, ACE_OS::memcpy(), Update::ActorStrt< PSQ, RWQ, C, T, CSP >::participantId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::pubsubQos, ACE_OutputCDR::reset(), Update::SubscriberQos, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::topicId, and Update::ActorStrt< PSQ, RWQ, C, T, CSP >::transportInterfaceInfo.
00641 { 00642 TAO_OutputCDR outCdr; 00643 outCdr << actor.pubsubQos; 00644 ACE_Message_Block dst; 00645 ACE_CDR::consolidate(&dst, outCdr.begin()); 00646 00647 size_t len = dst.length(); 00648 char *buf = new (std::nothrow) char[len]; 00649 00650 if (buf == 0) { 00651 ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( subscription): allocation failed.\n")); 00652 return; 00653 } 00654 00655 ArrDelAdapter<char> guard(buf); 00656 00657 ACE_OS::memcpy(buf, dst.base(), len); 00658 00659 BinSeq pubsub_qos_bin(len, buf); 00660 QosSeq pubsub_qos(SubscriberQos, pubsub_qos_bin); 00661 00662 outCdr.reset(); 00663 outCdr << actor.drdwQos; 00664 ACE_Message_Block dst2; 00665 ACE_CDR::consolidate(&dst2, outCdr.begin()); 00666 00667 len = dst2.length(); 00668 char *buf2 = new (std::nothrow) char[len]; 00669 00670 if (buf2 == 0) { 00671 ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( subscription): allocation failed.\n")); 00672 return; 00673 } 00674 00675 ArrDelAdapter<char> guard2(buf2); 00676 00677 ACE_OS::memcpy(buf2, dst2.base(), len); 00678 00679 BinSeq dwdr_qos_bin(len, buf2); 00680 QosSeq dwdr_qos(DataReaderQos, dwdr_qos_bin); 00681 00682 outCdr.reset(); 00683 outCdr << actor.transportInterfaceInfo; 00684 ACE_Message_Block dst3; 00685 ACE_CDR::consolidate(&dst3, outCdr.begin()); 00686 00687 len = dst3.length(); 00688 char *buf3 = new (std::nothrow) char[len]; 00689 00690 if (buf3 == 0) { 00691 ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( subscription) allocation failed.\n")); 00692 return; 00693 } 00694 00695 ArrDelAdapter<char> guard3(buf3); 00696 00697 ACE_OS::memcpy(buf3, dst3.base(), len); 00698 BinSeq tr_bin(len, buf3); 00699 00700 outCdr.reset(); 00701 outCdr << actor.contentSubscriptionProfile.exprParams; 00702 ACE_Message_Block dst4; 00703 ACE_CDR::consolidate(&dst4, outCdr.begin()); 00704 len = dst4.length(); 00705 char* buf4 = new (std::nothrow) char[len]; 00706 if (buf4 == 0) { 00707 ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( subscription) allocation failed.\n")); 00708 return; 00709 } 00710 ArrDelAdapter<char> guard4(buf4); 00711 00712 ACE_OS::memcpy(buf4, dst4.base(), len); 00713 ContentSubscriptionBin csp_bin; 00714 csp_bin.filterClassName = actor.contentSubscriptionProfile.filterClassName; 00715 csp_bin.filterExpr = actor.contentSubscriptionProfile.filterExpr; 00716 csp_bin.exprParams = std::make_pair(len, buf4); 00717 00718 DActor actor_data(actor.domainId, actor.actorId, actor.topicId 00719 , actor.participantId 00720 , DataReader, actor.callback.c_str(), pubsub_qos 00721 , dwdr_qos, tr_bin, csp_bin); 00722 00723 // allocate memory for ActorData 00724 void* buffer; 00725 ACE_ALLOCATOR(buffer, allocator_->malloc 00726 (sizeof(PersistenceUpdater::RWActor))); 00727 00728 // Initialize ActorData 00729 PersistenceUpdater::RWActor* persistent_data = 00730 new(buffer) PersistenceUpdater::RWActor(actor_data 00731 , allocator_.get()); 00732 00733 IdType_ExtId ext(actor.actorId); 00734 00735 // bind ActorData with the actorId 00736 if (actor_index_->bind(ext, persistent_data, allocator_.get()) != 0) { 00737 allocator_->free((void *) buffer); 00738 return; 00739 } 00740 }
void Update::PersistenceUpdater::create | ( | const UParticipant & | participant | ) | [virtual] |
Add participant to be persisted.
Implements Update::Updater.
Definition at line 593 of file PersistenceUpdater.cpp.
References ACE_TEXT(), allocator_, ACE_Message_Block::base(), ACE_OutputCDR::begin(), ACE_Hash_Map_With_Allocator< class, class >::bind(), ACE_CDR::consolidate(), Update::ParticipantStrt< Q >::domainId, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), len, ACE_Message_Block::length(), LM_ERROR, ACE_OS::memcpy(), Update::ParticipantStrt< Q >::owner, participant_index_, Update::ParticipantStrt< QosSeq >::participantId, Update::ParticipantStrt< Q >::participantId, Update::ParticipantQos, and Update::ParticipantStrt< Q >::participantQos.
00594 { 00595 // serialize the Topic QOS 00596 TAO_OutputCDR outCdr; 00597 outCdr << participant.participantQos; 00598 ACE_Message_Block dst; 00599 ACE_CDR::consolidate(&dst, outCdr.begin()); 00600 00601 size_t len = dst.length(); 00602 char *buf; 00603 ACE_NEW_NORETURN(buf, char[len]); 00604 ArrDelAdapter<char> guard(buf); 00605 00606 if (buf == 0) { 00607 ACE_ERROR((LM_ERROR, 00608 ACE_TEXT("(%P|%t) PersistenceUpdater::create( UParticipant): allocation failed.\n"))); 00609 return; 00610 } 00611 00612 ACE_OS::memcpy(buf, dst.base(), len); 00613 00614 BinSeq qos_bin(len, buf); 00615 00616 QosSeq p(ParticipantQos, qos_bin); 00617 DParticipant participant_data 00618 (participant.domainId, participant.owner, participant.participantId, p); 00619 00620 // allocate memory for ParticipantData 00621 void* buffer; 00622 ACE_ALLOCATOR(buffer, allocator_->malloc 00623 (sizeof(PersistenceUpdater::Participant))); 00624 00625 // Initialize ParticipantData 00626 PersistenceUpdater::Participant* persistent_data 00627 = new(buffer) PersistenceUpdater::Participant(participant_data 00628 , allocator_.get()); 00629 00630 IdType_ExtId ext(participant_data.participantId); 00631 00632 // bind ParticipantData with the participantId 00633 if (participant_index_->bind(ext, persistent_data, allocator_.get()) != 0) { 00634 allocator_->free((void *) buffer); 00635 return; 00636 } 00637 }
void Update::PersistenceUpdater::create | ( | const UTopic & | topic | ) | [virtual] |
Add topic to be persisted.
Implements Update::Updater.
Definition at line 547 of file PersistenceUpdater.cpp.
References ACE_TEXT(), allocator_, ACE_Message_Block::base(), ACE_OutputCDR::begin(), ACE_Hash_Map_With_Allocator< class, class >::bind(), ACE_CDR::consolidate(), Update::TopicStrt< Q, S >::dataType, Update::TopicStrt< Q, S >::domainId, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), len, ACE_Message_Block::length(), LM_ERROR, ACE_OS::memcpy(), Update::TopicStrt< Q, S >::name, Update::TopicStrt< Q, S >::participantId, topic_index_, Update::TopicStrt< Q, S >::topicId, Update::TopicQos, and Update::TopicStrt< Q, S >::topicQos.
00548 { 00549 // serialize the Topic QOS 00550 TAO_OutputCDR outCdr; 00551 outCdr << topic.topicQos; 00552 ACE_Message_Block dst; 00553 ACE_CDR::consolidate(&dst, outCdr.begin()); 00554 00555 size_t len = dst.length(); 00556 char *buf; 00557 ACE_NEW_NORETURN(buf, char[len]); 00558 ArrDelAdapter<char> guard(buf); 00559 00560 if (buf == 0) { 00561 ACE_ERROR((LM_ERROR, 00562 ACE_TEXT("(%P|%t) PersistenceUpdater::create( UTopic): allocation failed.\n"))); 00563 return; 00564 } 00565 00566 ACE_OS::memcpy(buf, dst.base(), len); 00567 00568 BinSeq qos_bin(len, buf); 00569 00570 QosSeq p(TopicQos, qos_bin); 00571 DTopic topic_data(topic.domainId, topic.topicId, topic.participantId 00572 , topic.name.c_str(), topic.dataType.c_str(), p); 00573 00574 // allocate memory for TopicData 00575 void* buffer; 00576 ACE_ALLOCATOR(buffer, allocator_->malloc 00577 (sizeof(PersistenceUpdater::Topic))); 00578 00579 // Initialize TopicData 00580 PersistenceUpdater::Topic* persistent_data 00581 = new(buffer) PersistenceUpdater::Topic(topic_data, allocator_.get()); 00582 00583 IdType_ExtId ext(topic_data.topicId); 00584 00585 // bind TopicData with the topicId 00586 if (topic_index_->bind(ext, persistent_data, allocator_.get()) != 0) { 00587 allocator_->free((void *) buffer); 00588 return; 00589 } 00590 }
void Update::PersistenceUpdater::destroy | ( | const IdPath & | id, | |
ItemType | type, | |||
ActorType | actor | |||
) | [virtual] |
Remove an entity (but not children) from persistence.
Implements Update::Updater.
Definition at line 995 of file PersistenceUpdater.cpp.
References ACE_TEXT(), Update::Actor, actor_index_, allocator_, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::cleanup(), Update::ParticipantStrt< QosSeq >::cleanup(), Update::TopicStrt< QosSeq, ACE_CString >::cleanup(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), LM_ERROR, Update::Participant, participant_index_, Update::Topic, topic_index_, and ACE_Hash_Map_With_Allocator< class, class >::unbind().
00996 { 00997 IdType_ExtId ext(id.id); 00998 PersistenceUpdater::Topic* topic = 0; 00999 PersistenceUpdater::Participant* participant = 0; 01000 PersistenceUpdater::RWActor* actor = 0; 01001 01002 switch (type) { 01003 case Update::Topic: 01004 01005 if (topic_index_->unbind(ext, topic, allocator_.get()) == 0) { 01006 topic->cleanup(allocator_.get()); 01007 allocator_->free((void *) topic); 01008 } 01009 01010 break; 01011 case Update::Participant: 01012 01013 if (participant_index_->unbind(ext, participant, allocator_.get()) == 0) { 01014 participant->cleanup(allocator_.get()); 01015 allocator_->free((void *) participant); 01016 } 01017 01018 break; 01019 case Update::Actor: 01020 01021 if (actor_index_->unbind(ext, actor, allocator_.get()) == 0) { 01022 actor->cleanup(allocator_.get()); 01023 allocator_->free((void *) actor); 01024 } 01025 01026 break; 01027 default: { 01028 OpenDDS::DCPS::RepoIdConverter converter(id.id); 01029 ACE_ERROR((LM_ERROR, 01030 ACE_TEXT("(%P|%t) PersistenceUpdater::destroy: ") 01031 ACE_TEXT("unknown entity - %C.\n"), 01032 std::string(converter).c_str())); 01033 } 01034 } 01035 }
int Update::PersistenceUpdater::fini | ( | void | ) | [virtual] |
ACE_Task_Base finish method.
Reimplemented from ACE_Shared_Object.
Definition at line 387 of file PersistenceUpdater.cpp.
int Update::PersistenceUpdater::init | ( | int | argc, | |
ACE_TCHAR * | argv[] | |||
) | [virtual] |
Service object initialization.
Reimplemented from ACE_Shared_Object.
Definition at line 274 of file PersistenceUpdater.cpp.
References ACE_NEW_RETURN(), ACE_TEXT(), actor_index_, Update::Manager::add(), allocator_, ACE_String_Base< ACE_CHAR_T >::c_str(), Update::createIndex(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), Update::index_cleanup(), last_part_id_, LM_ERROR, parse(), participant_index_, persistence_file_, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), reset_, topic_index_, and um_.
00275 { 00276 // discover the UpdateManager 00277 um_ = ACE_Dynamic_Service<Update::Manager>::instance("UpdateManagerSvc"); 00278 00279 if (um_ == 0) { 00280 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: PersistenceUpdater::init ") 00281 ACE_TEXT("No UpdateManager discovered.\n"))); 00282 return -1; 00283 } 00284 00285 this->parse(argc, argv); 00286 00287 #if defined ACE_HAS_MAC_OSX && defined __x86_64__ && __x86_64__ 00288 ACE_MMAP_Memory_Pool::OPTIONS options((void*)0x200000000); 00289 #else 00290 ACE_MMAP_Memory_Pool::OPTIONS options(ACE_DEFAULT_BASE_ADDR); 00291 #endif 00292 00293 // Create the allocator with the appropriate options. The name used 00294 // for the lock is the same as one used for the file. 00295 ALLOCATOR* allocator; 00296 ACE_NEW_RETURN(allocator, 00297 ALLOCATOR(persistence_file_.c_str(), 00298 persistence_file_.c_str(), 00299 &options), 00300 -1); 00301 allocator_.reset(allocator); 00302 00303 bool exists = false; 00304 00305 char* topic_index = (char*)createIndex("TopicIndex", *allocator_ 00306 , sizeof(TopicIndex), exists); 00307 if (!topic_index) { 00308 return -1; 00309 } 00310 00311 char* participant_index = (char*)createIndex("ParticipantIndex", *allocator_ 00312 , sizeof(ParticipantIndex), exists); 00313 if (!participant_index) { 00314 return -1; 00315 } 00316 00317 char* actor_index = (char*)createIndex("ActorIndex", *allocator_ 00318 , sizeof(ActorIndex), exists); 00319 if (!actor_index) { 00320 return -1; 00321 } 00322 00323 void* last_part_id = createIndex( 00324 "LastParticipantId", *allocator_, sizeof(PartIdType), exists); 00325 if (!last_part_id) { 00326 return -1; 00327 } 00328 last_part_id_ = reinterpret_cast<PartIdType*>(last_part_id); 00329 00330 if (exists) { 00331 topic_index_ = reinterpret_cast<TopicIndex*>(topic_index); 00332 participant_index_ = reinterpret_cast<ParticipantIndex*>(participant_index); 00333 actor_index_ = reinterpret_cast<ActorIndex*>(actor_index); 00334 } else { 00335 topic_index_ = new(topic_index) TopicIndex(allocator_.get()); 00336 participant_index_ = new(participant_index) ParticipantIndex(allocator_.get()); 00337 actor_index_ = new(actor_index) ActorIndex(allocator_.get()); 00338 *last_part_id_ = 0; 00339 } 00340 00341 if (reset_) { 00342 index_cleanup(topic_index_, allocator_.get()); 00343 index_cleanup(participant_index_, allocator_.get()); 00344 index_cleanup(actor_index_, allocator_.get()); 00345 *last_part_id_ = 0; 00346 } 00347 00348 // lastly register the callback 00349 um_->add(this); 00350 00351 return 0; 00352 }
int Update::PersistenceUpdater::parse | ( | int | argc, | |
ACE_TCHAR * | argv[] | |||
) | [private] |
Definition at line 355 of file PersistenceUpdater.cpp.
References ACE_TEXT(), ACE_OS::atoi(), LM_DEBUG, persistence_file_, reset_, and ACE_OS::strcasecmp().
Referenced by init().
00356 { 00357 for (ssize_t count = 0; count < argc; count++) { 00358 if (ACE_OS::strcasecmp(argv[count], ACE_TEXT("-file")) == 0) { 00359 if ((count + 1) < argc) { 00360 persistence_file_ = argv[count+1]; 00361 count++; 00362 } 00363 00364 } else if (ACE_OS::strcasecmp(argv[count], ACE_TEXT("-reset")) == 0) { 00365 if ((count + 1) < argc) { 00366 int val = ACE_OS::atoi(argv[count+1]); 00367 reset_ = true; 00368 00369 if (val == 0) { 00370 reset_ = false; 00371 } 00372 00373 count++; 00374 } 00375 00376 } else { 00377 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) PersistenceUpdater::parse: Unknown option %s\n") 00378 , argv[count])); 00379 return -1; 00380 } 00381 } 00382 00383 return 0; 00384 }
void Update::PersistenceUpdater::requestImage | ( | ) | [virtual] |
Request an image refresh to be sent upstream. This is currently done synchronously. TBD: Move to an asynchronous model
Implements Update::Updater.
Definition at line 399 of file PersistenceUpdater.cpp.
References ACE_TEXT(), actor_index_, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::actorId, Update::ImageData< T, P, A, W >::actors, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), ACE_String_Base< ACE_CHAR_T >::c_str(), Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::callback, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::contentSubscriptionProfile, Update::DataReader, Update::TopicStrt< QosSeq, ACE_CString >::dataType, OpenDDS::DCPS::DCPS_debug_level, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::domainId, Update::TopicStrt< QosSeq, ACE_CString >::domainId, Update::ParticipantStrt< QosSeq >::domainId, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::drdwQos, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::end(), Update::ContentSubscriptionBin::exprParams, Update::ContentSubscriptionBin::filterClassName, Update::ContentSubscriptionBin::filterExpr, last_part_id_, Update::ImageData< T, P, A, W >::lastPartId, LM_DEBUG, LM_ERROR, ACE_OS::memcpy(), Update::TopicStrt< QosSeq, ACE_CString >::name, OPENDDS_STRING, Update::ParticipantStrt< QosSeq >::owner, participant_index_, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::participantId, Update::TopicStrt< QosSeq, ACE_CString >::participantId, Update::ParticipantStrt< QosSeq >::participantId, Update::ParticipantQos, Update::ParticipantStrt< QosSeq >::participantQos, Update::ImageData< T, P, A, W >::participants, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::pubsubQos, Update::Manager::pushImage(), topic_index_, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::topicId, Update::TopicStrt< QosSeq, ACE_CString >::topicId, Update::TopicQos, Update::TopicStrt< QosSeq, ACE_CString >::topicQos, Update::ImageData< T, P, A, W >::topics, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::transportInterfaceInfo, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::type, and um_.
00400 { 00401 if (um_ == NULL) { 00402 return; 00403 } 00404 00405 DImage image; 00406 00407 // Allocate space to hold the QOS sequences. 00408 std::vector<ArrDelAdapter<char> > qos_sequences; 00409 00410 for (ParticipantIndex::ITERATOR iter = participant_index_->begin(); 00411 iter != participant_index_->end(); iter++) { 00412 const PersistenceUpdater::Participant* participant 00413 = (*iter).int_id_; 00414 00415 size_t qos_len = participant->participantQos.second.first; 00416 char *buf; 00417 ACE_NEW_NORETURN(buf, char[qos_len]); 00418 qos_sequences.push_back(ArrDelAdapter<char>(buf)); 00419 00420 if (buf == 0) { 00421 ACE_ERROR((LM_ERROR, 00422 ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation failed.\n"))); 00423 return; 00424 } 00425 00426 ACE_OS::memcpy(buf, participant->participantQos.second.second, qos_len); 00427 00428 BinSeq in_seq(qos_len, buf); 00429 QosSeq qos(ParticipantQos, in_seq); 00430 DParticipant dparticipant(participant->domainId 00431 , participant->owner 00432 , participant->participantId 00433 , qos); 00434 image.participants.push_back(dparticipant); 00435 if (OpenDDS::DCPS::DCPS_debug_level >= 2) { 00436 OpenDDS::DCPS::RepoIdConverter conv(participant->participantId); 00437 ACE_DEBUG((LM_DEBUG, 00438 "(%P|%t) PersistenceUpdater::requestImage(): loaded participant %C\n", 00439 OPENDDS_STRING(conv).c_str())); 00440 } 00441 } 00442 00443 for (TopicIndex::ITERATOR iter = topic_index_->begin(); 00444 iter != topic_index_->end(); iter++) { 00445 const PersistenceUpdater::Topic* topic = (*iter).int_id_; 00446 00447 size_t qos_len = topic->topicQos.second.first; 00448 char *buf; 00449 ACE_NEW_NORETURN(buf, char[qos_len]); 00450 qos_sequences.push_back(ArrDelAdapter<char>(buf)); 00451 00452 if (buf == 0) { 00453 ACE_ERROR((LM_ERROR, 00454 ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation failed.\n"))); 00455 return; 00456 } 00457 00458 ACE_OS::memcpy(buf, topic->topicQos.second.second, qos_len); 00459 00460 BinSeq in_seq(qos_len, buf); 00461 QosSeq qos(TopicQos, in_seq); 00462 DTopic dTopic(topic->domainId, topic->topicId 00463 , topic->participantId, topic->name.c_str() 00464 , topic->dataType.c_str(), qos); 00465 image.topics.push_back(dTopic); 00466 } 00467 00468 for (ActorIndex::ITERATOR iter = actor_index_->begin(); 00469 iter != actor_index_->end(); iter++) { 00470 const PersistenceUpdater::RWActor* actor = (*iter).int_id_; 00471 00472 size_t qos_len = actor->pubsubQos.second.first; 00473 char *buf; 00474 ACE_NEW_NORETURN(buf, char[qos_len]); 00475 qos_sequences.push_back(ArrDelAdapter<char>(buf)); 00476 00477 if (buf == 0) { 00478 ACE_ERROR((LM_ERROR, 00479 ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation failed.\n"))); 00480 return; 00481 } 00482 00483 ACE_OS::memcpy(buf, actor->pubsubQos.second.second, qos_len); 00484 00485 BinSeq in_pubsub_seq(qos_len, buf); 00486 QosSeq pubsub_qos(actor->pubsubQos.first, in_pubsub_seq); 00487 00488 qos_len = actor->drdwQos.second.first; 00489 ACE_NEW_NORETURN(buf, char[qos_len]); 00490 qos_sequences.push_back(ArrDelAdapter<char>(buf)); 00491 00492 if (buf == 0) { 00493 ACE_ERROR((LM_ERROR, 00494 ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation failed.\n"))); 00495 return; 00496 } 00497 00498 ACE_OS::memcpy(buf, actor->drdwQos.second.second, qos_len); 00499 00500 BinSeq in_drdw_seq(qos_len, buf); 00501 QosSeq drdw_qos(actor->drdwQos.first, in_drdw_seq); 00502 00503 size_t transport_len = actor->transportInterfaceInfo.first; 00504 ACE_NEW_NORETURN(buf, char[transport_len]); 00505 qos_sequences.push_back(ArrDelAdapter<char>(buf)); 00506 00507 if (buf == 0) { 00508 ACE_ERROR((LM_ERROR, 00509 ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation failed.\n"))); 00510 return; 00511 } 00512 00513 ACE_OS::memcpy(buf, actor->transportInterfaceInfo.second, transport_len); 00514 00515 BinSeq in_transport_seq(transport_len, buf); 00516 00517 ContentSubscriptionBin in_csp_bin; 00518 if (actor->type == DataReader) { 00519 in_csp_bin.filterClassName = actor->contentSubscriptionProfile.filterClassName; 00520 in_csp_bin.filterExpr = actor->contentSubscriptionProfile.filterExpr; 00521 BinSeq& params = in_csp_bin.exprParams; 00522 ACE_NEW_NORETURN(params.second, char[params.first]); 00523 if (params.second == 0) { 00524 ACE_ERROR((LM_ERROR, 00525 ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation ") 00526 ACE_TEXT("failed.\n"))); 00527 return; 00528 } 00529 qos_sequences.push_back(ArrDelAdapter<char>(params.second)); 00530 ACE_OS::memcpy(params.second, 00531 actor->contentSubscriptionProfile.exprParams.second, params.first); 00532 } 00533 00534 DActor dActor(actor->domainId, actor->actorId, actor->topicId 00535 , actor->participantId 00536 , actor->type, actor->callback.c_str() 00537 , pubsub_qos, drdw_qos, in_transport_seq, in_csp_bin); 00538 image.actors.push_back(dActor); 00539 } 00540 00541 image.lastPartId = *last_part_id_; 00542 00543 um_->pushImage(image); 00544 }
void Update::PersistenceUpdater::storeUpdate | ( | const ACE_Message_Block & | data, | |
BinSeq & | storage | |||
) | [private] |
Definition at line 1038 of file PersistenceUpdater.cpp.
References allocator_, ACE_Message_Block::base(), ACE_Message_Block::length(), and ACE_OS::memcpy().
Referenced by update().
01039 { 01040 size_t len = data.length(); 01041 01042 void* buffer; 01043 ACE_ALLOCATOR(buffer, this->allocator_->malloc(len)); 01044 ACE_OS::memcpy(buffer, data.base(), len); 01045 01046 storage.first = len; 01047 storage.second = static_cast<char*>(buffer); 01048 }
int Update::PersistenceUpdater::svc | ( | void | ) | [virtual] |
ACE_Task_Base start method.
Reimplemented from ACE_Task_Base.
Definition at line 393 of file PersistenceUpdater.cpp.
void Update::PersistenceUpdater::update | ( | const IdPath & | id, | |
const DDS::StringSeq & | exprParams | |||
) | [virtual] |
Persist updated subscription exprParams.
Implements Update::Updater.
Definition at line 972 of file PersistenceUpdater.cpp.
References ACE_TEXT(), actor_index_, allocator_, ACE_OutputCDR::begin(), ACE_CDR::consolidate(), Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::contentSubscriptionProfile, Update::ContentSubscriptionBin::exprParams, ACE_Hash_Map_With_Allocator< class, class >::find(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), LM_ERROR, and storeUpdate().
00973 { 00974 IdType_ExtId ext(id.id); 00975 PersistenceUpdater::RWActor* actor_data = 0; 00976 00977 if (actor_index_->find(ext, actor_data, allocator_.get()) == 0) { 00978 TAO_OutputCDR outCdr; 00979 outCdr << exprParams; 00980 ACE_Message_Block dst; 00981 ACE_CDR::consolidate(&dst, outCdr.begin()); 00982 00983 storeUpdate(dst, actor_data->contentSubscriptionProfile.exprParams); 00984 00985 } else { 00986 OpenDDS::DCPS::RepoIdConverter converter(id.id); 00987 ACE_ERROR((LM_ERROR, 00988 ACE_TEXT("(%P|%t) PersistenceUpdater::update(readerQos): ") 00989 ACE_TEXT("subscription %C not found\n"), 00990 std::string(converter).c_str())); 00991 } 00992 }
void Update::PersistenceUpdater::update | ( | const IdPath & | id, | |
const DDS::SubscriberQos & | qos | |||
) | [virtual] |
Persist updated Qos parameters for a Subscriber.
Implements Update::Updater.
Definition at line 949 of file PersistenceUpdater.cpp.
References ACE_TEXT(), actor_index_, ACE_OutputCDR::begin(), ACE_CDR::consolidate(), ACE_Hash_Map_With_Allocator< class, class >::find(), LM_ERROR, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::pubsubQos, and storeUpdate().
00950 { 00951 IdType_ExtId ext(id.id); 00952 PersistenceUpdater::RWActor* actor_data = 0; 00953 00954 if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) == 0) { 00955 TAO_OutputCDR outCdr; 00956 outCdr << qos; 00957 ACE_Message_Block dst; 00958 ACE_CDR::consolidate(&dst, outCdr.begin()); 00959 00960 this->storeUpdate(dst, actor_data->pubsubQos.second); 00961 00962 } else { 00963 OpenDDS::DCPS::RepoIdConverter converter(id.id); 00964 ACE_ERROR((LM_ERROR, 00965 ACE_TEXT("(%P|%t) PersistenceUpdater::update(subscriberQos): ") 00966 ACE_TEXT("subscription %C not found\n"), 00967 std::string(converter).c_str())); 00968 } 00969 }
void Update::PersistenceUpdater::update | ( | const IdPath & | id, | |
const DDS::DataReaderQos & | qos | |||
) | [virtual] |
Persist updated Qos parameters for a DataReader.
Implements Update::Updater.
Definition at line 926 of file PersistenceUpdater.cpp.
References ACE_TEXT(), actor_index_, ACE_OutputCDR::begin(), ACE_CDR::consolidate(), Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::drdwQos, ACE_Hash_Map_With_Allocator< class, class >::find(), LM_ERROR, and storeUpdate().
00927 { 00928 IdType_ExtId ext(id.id); 00929 PersistenceUpdater::RWActor* actor_data = 0; 00930 00931 if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) == 0) { 00932 TAO_OutputCDR outCdr; 00933 outCdr << qos; 00934 ACE_Message_Block dst; 00935 ACE_CDR::consolidate(&dst, outCdr.begin()); 00936 00937 this->storeUpdate(dst, actor_data->drdwQos.second); 00938 00939 } else { 00940 OpenDDS::DCPS::RepoIdConverter converter(id.id); 00941 ACE_ERROR((LM_ERROR, 00942 ACE_TEXT("(%P|%t) PersistenceUpdater::update(readerQos): ") 00943 ACE_TEXT("subscription %C not found\n"), 00944 std::string(converter).c_str())); 00945 } 00946 }
void Update::PersistenceUpdater::update | ( | const IdPath & | id, | |
const DDS::PublisherQos & | qos | |||
) | [virtual] |
Persist updated Qos parameters for a Publisher.
Implements Update::Updater.
Definition at line 903 of file PersistenceUpdater.cpp.
References ACE_TEXT(), actor_index_, ACE_OutputCDR::begin(), ACE_CDR::consolidate(), ACE_Hash_Map_With_Allocator< class, class >::find(), LM_ERROR, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::pubsubQos, and storeUpdate().
00904 { 00905 IdType_ExtId ext(id.id); 00906 PersistenceUpdater::RWActor* actor_data = 0; 00907 00908 if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) == 0) { 00909 TAO_OutputCDR outCdr; 00910 outCdr << qos; 00911 ACE_Message_Block dst; 00912 ACE_CDR::consolidate(&dst, outCdr.begin()); 00913 00914 this->storeUpdate(dst, actor_data->pubsubQos.second); 00915 00916 } else { 00917 OpenDDS::DCPS::RepoIdConverter converter(id.id); 00918 ACE_ERROR((LM_ERROR, 00919 ACE_TEXT("(%P|%t) PersistenceUpdater::update(publisherQos): ") 00920 ACE_TEXT("publication %C not found\n"), 00921 std::string(converter).c_str())); 00922 } 00923 }
void Update::PersistenceUpdater::update | ( | const IdPath & | id, | |
const DDS::DataWriterQos & | qos | |||
) | [virtual] |
Persist updated Qos parameters for a DataWriter.
Implements Update::Updater.
Definition at line 880 of file PersistenceUpdater.cpp.
References ACE_TEXT(), actor_index_, ACE_OutputCDR::begin(), ACE_CDR::consolidate(), Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::drdwQos, ACE_Hash_Map_With_Allocator< class, class >::find(), LM_ERROR, and storeUpdate().
00881 { 00882 IdType_ExtId ext(id.id); 00883 PersistenceUpdater::RWActor* actor_data = 0; 00884 00885 if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) == 0) { 00886 TAO_OutputCDR outCdr; 00887 outCdr << qos; 00888 ACE_Message_Block dst; 00889 ACE_CDR::consolidate(&dst, outCdr.begin()); 00890 00891 this->storeUpdate(dst, actor_data->drdwQos.second); 00892 00893 } else { 00894 OpenDDS::DCPS::RepoIdConverter converter(id.id); 00895 ACE_ERROR((LM_ERROR, 00896 ACE_TEXT("(%P|%t) PersistenceUpdater::update(writerQos): ") 00897 ACE_TEXT("publication %C not found\n"), 00898 std::string(converter).c_str())); 00899 } 00900 }
void Update::PersistenceUpdater::update | ( | const IdPath & | id, | |
const DDS::TopicQos & | qos | |||
) | [virtual] |
Persist updated Qos parameters for a Topic.
Implements Update::Updater.
Definition at line 857 of file PersistenceUpdater.cpp.
References ACE_TEXT(), ACE_OutputCDR::begin(), ACE_CDR::consolidate(), ACE_Hash_Map_With_Allocator< class, class >::find(), LM_ERROR, storeUpdate(), topic_index_, and Update::TopicStrt< QosSeq, ACE_CString >::topicQos.
00858 { 00859 IdType_ExtId ext(id.id); 00860 PersistenceUpdater::Topic* topic_data = 0; 00861 00862 if (this->topic_index_->find(ext, topic_data, this->allocator_.get()) == 0) { 00863 TAO_OutputCDR outCdr; 00864 outCdr << qos; 00865 ACE_Message_Block dst; 00866 ACE_CDR::consolidate(&dst, outCdr.begin()); 00867 00868 this->storeUpdate(dst, topic_data->topicQos.second); 00869 00870 } else { 00871 OpenDDS::DCPS::RepoIdConverter converter(id.id); 00872 ACE_ERROR((LM_ERROR, 00873 ACE_TEXT("(%P|%t) PersistenceUpdater::update: ") 00874 ACE_TEXT("topic %C not found\n"), 00875 std::string(converter).c_str())); 00876 } 00877 }
void Update::PersistenceUpdater::update | ( | const IdPath & | id, | |
const DDS::DomainParticipantQos & | qos | |||
) | [virtual] |
Persist updated Qos parameters for a Participant.
Implements Update::Updater.
Definition at line 834 of file PersistenceUpdater.cpp.
References ACE_TEXT(), ACE_OutputCDR::begin(), ACE_CDR::consolidate(), ACE_Hash_Map_With_Allocator< class, class >::find(), LM_ERROR, participant_index_, Update::ParticipantStrt< QosSeq >::participantQos, and storeUpdate().
00835 { 00836 IdType_ExtId ext(id.id); 00837 PersistenceUpdater::Participant* part_data = 0; 00838 00839 if (this->participant_index_->find(ext, part_data, this->allocator_.get()) == 0) { 00840 TAO_OutputCDR outCdr; 00841 outCdr << qos; 00842 ACE_Message_Block dst; 00843 ACE_CDR::consolidate(&dst, outCdr.begin()); 00844 00845 this->storeUpdate(dst, part_data->participantQos.second); 00846 00847 } else { 00848 OpenDDS::DCPS::RepoIdConverter converter(id.id); 00849 ACE_ERROR((LM_ERROR, 00850 ACE_TEXT("(%P|%t) PersistenceUpdater::update: ") 00851 ACE_TEXT("participant %C not found\n"), 00852 std::string(converter).c_str())); 00853 } 00854 }
void Update::PersistenceUpdater::updateLastPartId | ( | PartIdType | partId | ) | [virtual] |
Update Last Participant Id for repo.
Reimplemented from Update::Updater.
Definition at line 1050 of file PersistenceUpdater.cpp.
References last_part_id_.
01051 { 01052 *last_part_id_ = partId; 01053 }
Persisted Readers and Writers.
Definition at line 149 of file PersistenceUpdater.h.
Referenced by create(), destroy(), init(), requestImage(), and update().
Definition at line 140 of file PersistenceUpdater.h.
Referenced by create(), destroy(), init(), storeUpdate(), and update().
What the last participant id is/was.
Definition at line 152 of file PersistenceUpdater.h.
Referenced by init(), requestImage(), and updateLastPartId().
Persisted Participants.
Definition at line 146 of file PersistenceUpdater.h.
Referenced by create(), destroy(), init(), requestImage(), and update().
Definition at line 135 of file PersistenceUpdater.h.
bool Update::PersistenceUpdater::reset_ [private] |
Definition at line 136 of file PersistenceUpdater.h.
Persisted Topics.
Definition at line 143 of file PersistenceUpdater.h.
Referenced by create(), destroy(), init(), requestImage(), and update().
Manager* Update::PersistenceUpdater::um_ [private] |
Definition at line 138 of file PersistenceUpdater.h.
Referenced by init(), and requestImage().