00001
00002
00003
00004
00005
00006
00007
00008 #include "DcpsInfo_pch.h"
00009
00010 #include "PersistenceUpdater.h"
00011 #include "UpdateManager.h"
00012 #include "ArrDelAdapter.h"
00013
00014 #include "dds/DCPS/RepoIdConverter.h"
00015 #include "dds/DCPS/GuidUtils.h"
00016
00017 #include "ace/Malloc_T.h"
00018 #include "ace/MMAP_Memory_Pool.h"
00019 #include "ace/OS_NS_strings.h"
00020 #include "ace/Svc_Handler.h"
00021 #include "ace/Dynamic_Service.h"
00022
00023 #include <algorithm>
00024
00025 namespace {
00026 void assign(Update::BinSeq& to, const Update::BinSeq& from,
00027 Update::PersistenceUpdater::ALLOCATOR* allocator)
00028 {
00029 const size_t len = from.first;
00030 void* out_buf;
00031 ACE_ALLOCATOR(out_buf, allocator->malloc(len));
00032 ACE_OS::memcpy(out_buf, from.second, len);
00033 to = std::make_pair(len, static_cast<char*>(out_buf));
00034 }
00035
00036 void assign(ACE_CString& to, const char* from,
00037 Update::PersistenceUpdater::ALLOCATOR* allocator)
00038 {
00039 const size_t len = ACE_OS::strlen (from) + 1;
00040 void* out_buf;
00041 ACE_ALLOCATOR(out_buf, allocator->malloc(len));
00042 ACE_OS::memcpy(out_buf, from, len);
00043 to.set(static_cast<char*>(out_buf), len - 1, false);
00044 }
00045 }
00046
00047 namespace Update {
00048
00049
00050
00051 template<>
00052 struct TopicStrt<QosSeq, ACE_CString> {
00053 DDS::DomainId_t domainId;
00054 IdType topicId;
00055 IdType participantId;
00056 ACE_CString name;
00057 ACE_CString dataType;
00058 QosSeq topicQos;
00059
00060 TopicStrt(const DTopic& topic,
00061 PersistenceUpdater::ALLOCATOR* allocator)
00062 : domainId(topic.domainId),
00063 topicId(topic.topicId),
00064 participantId(topic.participantId)
00065 {
00066 assign(name, topic.name.c_str(), allocator);
00067 assign(dataType, topic.dataType.c_str(), allocator);
00068
00069 topicQos.first = TopicQos;
00070 assign(topicQos.second, topic.topicQos.second, allocator);
00071 }
00072
00073 void cleanup(PersistenceUpdater::ALLOCATOR* allocator)
00074 {
00075 if (name.length() > 0)
00076 {
00077 char* strMemory = const_cast<char*>(name.fast_rep());
00078 name.fast_clear();
00079 allocator->free(strMemory);
00080 }
00081 if (dataType.length() > 0)
00082 {
00083 char* strMemory = const_cast<char*>(dataType.fast_rep());
00084 dataType.fast_clear();
00085 allocator->free(strMemory);
00086 }
00087
00088 allocator->free(topicQos.second.second);
00089 }
00090 };
00091
00092 template<>
00093 struct ParticipantStrt<QosSeq> {
00094 DDS::DomainId_t domainId;
00095 long owner;
00096 IdType participantId;
00097 QosSeq participantQos;
00098
00099 ParticipantStrt(const DDS::DomainId_t& dId,
00100 long own,
00101 const IdType& pId,
00102 const QosSeq& pQos)
00103 : domainId(dId),
00104 owner(own),
00105 participantId(pId),
00106 participantQos(pQos) {}
00107
00108 ParticipantStrt(const DParticipant& participant,
00109 PersistenceUpdater::ALLOCATOR* allocator)
00110 : domainId(participant.domainId),
00111 owner(participant.owner),
00112 participantId(participant.participantId)
00113 {
00114 participantQos.first = ParticipantQos;
00115 assign(participantQos.second, participant.participantQos.second, allocator);
00116 }
00117
00118 void cleanup(PersistenceUpdater::ALLOCATOR* allocator)
00119 {
00120 allocator->free(participantQos.second.second);
00121 }
00122 };
00123
00124 template<>
00125 struct ActorStrt<QosSeq, QosSeq,
00126 ACE_CString, BinSeq, ContentSubscriptionBin> {
00127 DDS::DomainId_t domainId;
00128 IdType actorId;
00129 IdType topicId;
00130 IdType participantId;
00131 ActorType type;
00132 ACE_CString callback;
00133 QosSeq pubsubQos;
00134 QosSeq drdwQos;
00135 BinSeq transportInterfaceInfo;
00136 ContentSubscriptionBin contentSubscriptionProfile;
00137
00138 ActorStrt(const DActor& actor,
00139 PersistenceUpdater::ALLOCATOR* allocator)
00140 : domainId(actor.domainId),
00141 actorId(actor.actorId),
00142 topicId(actor.topicId),
00143 participantId(actor.participantId), type(actor.type)
00144 {
00145 assign(callback, actor.callback.c_str(), allocator);
00146
00147 pubsubQos.first = actor.pubsubQos.first;
00148 assign(pubsubQos.second, actor.pubsubQos.second, allocator);
00149
00150 drdwQos.first = actor.drdwQos.first;
00151 assign(drdwQos.second, actor.drdwQos.second, allocator);
00152
00153 assign(transportInterfaceInfo, actor.transportInterfaceInfo, allocator);
00154
00155 contentSubscriptionProfile.filterClassName =
00156 ACE_CString(actor.contentSubscriptionProfile.filterClassName.c_str(),
00157 allocator);
00158 contentSubscriptionProfile.filterExpr =
00159 ACE_CString(actor.contentSubscriptionProfile.filterExpr.c_str(),
00160 allocator);
00161 assign(contentSubscriptionProfile.exprParams,
00162 actor.contentSubscriptionProfile.exprParams, allocator);
00163 }
00164
00165 void cleanup(PersistenceUpdater::ALLOCATOR* allocator)
00166 {
00167 if (callback.length() > 0)
00168 {
00169 char* strMemory = const_cast<char*>(callback.fast_rep());
00170 callback.fast_clear();
00171 allocator->free(strMemory);
00172 }
00173
00174 allocator->free(pubsubQos.second.second);
00175 allocator->free(drdwQos.second.second);
00176 allocator->free(transportInterfaceInfo.second);
00177 allocator->free(contentSubscriptionProfile.exprParams.second);
00178 }
00179 };
00180
00181 PersistenceUpdater::IdType_ExtId::IdType_ExtId()
00182 : id_(OpenDDS::DCPS::GUID_UNKNOWN)
00183 {}
00184
00185 PersistenceUpdater::IdType_ExtId::IdType_ExtId(IdType id)
00186 : id_(id)
00187 {}
00188
00189 PersistenceUpdater::IdType_ExtId::IdType_ExtId(const IdType_ExtId& ext)
00190 : id_(ext.id_)
00191 {}
00192
00193 void
00194 PersistenceUpdater::IdType_ExtId::operator= (const IdType_ExtId& ext)
00195 {
00196 id_ = ext.id_;
00197 }
00198
00199 bool
00200 PersistenceUpdater::IdType_ExtId::operator== (const IdType_ExtId& ext) const
00201 {
00202 return (id_ == ext.id_);
00203 }
00204
00205 unsigned long
00206 PersistenceUpdater::IdType_ExtId::hash() const
00207 {
00208 return OpenDDS::DCPS::RepoIdConverter(id_).checksum();
00209 };
00210
00211 PersistenceUpdater::PersistenceUpdater()
00212 : persistence_file_(ACE_TEXT("InforepoPersist"))
00213 , reset_(false)
00214 , um_(0)
00215 , allocator_(0)
00216 , topic_index_(0)
00217 , participant_index_(0)
00218 , actor_index_(0)
00219
00220 {}
00221
00222 PersistenceUpdater::~PersistenceUpdater()
00223 {}
00224
00225
00226 void* createIndex(const std::string& tag
00227 , PersistenceUpdater::ALLOCATOR& allocator
00228 , size_t size, bool& exists)
00229 {
00230 void* index = 0;
00231
00232
00233
00234 if (allocator.find(tag.c_str(), index) == 0) {
00235 exists = true;
00236 return index;
00237
00238 } else {
00239 exists = false;
00240
00241 ACE_ALLOCATOR_RETURN(index, allocator.malloc(size), 0);
00242
00243 if (allocator.bind(tag.c_str(), index) == -1) {
00244 allocator.free(index);
00245 index = 0;
00246 }
00247 }
00248
00249 return index;
00250 }
00251
00252 template<typename I> void
00253 index_cleanup(I* index
00254 , PersistenceUpdater::ALLOCATOR* allocator)
00255 {
00256 for (typename I::ITERATOR iter = index->begin()
00257 ; iter != index->end();) {
00258 typename I::ITERATOR current_iter = iter;
00259 iter++;
00260
00261 if (index->unbind((*current_iter).ext_id_, allocator) != 0) {
00262 ACE_ERROR((LM_ERROR, ACE_TEXT("Index unbind failed.\n")));
00263 }
00264 }
00265 }
00266
00267 int
00268 PersistenceUpdater::init(int argc, ACE_TCHAR *argv[])
00269 {
00270
00271 um_ = ACE_Dynamic_Service<Update::Manager>::instance
00272 ("UpdateManagerSvc");
00273
00274 if (um_ == 0) {
00275 ACE_ERROR((LM_ERROR, ACE_TEXT("PersistenceUpdater initialization failed. ")
00276 ACE_TEXT("No UpdateManager discovered.\n")));
00277 return -1;
00278 }
00279
00280 this->parse(argc, argv);
00281
00282 #if defined ACE_HAS_MAC_OSX && defined __x86_64__ && __x86_64__
00283 ACE_MMAP_Memory_Pool::OPTIONS options((void*)0x200000000);
00284 #else
00285 ACE_MMAP_Memory_Pool::OPTIONS options(ACE_DEFAULT_BASE_ADDR);
00286 #endif
00287
00288
00289
00290 ACE_NEW_RETURN(allocator_,
00291 ALLOCATOR(persistence_file_.c_str(),
00292 persistence_file_.c_str(),
00293 &options),
00294 -1);
00295
00296 std::string topic_tag("TopicIndex");
00297 std::string participant_tag("ParticipantIndex");
00298 std::string actor_tag("ActorIndex");
00299 bool exists = false, ex = false;
00300
00301 char* topic_index = (char*)createIndex(topic_tag, *allocator_
00302 , sizeof(TopicIndex), ex);
00303
00304 if (topic_index == 0) {
00305 ACE_DEBUG((LM_DEBUG, ACE_TEXT("Initial allocation/Bind failed 1.\n")));
00306 return -1;
00307 }
00308
00309 exists = exists || ex;
00310
00311 char* participant_index = (char*)createIndex(participant_tag, *allocator_
00312 , sizeof(ParticipantIndex), ex);
00313
00314 if (participant_index == 0) {
00315 ACE_DEBUG((LM_DEBUG, ACE_TEXT("Initial allocation/Bind failed 2.\n")));
00316 return -1;
00317 }
00318
00319 exists = exists || ex;
00320
00321 char* actor_index = (char*)createIndex(actor_tag, *allocator_
00322 , sizeof(ActorIndex), ex);
00323
00324 if (actor_index == 0) {
00325 ACE_DEBUG((LM_DEBUG, ACE_TEXT("Initial allocation/Bind failed 2.\n")));
00326 return -1;
00327 }
00328
00329 exists = exists || ex;
00330
00331 if (exists) {
00332 topic_index_ = reinterpret_cast<TopicIndex*>(topic_index);
00333 participant_index_ = reinterpret_cast<ParticipantIndex*>(participant_index);
00334 actor_index_ = reinterpret_cast<ActorIndex*>(actor_index);
00335
00336 if (!(topic_index_ && participant_index_ && actor_index_)) {
00337 ACE_ERROR((LM_DEBUG, ACE_TEXT("Unable to narrow persistent indexes.\n")));
00338 return -1;
00339 }
00340
00341 } else {
00342 topic_index_ = new(topic_index) TopicIndex(allocator_);
00343 participant_index_ = new(participant_index) ParticipantIndex(allocator_);
00344 actor_index_ = new(actor_index) ActorIndex(allocator_);
00345 }
00346
00347 if (reset_) {
00348 index_cleanup(topic_index_, allocator_);
00349 index_cleanup(participant_index_, allocator_);
00350 index_cleanup(actor_index_, allocator_);
00351 }
00352
00353
00354 um_->add(this);
00355
00356 return 0;
00357 }
00358
00359 int
00360 PersistenceUpdater::parse(int argc, ACE_TCHAR *argv[])
00361 {
00362 for (ssize_t count = 0; count < argc; count++) {
00363 if (ACE_OS::strcasecmp(argv[count], ACE_TEXT("-file")) == 0) {
00364 if ((count + 1) < argc) {
00365 persistence_file_ = argv[count+1];
00366 count++;
00367 }
00368
00369 } else if (ACE_OS::strcasecmp(argv[count], ACE_TEXT("-reset")) == 0) {
00370 if ((count + 1) < argc) {
00371 int val = ACE_OS::atoi(argv[count+1]);
00372 reset_ = true;
00373
00374 if (val == 0) {
00375 reset_ = false;
00376 }
00377
00378 count++;
00379 }
00380
00381 } else {
00382 ACE_DEBUG((LM_DEBUG, ACE_TEXT("Unknown option %s\n")
00383 , argv[count]));
00384 return -1;
00385 }
00386 }
00387
00388 return 0;
00389 }
00390
00391 int
00392 PersistenceUpdater::fini()
00393 {
00394 return 0;
00395 }
00396
00397 int
00398 PersistenceUpdater::svc()
00399 {
00400 return 0;
00401 }
00402
00403 void
00404 PersistenceUpdater::requestImage()
00405 {
00406 if (um_ == NULL) {
00407 return;
00408 }
00409
00410 DImage image;
00411
00412
00413 std::vector<ArrDelAdapter<char> > qos_sequences;
00414
00415 for (ParticipantIndex::ITERATOR iter = participant_index_->begin();
00416 iter != participant_index_->end(); iter++) {
00417 const PersistenceUpdater::Participant* participant
00418 = (*iter).int_id_;
00419
00420 size_t qos_len = participant->participantQos.second.first;
00421 char *buf;
00422 ACE_NEW_NORETURN(buf, char[qos_len]);
00423 qos_sequences.push_back(ArrDelAdapter<char>(buf));
00424
00425 if (buf == 0) {
00426 ACE_ERROR((LM_ERROR,
00427 ACE_TEXT("PersistenceUpdater::requestImage(): allocation failed.\n")));
00428 return;
00429 }
00430
00431 ACE_OS::memcpy(buf, participant->participantQos.second.second, qos_len);
00432
00433 BinSeq in_seq(qos_len, buf);
00434 QosSeq qos(ParticipantQos, in_seq);
00435 DParticipant dparticipant(participant->domainId
00436 , participant->owner
00437 , participant->participantId
00438 , qos);
00439 image.participants.push_back(dparticipant);
00440 }
00441
00442 for (TopicIndex::ITERATOR iter = topic_index_->begin();
00443 iter != topic_index_->end(); iter++) {
00444 const PersistenceUpdater::Topic* topic = (*iter).int_id_;
00445
00446 size_t qos_len = topic->topicQos.second.first;
00447 char *buf;
00448 ACE_NEW_NORETURN(buf, char[qos_len]);
00449 qos_sequences.push_back(ArrDelAdapter<char>(buf));
00450
00451 if (buf == 0) {
00452 ACE_ERROR((LM_ERROR,
00453 ACE_TEXT("PersistenceUpdater::requestImage(): allocation failed.\n")));
00454 return;
00455 }
00456
00457 ACE_OS::memcpy(buf, topic->topicQos.second.second, qos_len);
00458
00459 BinSeq in_seq(qos_len, buf);
00460 QosSeq qos(TopicQos, in_seq);
00461 DTopic dTopic(topic->domainId, topic->topicId
00462 , topic->participantId, topic->name.c_str()
00463 , topic->dataType.c_str(), qos);
00464 image.topics.push_back(dTopic);
00465 }
00466
00467 for (ActorIndex::ITERATOR iter = actor_index_->begin();
00468 iter != actor_index_->end(); iter++) {
00469 const PersistenceUpdater::RWActor* actor = (*iter).int_id_;
00470
00471 size_t qos_len = actor->pubsubQos.second.first;
00472 char *buf;
00473 ACE_NEW_NORETURN(buf, char[qos_len]);
00474 qos_sequences.push_back(ArrDelAdapter<char>(buf));
00475
00476 if (buf == 0) {
00477 ACE_ERROR((LM_ERROR,
00478 ACE_TEXT("PersistenceUpdater::requestImage(): allocation failed.\n")));
00479 return;
00480 }
00481
00482 ACE_OS::memcpy(buf, actor->pubsubQos.second.second, qos_len);
00483
00484 BinSeq in_pubsub_seq(qos_len, buf);
00485 QosSeq pubsub_qos(actor->pubsubQos.first, in_pubsub_seq);
00486
00487 qos_len = actor->drdwQos.second.first;
00488 ACE_NEW_NORETURN(buf, char[qos_len]);
00489 qos_sequences.push_back(ArrDelAdapter<char>(buf));
00490
00491 if (buf == 0) {
00492 ACE_ERROR((LM_ERROR,
00493 ACE_TEXT("PersistenceUpdater::requestImage(): allocation failed.\n")));
00494 return;
00495 }
00496
00497 ACE_OS::memcpy(buf, actor->drdwQos.second.second, qos_len);
00498
00499 BinSeq in_drdw_seq(qos_len, buf);
00500 QosSeq drdw_qos(actor->drdwQos.first, in_drdw_seq);
00501
00502 qos_len = actor->transportInterfaceInfo.first;
00503 ACE_NEW_NORETURN(buf, char[qos_len]);
00504 qos_sequences.push_back(ArrDelAdapter<char>(buf));
00505
00506 if (buf == 0) {
00507 ACE_ERROR((LM_ERROR,
00508 ACE_TEXT("PersistenceUpdater::requestImage(): allocation failed.\n")));
00509 return;
00510 }
00511
00512 ACE_OS::memcpy(buf, actor->transportInterfaceInfo.second, qos_len);
00513
00514 BinSeq in_transport_seq(qos_len, buf);
00515
00516 ContentSubscriptionBin in_csp_bin;
00517 if (actor->type == DataReader) {
00518 in_csp_bin.filterClassName = actor->contentSubscriptionProfile.filterClassName;
00519 in_csp_bin.filterExpr = actor->contentSubscriptionProfile.filterExpr;
00520 BinSeq& params = in_csp_bin.exprParams;
00521 ACE_NEW_NORETURN(params.second, char[params.first]);
00522 if (params.second == 0) {
00523 ACE_ERROR((LM_ERROR,
00524 ACE_TEXT("PersistenceUpdater::requestImage(): allocation ")
00525 ACE_TEXT("failed.\n")));
00526 return;
00527 }
00528 qos_sequences.push_back(ArrDelAdapter<char>(params.second));
00529 ACE_OS::memcpy(params.second,
00530 actor->contentSubscriptionProfile.exprParams.second, params.first);
00531 }
00532
00533 DActor dActor(actor->domainId, actor->actorId, actor->topicId
00534 , actor->participantId
00535 , actor->type, actor->callback.c_str()
00536 , pubsub_qos, drdw_qos, in_transport_seq, in_csp_bin);
00537 image.actors.push_back(dActor);
00538 }
00539
00540 um_->pushImage(image);
00541 }
00542
00543 void
00544 PersistenceUpdater::create(const UTopic& topic)
00545 {
00546
00547 TAO_OutputCDR outCdr;
00548 outCdr << topic.topicQos;
00549 ACE_Message_Block dst;
00550 ACE_CDR::consolidate(&dst, outCdr.begin());
00551
00552 size_t len = dst.length();
00553 char *buf;
00554 ACE_NEW_NORETURN(buf, char[len]);
00555 ArrDelAdapter<char> guard(buf);
00556
00557 if (buf == 0) {
00558 ACE_ERROR((LM_ERROR,
00559 ACE_TEXT("PersistenceUpdater::create( UTopic): allocation failed.\n")));
00560 return;
00561 }
00562
00563 ACE_OS::memcpy(buf, dst.base(), len);
00564
00565 BinSeq qos_bin(len, buf);
00566
00567 QosSeq p(TopicQos, qos_bin);
00568 DTopic topic_data(topic.domainId, topic.topicId, topic.participantId
00569 , topic.name.c_str(), topic.dataType.c_str(), p);
00570
00571
00572 void* buffer;
00573 ACE_ALLOCATOR(buffer, allocator_->malloc
00574 (sizeof(PersistenceUpdater::Topic)));
00575
00576
00577 PersistenceUpdater::Topic* persistent_data
00578 = new(buffer) PersistenceUpdater::Topic(topic_data, allocator_);
00579
00580 IdType_ExtId ext(topic_data.topicId);
00581
00582
00583 if (topic_index_->bind(ext, persistent_data, allocator_) != 0) {
00584 allocator_->free((void *) buffer);
00585 return;
00586 }
00587 }
00588
00589 void
00590 PersistenceUpdater::create(const UParticipant& participant)
00591 {
00592
00593 TAO_OutputCDR outCdr;
00594 outCdr << participant.participantQos;
00595 ACE_Message_Block dst;
00596 ACE_CDR::consolidate(&dst, outCdr.begin());
00597
00598 size_t len = dst.length();
00599 char *buf;
00600 ACE_NEW_NORETURN(buf, char[len]);
00601 ArrDelAdapter<char> guard(buf);
00602
00603 if (buf == 0) {
00604 ACE_ERROR((LM_ERROR,
00605 ACE_TEXT("PersistenceUpdater::create( UParticipant): allocation failed.\n")));
00606 return;
00607 }
00608
00609 ACE_OS::memcpy(buf, dst.base(), len);
00610
00611 BinSeq qos_bin(len, buf);
00612
00613 QosSeq p(ParticipantQos, qos_bin);
00614 DParticipant participant_data
00615 (participant.domainId, participant.owner, participant.participantId, p);
00616
00617
00618 void* buffer;
00619 ACE_ALLOCATOR(buffer, allocator_->malloc
00620 (sizeof(PersistenceUpdater::Participant)));
00621
00622
00623 PersistenceUpdater::Participant* persistent_data
00624 = new(buffer) PersistenceUpdater::Participant(participant_data
00625 , allocator_);
00626
00627 IdType_ExtId ext(participant_data.participantId);
00628
00629
00630 if (participant_index_->bind(ext, persistent_data, allocator_) != 0) {
00631 allocator_->free((void *) buffer);
00632 return;
00633 }
00634 }
00635
00636 void
00637 PersistenceUpdater::create(const URActor& actor)
00638 {
00639 TAO_OutputCDR outCdr;
00640 outCdr << actor.pubsubQos;
00641 ACE_Message_Block dst;
00642 ACE_CDR::consolidate(&dst, outCdr.begin());
00643
00644 size_t len = dst.length();
00645 char *buf = new (std::nothrow) char[len];
00646
00647 if (buf == 0) {
00648 ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( subscription): allocation failed.\n"));
00649 return;
00650 }
00651
00652 ArrDelAdapter<char> guard(buf);
00653
00654 ACE_OS::memcpy(buf, dst.base(), len);
00655
00656 BinSeq pubsub_qos_bin(len, buf);
00657 QosSeq pubsub_qos(SubscriberQos, pubsub_qos_bin);
00658
00659 outCdr.reset();
00660 outCdr << actor.drdwQos;
00661 ACE_Message_Block dst2;
00662 ACE_CDR::consolidate(&dst2, outCdr.begin());
00663
00664 len = dst2.length();
00665 char *buf2 = new (std::nothrow) char[len];
00666
00667 if (buf2 == 0) {
00668 ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( subscription): allocation failed.\n"));
00669 return;
00670 }
00671
00672 ArrDelAdapter<char> guard2(buf2);
00673
00674 ACE_OS::memcpy(buf2, dst2.base(), len);
00675
00676 BinSeq dwdr_qos_bin(len, buf2);
00677 QosSeq dwdr_qos(DataReaderQos, dwdr_qos_bin);
00678
00679 outCdr.reset();
00680 outCdr << actor.transportInterfaceInfo;
00681 ACE_Message_Block dst3;
00682 ACE_CDR::consolidate(&dst3, outCdr.begin());
00683
00684 len = dst3.length();
00685 char *buf3 = new (std::nothrow) char[len];
00686
00687 if (buf3 == 0) {
00688 ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( subscription) allocation failed.\n"));
00689 return;
00690 }
00691
00692 ArrDelAdapter<char> guard3(buf3);
00693
00694 ACE_OS::memcpy(buf3, dst3.base(), len);
00695 BinSeq tr_bin(len, buf3);
00696
00697 outCdr.reset();
00698 outCdr << actor.contentSubscriptionProfile.exprParams;
00699 ACE_Message_Block dst4;
00700 ACE_CDR::consolidate(&dst4, outCdr.begin());
00701 len = dst4.length();
00702 char* buf4 = new (std::nothrow) char[len];
00703 if (buf4 == 0) {
00704 ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( subscription) allocation failed.\n"));
00705 return;
00706 }
00707 ArrDelAdapter<char> guard4(buf4);
00708
00709 ACE_OS::memcpy(buf4, dst4.base(), len);
00710 ContentSubscriptionBin csp_bin;
00711 csp_bin.filterClassName = actor.contentSubscriptionProfile.filterClassName;
00712 csp_bin.filterExpr = actor.contentSubscriptionProfile.filterExpr;
00713 csp_bin.exprParams = std::make_pair(len, buf4);
00714
00715 DActor actor_data(actor.domainId, actor.actorId, actor.topicId
00716 , actor.participantId
00717 , DataReader, actor.callback.c_str(), pubsub_qos
00718 , dwdr_qos, tr_bin, csp_bin);
00719
00720
00721 void* buffer;
00722 ACE_ALLOCATOR(buffer, allocator_->malloc
00723 (sizeof(PersistenceUpdater::RWActor)));
00724
00725
00726 PersistenceUpdater::RWActor* persistent_data =
00727 new(buffer) PersistenceUpdater::RWActor(actor_data
00728 , allocator_);
00729
00730 IdType_ExtId ext(actor.actorId);
00731
00732
00733 if (actor_index_->bind(ext, persistent_data, allocator_) != 0) {
00734 allocator_->free((void *) buffer);
00735 return;
00736 }
00737 }
00738
00739 void
00740 PersistenceUpdater::create(const UWActor& actor)
00741 {
00742 TAO_OutputCDR outCdr;
00743 outCdr << actor.pubsubQos;
00744 ACE_Message_Block dst;
00745 ACE_CDR::consolidate(&dst, outCdr.begin());
00746
00747 size_t len = dst.length();
00748 char *buf = new (std::nothrow) char[len];
00749
00750 if (buf == 0) {
00751 ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( publication): allocation failed.\n"));
00752 return;
00753 }
00754
00755 ArrDelAdapter<char> guard(buf);
00756
00757 ACE_OS::memcpy(buf, dst.base(), len);
00758
00759 BinSeq pubsub_qos_bin(len, buf);
00760 QosSeq pubsub_qos(PublisherQos, pubsub_qos_bin);
00761
00762 outCdr.reset();
00763 outCdr << actor.drdwQos;
00764 ACE_Message_Block dst2;
00765 ACE_CDR::consolidate(&dst2, outCdr.begin());
00766
00767 len = dst2.length();
00768 char *buf2 = new (std::nothrow) char[len];
00769
00770 if (buf2 == 0) {
00771 ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( publication): allocation failed.\n"));
00772 return;
00773 }
00774
00775 ArrDelAdapter<char> guard2(buf2);
00776
00777 ACE_OS::memcpy(buf2, dst2.base(), len);
00778
00779 BinSeq dwdr_qos_bin(len, buf2);
00780 QosSeq dwdr_qos(DataWriterQos, dwdr_qos_bin);
00781
00782 outCdr.reset();
00783 outCdr << actor.transportInterfaceInfo;
00784 ACE_Message_Block dst3;
00785 ACE_CDR::consolidate(&dst3, outCdr.begin());
00786
00787 len = dst3.length();
00788 char *buf3 = new (std::nothrow) char[len];
00789
00790 if (buf3 == 0) {
00791 ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( publication): allocation failed.\n"));
00792 return;
00793 }
00794
00795 ArrDelAdapter<char> guard3(buf3);
00796
00797 ACE_OS::memcpy(buf3, dst3.base(), len);
00798 BinSeq tr_bin(len, buf3);
00799
00800 DActor actor_data(actor.domainId, actor.actorId, actor.topicId
00801 , actor.participantId
00802 , DataWriter, actor.callback.c_str(), pubsub_qos
00803 , dwdr_qos, tr_bin, ContentSubscriptionBin());
00804
00805
00806 void* buffer;
00807 ACE_ALLOCATOR(buffer, allocator_->malloc
00808 (sizeof(PersistenceUpdater::RWActor)));
00809
00810
00811 PersistenceUpdater::RWActor* persistent_data =
00812 new(buffer) PersistenceUpdater::RWActor(actor_data
00813 , allocator_);
00814
00815 IdType_ExtId ext(actor.actorId);
00816
00817
00818 if (actor_index_->bind(ext, persistent_data, allocator_) != 0) {
00819 allocator_->free((void *) buffer);
00820 return;
00821 }
00822 }
00823
00824 void
00825 PersistenceUpdater::create(const OwnershipData& )
00826 {
00827
00828 }
00829
00830 void
00831 PersistenceUpdater::update(const IdPath& id, const DDS::DomainParticipantQos& qos)
00832 {
00833 IdType_ExtId ext(id.id);
00834 PersistenceUpdater::Participant* part_data = 0;
00835
00836 if (this->participant_index_->find(ext, part_data, this->allocator_) == 0) {
00837 TAO_OutputCDR outCdr;
00838 outCdr << qos;
00839 ACE_Message_Block dst;
00840 ACE_CDR::consolidate(&dst, outCdr.begin());
00841
00842 this->storeUpdate(dst, part_data->participantQos.second);
00843
00844 } else {
00845 OpenDDS::DCPS::RepoIdConverter converter(id.id);
00846 ACE_ERROR((LM_ERROR,
00847 ACE_TEXT("(%P|%t) PersistenceUpdater::update: ")
00848 ACE_TEXT("participant %C not found\n"),
00849 std::string(converter).c_str()));
00850 }
00851 }
00852
00853 void
00854 PersistenceUpdater::update(const IdPath& id, const DDS::TopicQos& qos)
00855 {
00856 IdType_ExtId ext(id.id);
00857 PersistenceUpdater::Topic* topic_data = 0;
00858
00859 if (this->topic_index_->find(ext, topic_data, this->allocator_) == 0) {
00860 TAO_OutputCDR outCdr;
00861 outCdr << qos;
00862 ACE_Message_Block dst;
00863 ACE_CDR::consolidate(&dst, outCdr.begin());
00864
00865 this->storeUpdate(dst, topic_data->topicQos.second);
00866
00867 } else {
00868 OpenDDS::DCPS::RepoIdConverter converter(id.id);
00869 ACE_ERROR((LM_ERROR,
00870 ACE_TEXT("(%P|%t) PersistenceUpdater::update: ")
00871 ACE_TEXT("topic %C not found\n"),
00872 std::string(converter).c_str()));
00873 }
00874 }
00875
00876 void
00877 PersistenceUpdater::update(const IdPath& id, const DDS::DataWriterQos& qos)
00878 {
00879 IdType_ExtId ext(id.id);
00880 PersistenceUpdater::RWActor* actor_data = 0;
00881
00882 if (this->actor_index_->find(ext, actor_data, this->allocator_) == 0) {
00883 TAO_OutputCDR outCdr;
00884 outCdr << qos;
00885 ACE_Message_Block dst;
00886 ACE_CDR::consolidate(&dst, outCdr.begin());
00887
00888 this->storeUpdate(dst, actor_data->drdwQos.second);
00889
00890 } else {
00891 OpenDDS::DCPS::RepoIdConverter converter(id.id);
00892 ACE_ERROR((LM_ERROR,
00893 ACE_TEXT("(%P|%t) PersistenceUpdater::update(writerQos): ")
00894 ACE_TEXT("publication %C not found\n"),
00895 std::string(converter).c_str()));
00896 }
00897 }
00898
00899 void
00900 PersistenceUpdater::update(const IdPath& id, const DDS::PublisherQos& qos)
00901 {
00902 IdType_ExtId ext(id.id);
00903 PersistenceUpdater::RWActor* actor_data = 0;
00904
00905 if (this->actor_index_->find(ext, actor_data, this->allocator_) == 0) {
00906 TAO_OutputCDR outCdr;
00907 outCdr << qos;
00908 ACE_Message_Block dst;
00909 ACE_CDR::consolidate(&dst, outCdr.begin());
00910
00911 this->storeUpdate(dst, actor_data->pubsubQos.second);
00912
00913 } else {
00914 OpenDDS::DCPS::RepoIdConverter converter(id.id);
00915 ACE_ERROR((LM_ERROR,
00916 ACE_TEXT("(%P|%t) PersistenceUpdater::update(publisherQos): ")
00917 ACE_TEXT("publication %C not found\n"),
00918 std::string(converter).c_str()));
00919 }
00920 }
00921
00922 void
00923 PersistenceUpdater::update(const IdPath& id, const DDS::DataReaderQos& qos)
00924 {
00925 IdType_ExtId ext(id.id);
00926 PersistenceUpdater::RWActor* actor_data = 0;
00927
00928 if (this->actor_index_->find(ext, actor_data, this->allocator_) == 0) {
00929 TAO_OutputCDR outCdr;
00930 outCdr << qos;
00931 ACE_Message_Block dst;
00932 ACE_CDR::consolidate(&dst, outCdr.begin());
00933
00934 this->storeUpdate(dst, actor_data->drdwQos.second);
00935
00936 } else {
00937 OpenDDS::DCPS::RepoIdConverter converter(id.id);
00938 ACE_ERROR((LM_ERROR,
00939 ACE_TEXT("(%P|%t) PersistenceUpdater::update(readerQos): ")
00940 ACE_TEXT("subscription %C not found\n"),
00941 std::string(converter).c_str()));
00942 }
00943 }
00944
00945 void
00946 PersistenceUpdater::update(const IdPath& id, const DDS::SubscriberQos& qos)
00947 {
00948 IdType_ExtId ext(id.id);
00949 PersistenceUpdater::RWActor* actor_data = 0;
00950
00951 if (this->actor_index_->find(ext, actor_data, this->allocator_) == 0) {
00952 TAO_OutputCDR outCdr;
00953 outCdr << qos;
00954 ACE_Message_Block dst;
00955 ACE_CDR::consolidate(&dst, outCdr.begin());
00956
00957 this->storeUpdate(dst, actor_data->pubsubQos.second);
00958
00959 } else {
00960 OpenDDS::DCPS::RepoIdConverter converter(id.id);
00961 ACE_ERROR((LM_ERROR,
00962 ACE_TEXT("(%P|%t) PersistenceUpdater::update(subscriberQos): ")
00963 ACE_TEXT("subscription %C not found\n"),
00964 std::string(converter).c_str()));
00965 }
00966 }
00967
00968 void
00969 PersistenceUpdater::update(const IdPath& id, const DDS::StringSeq& exprParams)
00970 {
00971 IdType_ExtId ext(id.id);
00972 PersistenceUpdater::RWActor* actor_data = 0;
00973
00974 if (actor_index_->find(ext, actor_data, allocator_) == 0) {
00975 TAO_OutputCDR outCdr;
00976 outCdr << exprParams;
00977 ACE_Message_Block dst;
00978 ACE_CDR::consolidate(&dst, outCdr.begin());
00979
00980 storeUpdate(dst, actor_data->contentSubscriptionProfile.exprParams);
00981
00982 } else {
00983 OpenDDS::DCPS::RepoIdConverter converter(id.id);
00984 ACE_ERROR((LM_ERROR,
00985 ACE_TEXT("(%P|%t) PersistenceUpdater::update(readerQos): ")
00986 ACE_TEXT("subscription %C not found\n"),
00987 std::string(converter).c_str()));
00988 }
00989 }
00990
00991 void
00992 PersistenceUpdater::destroy(const IdPath& id, ItemType type, ActorType)
00993 {
00994 IdType_ExtId ext(id.id);
00995 PersistenceUpdater::Topic* topic = 0;
00996 PersistenceUpdater::Participant* participant = 0;
00997 PersistenceUpdater::RWActor* actor = 0;
00998
00999 switch (type) {
01000 case Update::Topic:
01001
01002 if (topic_index_->unbind(ext, topic, allocator_) == 0) {
01003 topic->cleanup(allocator_);
01004 allocator_->free((void *) topic);
01005 }
01006
01007 break;
01008 case Update::Participant:
01009
01010 if (participant_index_->unbind(ext, participant, allocator_) == 0) {
01011 participant->cleanup(allocator_);
01012 allocator_->free((void *) participant);
01013 }
01014
01015 break;
01016 case Update::Actor:
01017
01018 if (actor_index_->unbind(ext, actor, allocator_) == 0) {
01019 actor->cleanup(allocator_);
01020 allocator_->free((void *) actor);
01021 }
01022
01023 break;
01024 default: {
01025 OpenDDS::DCPS::RepoIdConverter converter(id.id);
01026 ACE_ERROR((LM_ERROR,
01027 ACE_TEXT("(%P | %t) PersistenceUpdater::destroy: ")
01028 ACE_TEXT("unknown entity - %C.\n"),
01029 std::string(converter).c_str()));
01030 }
01031 }
01032 }
01033
01034 void
01035 PersistenceUpdater::storeUpdate(const ACE_Message_Block& data, BinSeq& storage)
01036 {
01037 size_t len = data.length();
01038
01039 void* buffer;
01040 ACE_ALLOCATOR(buffer, this->allocator_->malloc(len));
01041 ACE_OS::memcpy(buffer, data.base(), len);
01042
01043 storage.first = len;
01044 storage.second = static_cast<char*>(buffer);
01045 }
01046
01047 }
01048
01049 int
01050 PersistenceUpdaterSvc_Loader::init()
01051 {
01052 return ACE_Service_Config::process_directive
01053 (ace_svc_desc_PersistenceUpdaterSvc);
01054 return 0;
01055 }
01056
01057
01058 ACE_FACTORY_DEFINE(ACE_Local_Service, PersistenceUpdaterSvc)
01059
01060 ACE_STATIC_SVC_DEFINE(PersistenceUpdaterSvc,
01061 ACE_TEXT("PersistenceUpdaterSvc"),
01062 ACE_SVC_OBJ_T,
01063 &ACE_SVC_NAME(PersistenceUpdaterSvc),
01064 ACE_Service_Type::DELETE_THIS |
01065 ACE_Service_Type::DELETE_OBJ,
01066 0)
01067
01068 ACE_STATIC_SVC_REQUIRE(PersistenceUpdaterSvc)