#include <PersistenceUpdater.h>
Inheritance diagram for Update::PersistenceUpdater:
Public Types | |
typedef ACE_Allocator_Adapter< ACE_Malloc< ACE_MMAP_MEMORY_POOL, TAO_SYNCH_MUTEX > > | ALLOCATOR |
typedef TopicStrt< QosSeq, ACE_CString > | Topic |
Persisted entity data structures. | |
typedef ParticipantStrt< QosSeq > | Participant |
typedef ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin > | RWActor |
typedef ACE_Hash_Map_With_Allocator< IdType_ExtId, Topic * > | TopicIndex |
typedef ACE_Hash_Map_With_Allocator< IdType_ExtId, Participant * > | ParticipantIndex |
typedef ACE_Hash_Map_With_Allocator< IdType_ExtId, RWActor * > | ActorIndex |
Public Member Functions | |
PersistenceUpdater () | |
virtual | ~PersistenceUpdater () |
virtual int | init (int argc, ACE_TCHAR *argv[]) |
Service object initialization. | |
virtual int | fini () |
pure ACE_Task_Base methods | |
virtual int | svc () |
virtual void | requestImage () |
virtual void | create (const UTopic &topic) |
Add entities to be persisted. | |
virtual void | create (const UParticipant &participant) |
virtual void | create (const URActor &actor) |
virtual void | create (const UWActor &actor) |
virtual void | create (const OwnershipData &data) |
virtual void | update (const IdPath &id, const DDS::DomainParticipantQos &qos) |
Persist updated Qos parameters for an entity. | |
virtual void | update (const IdPath &id, const DDS::TopicQos &qos) |
virtual void | update (const IdPath &id, const DDS::DataWriterQos &qos) |
virtual void | update (const IdPath &id, const DDS::PublisherQos &qos) |
virtual void | update (const IdPath &id, const DDS::DataReaderQos &qos) |
virtual void | update (const IdPath &id, const DDS::SubscriberQos &qos) |
virtual void | update (const IdPath &id, const DDS::StringSeq &exprParams) |
virtual void | destroy (const IdPath &id, ItemType type, ActorType actor) |
Remove an entity (but not children) from persistence. | |
Private Member Functions | |
int | parse (int argc, ACE_TCHAR *argv[]) |
void | storeUpdate (const ACE_Message_Block &data, BinSeq &storage) |
Private Attributes | |
ACE_TString | persistence_file_ |
bool | reset_ |
Manager * | um_ |
ALLOCATOR * | allocator_ |
TopicIndex * | topic_index_ |
ParticipantIndex * | participant_index_ |
ActorIndex * | actor_index_ |
Classes | |
class | IdType_ExtId |
Definition at line 30 of file PersistenceUpdater.h.
typedef ACE_Hash_Map_With_Allocator<IdType_ExtId, RWActor*> Update::PersistenceUpdater::ActorIndex |
Definition at line 62 of file PersistenceUpdater.h.
typedef ACE_Allocator_Adapter<ACE_Malloc <ACE_MMAP_MEMORY_POOL , TAO_SYNCH_MUTEX> > Update::PersistenceUpdater::ALLOCATOR |
Definition at line 52 of file PersistenceUpdater.h.
typedef struct ParticipantStrt< QosSeq > Update::PersistenceUpdater::Participant |
Definition at line 56 of file PersistenceUpdater.h.
typedef ACE_Hash_Map_With_Allocator<IdType_ExtId, Participant*> Update::PersistenceUpdater::ParticipantIndex |
Definition at line 61 of file PersistenceUpdater.h.
typedef struct ActorStrt< QosSeq, QosSeq, ACE_CString,BinSeq, ContentSubscriptionBin > Update::PersistenceUpdater::RWActor |
Definition at line 57 of file PersistenceUpdater.h.
typedef struct TopicStrt< QosSeq, ACE_CString > Update::PersistenceUpdater::Topic |
typedef ACE_Hash_Map_With_Allocator<IdType_ExtId, Topic*> Update::PersistenceUpdater::TopicIndex |
Definition at line 60 of file PersistenceUpdater.h.
Update::PersistenceUpdater::PersistenceUpdater | ( | ) |
Definition at line 211 of file PersistenceUpdater.cpp.
00212 : persistence_file_(ACE_TEXT("InforepoPersist")) 00213 , reset_(false) 00214 , um_(0) 00215 , allocator_(0) 00216 , topic_index_(0) 00217 , participant_index_(0) 00218 , actor_index_(0) 00219 00220 {}
Update::PersistenceUpdater::~PersistenceUpdater | ( | ) | [virtual] |
void Update::PersistenceUpdater::create | ( | const OwnershipData & | data | ) | [virtual] |
virtual void Update::PersistenceUpdater::create | ( | const UWActor & | actor | ) | [virtual] |
Implements Update::Updater.
void Update::PersistenceUpdater::create | ( | const URActor & | actor | ) | [virtual] |
Implements Update::Updater.
Definition at line 637 of file PersistenceUpdater.cpp.
References actor_index_, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::actorId, allocator_, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::callback, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::contentSubscriptionProfile, Update::DataReader, Update::DataReaderQos, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::domainId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::drdwQos, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::participantId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::pubsubQos, Update::SubscriberQos, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::topicId, and Update::ActorStrt< PSQ, RWQ, C, T, CSP >::transportInterfaceInfo.
00638 { 00639 TAO_OutputCDR outCdr; 00640 outCdr << actor.pubsubQos; 00641 ACE_Message_Block dst; 00642 ACE_CDR::consolidate(&dst, outCdr.begin()); 00643 00644 size_t len = dst.length(); 00645 char *buf = new (std::nothrow) char[len]; 00646 00647 if (buf == 0) { 00648 ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( subscription): allocation failed.\n")); 00649 return; 00650 } 00651 00652 ArrDelAdapter<char> guard(buf); 00653 00654 ACE_OS::memcpy(buf, dst.base(), len); 00655 00656 BinSeq pubsub_qos_bin(len, buf); 00657 QosSeq pubsub_qos(SubscriberQos, pubsub_qos_bin); 00658 00659 outCdr.reset(); 00660 outCdr << actor.drdwQos; 00661 ACE_Message_Block dst2; 00662 ACE_CDR::consolidate(&dst2, outCdr.begin()); 00663 00664 len = dst2.length(); 00665 char *buf2 = new (std::nothrow) char[len]; 00666 00667 if (buf2 == 0) { 00668 ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( subscription): allocation failed.\n")); 00669 return; 00670 } 00671 00672 ArrDelAdapter<char> guard2(buf2); 00673 00674 ACE_OS::memcpy(buf2, dst2.base(), len); 00675 00676 BinSeq dwdr_qos_bin(len, buf2); 00677 QosSeq dwdr_qos(DataReaderQos, dwdr_qos_bin); 00678 00679 outCdr.reset(); 00680 outCdr << actor.transportInterfaceInfo; 00681 ACE_Message_Block dst3; 00682 ACE_CDR::consolidate(&dst3, outCdr.begin()); 00683 00684 len = dst3.length(); 00685 char *buf3 = new (std::nothrow) char[len]; 00686 00687 if (buf3 == 0) { 00688 ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( subscription) allocation failed.\n")); 00689 return; 00690 } 00691 00692 ArrDelAdapter<char> guard3(buf3); 00693 00694 ACE_OS::memcpy(buf3, dst3.base(), len); 00695 BinSeq tr_bin(len, buf3); 00696 00697 outCdr.reset(); 00698 outCdr << actor.contentSubscriptionProfile.exprParams; 00699 ACE_Message_Block dst4; 00700 ACE_CDR::consolidate(&dst4, outCdr.begin()); 00701 len = dst4.length(); 00702 char* buf4 = new (std::nothrow) char[len]; 00703 if (buf4 == 0) { 00704 ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( subscription) allocation failed.\n")); 00705 return; 00706 } 00707 ArrDelAdapter<char> guard4(buf4); 00708 00709 ACE_OS::memcpy(buf4, dst4.base(), len); 00710 ContentSubscriptionBin csp_bin; 00711 csp_bin.filterClassName = actor.contentSubscriptionProfile.filterClassName; 00712 csp_bin.filterExpr = actor.contentSubscriptionProfile.filterExpr; 00713 csp_bin.exprParams = std::make_pair(len, buf4); 00714 00715 DActor actor_data(actor.domainId, actor.actorId, actor.topicId 00716 , actor.participantId 00717 , DataReader, actor.callback.c_str(), pubsub_qos 00718 , dwdr_qos, tr_bin, csp_bin); 00719 00720 // allocate memory for ActorData 00721 void* buffer; 00722 ACE_ALLOCATOR(buffer, allocator_->malloc 00723 (sizeof(PersistenceUpdater::RWActor))); 00724 00725 // Initialize ActorData 00726 PersistenceUpdater::RWActor* persistent_data = 00727 new(buffer) PersistenceUpdater::RWActor(actor_data 00728 , allocator_); 00729 00730 IdType_ExtId ext(actor.actorId); 00731 00732 // bind ActorData with the actorId 00733 if (actor_index_->bind(ext, persistent_data, allocator_) != 0) { 00734 allocator_->free((void *) buffer); 00735 return; 00736 } 00737 }
void Update::PersistenceUpdater::create | ( | const UParticipant & | participant | ) | [virtual] |
Implements Update::Updater.
Definition at line 590 of file PersistenceUpdater.cpp.
References allocator_, Update::ParticipantStrt< Q >::domainId, Update::ParticipantStrt< Q >::owner, participant_index_, Update::ParticipantStrt< QosSeq >::participantId, Update::ParticipantStrt< Q >::participantId, Update::ParticipantQos, and Update::ParticipantStrt< Q >::participantQos.
00591 { 00592 // serialize the Topic QOS 00593 TAO_OutputCDR outCdr; 00594 outCdr << participant.participantQos; 00595 ACE_Message_Block dst; 00596 ACE_CDR::consolidate(&dst, outCdr.begin()); 00597 00598 size_t len = dst.length(); 00599 char *buf; 00600 ACE_NEW_NORETURN(buf, char[len]); 00601 ArrDelAdapter<char> guard(buf); 00602 00603 if (buf == 0) { 00604 ACE_ERROR((LM_ERROR, 00605 ACE_TEXT("PersistenceUpdater::create( UParticipant): allocation failed.\n"))); 00606 return; 00607 } 00608 00609 ACE_OS::memcpy(buf, dst.base(), len); 00610 00611 BinSeq qos_bin(len, buf); 00612 00613 QosSeq p(ParticipantQos, qos_bin); 00614 DParticipant participant_data 00615 (participant.domainId, participant.owner, participant.participantId, p); 00616 00617 // allocate memory for ParticipantData 00618 void* buffer; 00619 ACE_ALLOCATOR(buffer, allocator_->malloc 00620 (sizeof(PersistenceUpdater::Participant))); 00621 00622 // Initialize ParticipantData 00623 PersistenceUpdater::Participant* persistent_data 00624 = new(buffer) PersistenceUpdater::Participant(participant_data 00625 , allocator_); 00626 00627 IdType_ExtId ext(participant_data.participantId); 00628 00629 // bind ParticipantData with the participantId 00630 if (participant_index_->bind(ext, persistent_data, allocator_) != 0) { 00631 allocator_->free((void *) buffer); 00632 return; 00633 } 00634 }
void Update::PersistenceUpdater::create | ( | const UTopic & | topic | ) | [virtual] |
Add entities to be persisted.
Implements Update::Updater.
Definition at line 544 of file PersistenceUpdater.cpp.
References allocator_, Update::TopicStrt< Q, S >::dataType, Update::TopicStrt< Q, S >::domainId, Update::TopicStrt< Q, S >::name, Update::TopicStrt< Q, S >::participantId, topic_index_, Update::TopicStrt< QosSeq, ACE_CString >::topicId, Update::TopicStrt< Q, S >::topicId, Update::TopicQos, and Update::TopicStrt< Q, S >::topicQos.
00545 { 00546 // serialize the Topic QOS 00547 TAO_OutputCDR outCdr; 00548 outCdr << topic.topicQos; 00549 ACE_Message_Block dst; 00550 ACE_CDR::consolidate(&dst, outCdr.begin()); 00551 00552 size_t len = dst.length(); 00553 char *buf; 00554 ACE_NEW_NORETURN(buf, char[len]); 00555 ArrDelAdapter<char> guard(buf); 00556 00557 if (buf == 0) { 00558 ACE_ERROR((LM_ERROR, 00559 ACE_TEXT("PersistenceUpdater::create( UTopic): allocation failed.\n"))); 00560 return; 00561 } 00562 00563 ACE_OS::memcpy(buf, dst.base(), len); 00564 00565 BinSeq qos_bin(len, buf); 00566 00567 QosSeq p(TopicQos, qos_bin); 00568 DTopic topic_data(topic.domainId, topic.topicId, topic.participantId 00569 , topic.name.c_str(), topic.dataType.c_str(), p); 00570 00571 // allocate memory for TopicData 00572 void* buffer; 00573 ACE_ALLOCATOR(buffer, allocator_->malloc 00574 (sizeof(PersistenceUpdater::Topic))); 00575 00576 // Initialize TopicData 00577 PersistenceUpdater::Topic* persistent_data 00578 = new(buffer) PersistenceUpdater::Topic(topic_data, allocator_); 00579 00580 IdType_ExtId ext(topic_data.topicId); 00581 00582 // bind TopicData with the topicId 00583 if (topic_index_->bind(ext, persistent_data, allocator_) != 0) { 00584 allocator_->free((void *) buffer); 00585 return; 00586 } 00587 }
void Update::PersistenceUpdater::destroy | ( | const IdPath & | id, | |
ItemType | type, | |||
ActorType | actor | |||
) | [virtual] |
Remove an entity (but not children) from persistence.
Implements Update::Updater.
Definition at line 992 of file PersistenceUpdater.cpp.
References Update::Actor, actor_index_, allocator_, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::cleanup(), Update::ParticipantStrt< QosSeq >::cleanup(), Update::TopicStrt< QosSeq, ACE_CString >::cleanup(), Update::Participant, participant_index_, Update::Topic, and topic_index_.
00993 { 00994 IdType_ExtId ext(id.id); 00995 PersistenceUpdater::Topic* topic = 0; 00996 PersistenceUpdater::Participant* participant = 0; 00997 PersistenceUpdater::RWActor* actor = 0; 00998 00999 switch (type) { 01000 case Update::Topic: 01001 01002 if (topic_index_->unbind(ext, topic, allocator_) == 0) { 01003 topic->cleanup(allocator_); 01004 allocator_->free((void *) topic); 01005 } 01006 01007 break; 01008 case Update::Participant: 01009 01010 if (participant_index_->unbind(ext, participant, allocator_) == 0) { 01011 participant->cleanup(allocator_); 01012 allocator_->free((void *) participant); 01013 } 01014 01015 break; 01016 case Update::Actor: 01017 01018 if (actor_index_->unbind(ext, actor, allocator_) == 0) { 01019 actor->cleanup(allocator_); 01020 allocator_->free((void *) actor); 01021 } 01022 01023 break; 01024 default: { 01025 OpenDDS::DCPS::RepoIdConverter converter(id.id); 01026 ACE_ERROR((LM_ERROR, 01027 ACE_TEXT("(%P | %t) PersistenceUpdater::destroy: ") 01028 ACE_TEXT("unknown entity - %C.\n"), 01029 std::string(converter).c_str())); 01030 } 01031 } 01032 }
int Update::PersistenceUpdater::fini | ( | ) | [virtual] |
int Update::PersistenceUpdater::init | ( | int | argc, | |
ACE_TCHAR * | argv[] | |||
) | [virtual] |
Service object initialization.
Definition at line 268 of file PersistenceUpdater.cpp.
References actor_index_, Update::Manager::add(), allocator_, Update::createIndex(), Update::index_cleanup(), parse(), participant_index_, persistence_file_, reset_, topic_index_, and um_.
00269 { 00270 // discover the UpdateManager 00271 um_ = ACE_Dynamic_Service<Update::Manager>::instance 00272 ("UpdateManagerSvc"); 00273 00274 if (um_ == 0) { 00275 ACE_ERROR((LM_ERROR, ACE_TEXT("PersistenceUpdater initialization failed. ") 00276 ACE_TEXT("No UpdateManager discovered.\n"))); 00277 return -1; 00278 } 00279 00280 this->parse(argc, argv); 00281 00282 #if defined ACE_HAS_MAC_OSX && defined __x86_64__ && __x86_64__ 00283 ACE_MMAP_Memory_Pool::OPTIONS options((void*)0x200000000); 00284 #else 00285 ACE_MMAP_Memory_Pool::OPTIONS options(ACE_DEFAULT_BASE_ADDR); 00286 #endif 00287 00288 // Create the allocator with the appropriate options. The name used 00289 // for the lock is the same as one used for the file. 00290 ACE_NEW_RETURN(allocator_, 00291 ALLOCATOR(persistence_file_.c_str(), 00292 persistence_file_.c_str(), 00293 &options), 00294 -1); 00295 00296 std::string topic_tag("TopicIndex"); 00297 std::string participant_tag("ParticipantIndex"); 00298 std::string actor_tag("ActorIndex"); 00299 bool exists = false, ex = false; 00300 00301 char* topic_index = (char*)createIndex(topic_tag, *allocator_ 00302 , sizeof(TopicIndex), ex); 00303 00304 if (topic_index == 0) { 00305 ACE_DEBUG((LM_DEBUG, ACE_TEXT("Initial allocation/Bind failed 1.\n"))); 00306 return -1; 00307 } 00308 00309 exists = exists || ex; 00310 00311 char* participant_index = (char*)createIndex(participant_tag, *allocator_ 00312 , sizeof(ParticipantIndex), ex); 00313 00314 if (participant_index == 0) { 00315 ACE_DEBUG((LM_DEBUG, ACE_TEXT("Initial allocation/Bind failed 2.\n"))); 00316 return -1; 00317 } 00318 00319 exists = exists || ex; 00320 00321 char* actor_index = (char*)createIndex(actor_tag, *allocator_ 00322 , sizeof(ActorIndex), ex); 00323 00324 if (actor_index == 0) { 00325 ACE_DEBUG((LM_DEBUG, ACE_TEXT("Initial allocation/Bind failed 2.\n"))); 00326 return -1; 00327 } 00328 00329 exists = exists || ex; 00330 00331 if (exists) { 00332 topic_index_ = reinterpret_cast<TopicIndex*>(topic_index); 00333 participant_index_ = reinterpret_cast<ParticipantIndex*>(participant_index); 00334 actor_index_ = reinterpret_cast<ActorIndex*>(actor_index); 00335 00336 if (!(topic_index_ && participant_index_ && actor_index_)) { 00337 ACE_ERROR((LM_DEBUG, ACE_TEXT("Unable to narrow persistent indexes.\n"))); 00338 return -1; 00339 } 00340 00341 } else { 00342 topic_index_ = new(topic_index) TopicIndex(allocator_); 00343 participant_index_ = new(participant_index) ParticipantIndex(allocator_); 00344 actor_index_ = new(actor_index) ActorIndex(allocator_); 00345 } 00346 00347 if (reset_) { 00348 index_cleanup(topic_index_, allocator_); 00349 index_cleanup(participant_index_, allocator_); 00350 index_cleanup(actor_index_, allocator_); 00351 } 00352 00353 // lastly register the callback 00354 um_->add(this); 00355 00356 return 0; 00357 }
int Update::PersistenceUpdater::parse | ( | int | argc, | |
ACE_TCHAR * | argv[] | |||
) | [private] |
Definition at line 360 of file PersistenceUpdater.cpp.
References persistence_file_, and reset_.
Referenced by init().
00361 { 00362 for (ssize_t count = 0; count < argc; count++) { 00363 if (ACE_OS::strcasecmp(argv[count], ACE_TEXT("-file")) == 0) { 00364 if ((count + 1) < argc) { 00365 persistence_file_ = argv[count+1]; 00366 count++; 00367 } 00368 00369 } else if (ACE_OS::strcasecmp(argv[count], ACE_TEXT("-reset")) == 0) { 00370 if ((count + 1) < argc) { 00371 int val = ACE_OS::atoi(argv[count+1]); 00372 reset_ = true; 00373 00374 if (val == 0) { 00375 reset_ = false; 00376 } 00377 00378 count++; 00379 } 00380 00381 } else { 00382 ACE_DEBUG((LM_DEBUG, ACE_TEXT("Unknown option %s\n") 00383 , argv[count])); 00384 return -1; 00385 } 00386 } 00387 00388 return 0; 00389 }
void Update::PersistenceUpdater::requestImage | ( | ) | [virtual] |
Request an image refresh to be sent upstream. This is currently done synchronously. TBD: Move to an asynchronous model
Implements Update::Updater.
Definition at line 404 of file PersistenceUpdater.cpp.
References actor_index_, Update::ImageData< T, P, A, W >::actors, Update::DataReader, Update::ContentSubscriptionBin::exprParams, Update::ContentSubscriptionBin::filterClassName, Update::ContentSubscriptionBin::filterExpr, participant_index_, Update::ParticipantQos, Update::ImageData< T, P, A, W >::participants, Update::Manager::pushImage(), topic_index_, Update::TopicQos, Update::ImageData< T, P, A, W >::topics, and um_.
00405 { 00406 if (um_ == NULL) { 00407 return; 00408 } 00409 00410 DImage image; 00411 00412 // Allocate space to hold the QOS sequences. 00413 std::vector<ArrDelAdapter<char> > qos_sequences; 00414 00415 for (ParticipantIndex::ITERATOR iter = participant_index_->begin(); 00416 iter != participant_index_->end(); iter++) { 00417 const PersistenceUpdater::Participant* participant 00418 = (*iter).int_id_; 00419 00420 size_t qos_len = participant->participantQos.second.first; 00421 char *buf; 00422 ACE_NEW_NORETURN(buf, char[qos_len]); 00423 qos_sequences.push_back(ArrDelAdapter<char>(buf)); 00424 00425 if (buf == 0) { 00426 ACE_ERROR((LM_ERROR, 00427 ACE_TEXT("PersistenceUpdater::requestImage(): allocation failed.\n"))); 00428 return; 00429 } 00430 00431 ACE_OS::memcpy(buf, participant->participantQos.second.second, qos_len); 00432 00433 BinSeq in_seq(qos_len, buf); 00434 QosSeq qos(ParticipantQos, in_seq); 00435 DParticipant dparticipant(participant->domainId 00436 , participant->owner 00437 , participant->participantId 00438 , qos); 00439 image.participants.push_back(dparticipant); 00440 } 00441 00442 for (TopicIndex::ITERATOR iter = topic_index_->begin(); 00443 iter != topic_index_->end(); iter++) { 00444 const PersistenceUpdater::Topic* topic = (*iter).int_id_; 00445 00446 size_t qos_len = topic->topicQos.second.first; 00447 char *buf; 00448 ACE_NEW_NORETURN(buf, char[qos_len]); 00449 qos_sequences.push_back(ArrDelAdapter<char>(buf)); 00450 00451 if (buf == 0) { 00452 ACE_ERROR((LM_ERROR, 00453 ACE_TEXT("PersistenceUpdater::requestImage(): allocation failed.\n"))); 00454 return; 00455 } 00456 00457 ACE_OS::memcpy(buf, topic->topicQos.second.second, qos_len); 00458 00459 BinSeq in_seq(qos_len, buf); 00460 QosSeq qos(TopicQos, in_seq); 00461 DTopic dTopic(topic->domainId, topic->topicId 00462 , topic->participantId, topic->name.c_str() 00463 , topic->dataType.c_str(), qos); 00464 image.topics.push_back(dTopic); 00465 } 00466 00467 for (ActorIndex::ITERATOR iter = actor_index_->begin(); 00468 iter != actor_index_->end(); iter++) { 00469 const PersistenceUpdater::RWActor* actor = (*iter).int_id_; 00470 00471 size_t qos_len = actor->pubsubQos.second.first; 00472 char *buf; 00473 ACE_NEW_NORETURN(buf, char[qos_len]); 00474 qos_sequences.push_back(ArrDelAdapter<char>(buf)); 00475 00476 if (buf == 0) { 00477 ACE_ERROR((LM_ERROR, 00478 ACE_TEXT("PersistenceUpdater::requestImage(): allocation failed.\n"))); 00479 return; 00480 } 00481 00482 ACE_OS::memcpy(buf, actor->pubsubQos.second.second, qos_len); 00483 00484 BinSeq in_pubsub_seq(qos_len, buf); 00485 QosSeq pubsub_qos(actor->pubsubQos.first, in_pubsub_seq); 00486 00487 qos_len = actor->drdwQos.second.first; 00488 ACE_NEW_NORETURN(buf, char[qos_len]); 00489 qos_sequences.push_back(ArrDelAdapter<char>(buf)); 00490 00491 if (buf == 0) { 00492 ACE_ERROR((LM_ERROR, 00493 ACE_TEXT("PersistenceUpdater::requestImage(): allocation failed.\n"))); 00494 return; 00495 } 00496 00497 ACE_OS::memcpy(buf, actor->drdwQos.second.second, qos_len); 00498 00499 BinSeq in_drdw_seq(qos_len, buf); 00500 QosSeq drdw_qos(actor->drdwQos.first, in_drdw_seq); 00501 00502 qos_len = actor->transportInterfaceInfo.first; 00503 ACE_NEW_NORETURN(buf, char[qos_len]); 00504 qos_sequences.push_back(ArrDelAdapter<char>(buf)); 00505 00506 if (buf == 0) { 00507 ACE_ERROR((LM_ERROR, 00508 ACE_TEXT("PersistenceUpdater::requestImage(): allocation failed.\n"))); 00509 return; 00510 } 00511 00512 ACE_OS::memcpy(buf, actor->transportInterfaceInfo.second, qos_len); 00513 00514 BinSeq in_transport_seq(qos_len, buf); 00515 00516 ContentSubscriptionBin in_csp_bin; 00517 if (actor->type == DataReader) { 00518 in_csp_bin.filterClassName = actor->contentSubscriptionProfile.filterClassName; 00519 in_csp_bin.filterExpr = actor->contentSubscriptionProfile.filterExpr; 00520 BinSeq& params = in_csp_bin.exprParams; 00521 ACE_NEW_NORETURN(params.second, char[params.first]); 00522 if (params.second == 0) { 00523 ACE_ERROR((LM_ERROR, 00524 ACE_TEXT("PersistenceUpdater::requestImage(): allocation ") 00525 ACE_TEXT("failed.\n"))); 00526 return; 00527 } 00528 qos_sequences.push_back(ArrDelAdapter<char>(params.second)); 00529 ACE_OS::memcpy(params.second, 00530 actor->contentSubscriptionProfile.exprParams.second, params.first); 00531 } 00532 00533 DActor dActor(actor->domainId, actor->actorId, actor->topicId 00534 , actor->participantId 00535 , actor->type, actor->callback.c_str() 00536 , pubsub_qos, drdw_qos, in_transport_seq, in_csp_bin); 00537 image.actors.push_back(dActor); 00538 } 00539 00540 um_->pushImage(image); 00541 }
void Update::PersistenceUpdater::storeUpdate | ( | const ACE_Message_Block & | data, | |
BinSeq & | storage | |||
) | [private] |
Definition at line 1035 of file PersistenceUpdater.cpp.
Referenced by update().
01036 { 01037 size_t len = data.length(); 01038 01039 void* buffer; 01040 ACE_ALLOCATOR(buffer, this->allocator_->malloc(len)); 01041 ACE_OS::memcpy(buffer, data.base(), len); 01042 01043 storage.first = len; 01044 storage.second = static_cast<char*>(buffer); 01045 }
int Update::PersistenceUpdater::svc | ( | ) | [virtual] |
void Update::PersistenceUpdater::update | ( | const IdPath & | id, | |
const DDS::StringSeq & | exprParams | |||
) | [virtual] |
Implements Update::Updater.
Definition at line 969 of file PersistenceUpdater.cpp.
References actor_index_, allocator_, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::contentSubscriptionProfile, Update::ContentSubscriptionBin::exprParams, and storeUpdate().
00970 { 00971 IdType_ExtId ext(id.id); 00972 PersistenceUpdater::RWActor* actor_data = 0; 00973 00974 if (actor_index_->find(ext, actor_data, allocator_) == 0) { 00975 TAO_OutputCDR outCdr; 00976 outCdr << exprParams; 00977 ACE_Message_Block dst; 00978 ACE_CDR::consolidate(&dst, outCdr.begin()); 00979 00980 storeUpdate(dst, actor_data->contentSubscriptionProfile.exprParams); 00981 00982 } else { 00983 OpenDDS::DCPS::RepoIdConverter converter(id.id); 00984 ACE_ERROR((LM_ERROR, 00985 ACE_TEXT("(%P|%t) PersistenceUpdater::update(readerQos): ") 00986 ACE_TEXT("subscription %C not found\n"), 00987 std::string(converter).c_str())); 00988 } 00989 }
void Update::PersistenceUpdater::update | ( | const IdPath & | id, | |
const DDS::SubscriberQos & | qos | |||
) | [virtual] |
Implements Update::Updater.
Definition at line 946 of file PersistenceUpdater.cpp.
References Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::pubsubQos, and storeUpdate().
00947 { 00948 IdType_ExtId ext(id.id); 00949 PersistenceUpdater::RWActor* actor_data = 0; 00950 00951 if (this->actor_index_->find(ext, actor_data, this->allocator_) == 0) { 00952 TAO_OutputCDR outCdr; 00953 outCdr << qos; 00954 ACE_Message_Block dst; 00955 ACE_CDR::consolidate(&dst, outCdr.begin()); 00956 00957 this->storeUpdate(dst, actor_data->pubsubQos.second); 00958 00959 } else { 00960 OpenDDS::DCPS::RepoIdConverter converter(id.id); 00961 ACE_ERROR((LM_ERROR, 00962 ACE_TEXT("(%P|%t) PersistenceUpdater::update(subscriberQos): ") 00963 ACE_TEXT("subscription %C not found\n"), 00964 std::string(converter).c_str())); 00965 } 00966 }
void Update::PersistenceUpdater::update | ( | const IdPath & | id, | |
const DDS::DataReaderQos & | qos | |||
) | [virtual] |
Implements Update::Updater.
Definition at line 923 of file PersistenceUpdater.cpp.
References Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::drdwQos, and storeUpdate().
00924 { 00925 IdType_ExtId ext(id.id); 00926 PersistenceUpdater::RWActor* actor_data = 0; 00927 00928 if (this->actor_index_->find(ext, actor_data, this->allocator_) == 0) { 00929 TAO_OutputCDR outCdr; 00930 outCdr << qos; 00931 ACE_Message_Block dst; 00932 ACE_CDR::consolidate(&dst, outCdr.begin()); 00933 00934 this->storeUpdate(dst, actor_data->drdwQos.second); 00935 00936 } else { 00937 OpenDDS::DCPS::RepoIdConverter converter(id.id); 00938 ACE_ERROR((LM_ERROR, 00939 ACE_TEXT("(%P|%t) PersistenceUpdater::update(readerQos): ") 00940 ACE_TEXT("subscription %C not found\n"), 00941 std::string(converter).c_str())); 00942 } 00943 }
void Update::PersistenceUpdater::update | ( | const IdPath & | id, | |
const DDS::PublisherQos & | qos | |||
) | [virtual] |
Implements Update::Updater.
Definition at line 900 of file PersistenceUpdater.cpp.
References Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::pubsubQos, and storeUpdate().
00901 { 00902 IdType_ExtId ext(id.id); 00903 PersistenceUpdater::RWActor* actor_data = 0; 00904 00905 if (this->actor_index_->find(ext, actor_data, this->allocator_) == 0) { 00906 TAO_OutputCDR outCdr; 00907 outCdr << qos; 00908 ACE_Message_Block dst; 00909 ACE_CDR::consolidate(&dst, outCdr.begin()); 00910 00911 this->storeUpdate(dst, actor_data->pubsubQos.second); 00912 00913 } else { 00914 OpenDDS::DCPS::RepoIdConverter converter(id.id); 00915 ACE_ERROR((LM_ERROR, 00916 ACE_TEXT("(%P|%t) PersistenceUpdater::update(publisherQos): ") 00917 ACE_TEXT("publication %C not found\n"), 00918 std::string(converter).c_str())); 00919 } 00920 }
void Update::PersistenceUpdater::update | ( | const IdPath & | id, | |
const DDS::DataWriterQos & | qos | |||
) | [virtual] |
Implements Update::Updater.
Definition at line 877 of file PersistenceUpdater.cpp.
References Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin >::drdwQos, and storeUpdate().
00878 { 00879 IdType_ExtId ext(id.id); 00880 PersistenceUpdater::RWActor* actor_data = 0; 00881 00882 if (this->actor_index_->find(ext, actor_data, this->allocator_) == 0) { 00883 TAO_OutputCDR outCdr; 00884 outCdr << qos; 00885 ACE_Message_Block dst; 00886 ACE_CDR::consolidate(&dst, outCdr.begin()); 00887 00888 this->storeUpdate(dst, actor_data->drdwQos.second); 00889 00890 } else { 00891 OpenDDS::DCPS::RepoIdConverter converter(id.id); 00892 ACE_ERROR((LM_ERROR, 00893 ACE_TEXT("(%P|%t) PersistenceUpdater::update(writerQos): ") 00894 ACE_TEXT("publication %C not found\n"), 00895 std::string(converter).c_str())); 00896 } 00897 }
void Update::PersistenceUpdater::update | ( | const IdPath & | id, | |
const DDS::TopicQos & | qos | |||
) | [virtual] |
Implements Update::Updater.
Definition at line 854 of file PersistenceUpdater.cpp.
References storeUpdate(), and Update::TopicStrt< QosSeq, ACE_CString >::topicQos.
00855 { 00856 IdType_ExtId ext(id.id); 00857 PersistenceUpdater::Topic* topic_data = 0; 00858 00859 if (this->topic_index_->find(ext, topic_data, this->allocator_) == 0) { 00860 TAO_OutputCDR outCdr; 00861 outCdr << qos; 00862 ACE_Message_Block dst; 00863 ACE_CDR::consolidate(&dst, outCdr.begin()); 00864 00865 this->storeUpdate(dst, topic_data->topicQos.second); 00866 00867 } else { 00868 OpenDDS::DCPS::RepoIdConverter converter(id.id); 00869 ACE_ERROR((LM_ERROR, 00870 ACE_TEXT("(%P|%t) PersistenceUpdater::update: ") 00871 ACE_TEXT("topic %C not found\n"), 00872 std::string(converter).c_str())); 00873 } 00874 }
void Update::PersistenceUpdater::update | ( | const IdPath & | id, | |
const DDS::DomainParticipantQos & | qos | |||
) | [virtual] |
Persist updated Qos parameters for an entity.
Implements Update::Updater.
Definition at line 831 of file PersistenceUpdater.cpp.
References Update::ParticipantStrt< QosSeq >::participantQos, and storeUpdate().
00832 { 00833 IdType_ExtId ext(id.id); 00834 PersistenceUpdater::Participant* part_data = 0; 00835 00836 if (this->participant_index_->find(ext, part_data, this->allocator_) == 0) { 00837 TAO_OutputCDR outCdr; 00838 outCdr << qos; 00839 ACE_Message_Block dst; 00840 ACE_CDR::consolidate(&dst, outCdr.begin()); 00841 00842 this->storeUpdate(dst, part_data->participantQos.second); 00843 00844 } else { 00845 OpenDDS::DCPS::RepoIdConverter converter(id.id); 00846 ACE_ERROR((LM_ERROR, 00847 ACE_TEXT("(%P|%t) PersistenceUpdater::update: ") 00848 ACE_TEXT("participant %C not found\n"), 00849 std::string(converter).c_str())); 00850 } 00851 }
Definition at line 112 of file PersistenceUpdater.h.
Referenced by create(), destroy(), init(), requestImage(), and update().
ALLOCATOR* Update::PersistenceUpdater::allocator_ [private] |
Definition at line 111 of file PersistenceUpdater.h.
Referenced by create(), destroy(), init(), and requestImage().
ACE_TString Update::PersistenceUpdater::persistence_file_ [private] |
bool Update::PersistenceUpdater::reset_ [private] |
Definition at line 110 of file PersistenceUpdater.h.
Referenced by create(), destroy(), init(), and requestImage().
Manager* Update::PersistenceUpdater::um_ [private] |