00001
00002
00003
00004
00005
00006
00007
00008 #include "DcpsInfo_pch.h"
00009
00010 #include "PersistenceUpdater.h"
00011 #include "UpdateManager.h"
00012 #include "ArrDelAdapter.h"
00013
00014 #include "dds/DCPS/RepoIdConverter.h"
00015 #include "dds/DCPS/GuidUtils.h"
00016 #include "dds/DCPS/debug.h"
00017
00018 #include "ace/Malloc_T.h"
00019 #include "ace/MMAP_Memory_Pool.h"
00020 #include "ace/OS_NS_strings.h"
00021 #include "ace/Svc_Handler.h"
00022 #include "ace/Dynamic_Service.h"
00023
00024 #include <algorithm>
00025
00026 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00027
00028 namespace {
00029 void assign(Update::BinSeq& to, const Update::BinSeq& from,
00030 Update::PersistenceUpdater::ALLOCATOR* allocator)
00031 {
00032 const size_t len = from.first;
00033 void* out_buf;
00034 ACE_ALLOCATOR(out_buf, allocator->malloc(len));
00035 ACE_OS::memcpy(out_buf, from.second, len);
00036 to = std::make_pair(len, static_cast<char*>(out_buf));
00037 }
00038
00039 void assign(ACE_CString& to, const char* from,
00040 Update::PersistenceUpdater::ALLOCATOR* allocator)
00041 {
00042 const size_t len = ACE_OS::strlen (from) + 1;
00043 void* out_buf;
00044 ACE_ALLOCATOR(out_buf, allocator->malloc(len));
00045 ACE_OS::memcpy(out_buf, from, len);
00046 to.set(static_cast<char*>(out_buf), len - 1, false);
00047 }
00048 }
00049
00050 namespace Update {
00051
00052
00053
00054 template<>
00055 struct TopicStrt<QosSeq, ACE_CString> {
00056 DDS::DomainId_t domainId;
00057 IdType topicId;
00058 IdType participantId;
00059 ACE_CString name;
00060 ACE_CString dataType;
00061 QosSeq topicQos;
00062
00063 TopicStrt(const DTopic& topic,
00064 PersistenceUpdater::ALLOCATOR* allocator)
00065 : domainId(topic.domainId),
00066 topicId(topic.topicId),
00067 participantId(topic.participantId)
00068 {
00069 assign(name, topic.name.c_str(), allocator);
00070 assign(dataType, topic.dataType.c_str(), allocator);
00071
00072 topicQos.first = TopicQos;
00073 assign(topicQos.second, topic.topicQos.second, allocator);
00074 }
00075
00076 void cleanup(PersistenceUpdater::ALLOCATOR* allocator)
00077 {
00078 if (name.length() > 0)
00079 {
00080 char* strMemory = const_cast<char*>(name.fast_rep());
00081 name.fast_clear();
00082 allocator->free(strMemory);
00083 }
00084 if (dataType.length() > 0)
00085 {
00086 char* strMemory = const_cast<char*>(dataType.fast_rep());
00087 dataType.fast_clear();
00088 allocator->free(strMemory);
00089 }
00090
00091 allocator->free(topicQos.second.second);
00092 }
00093 };
00094
00095 template<>
00096 struct ParticipantStrt<QosSeq> {
00097 DDS::DomainId_t domainId;
00098 long owner;
00099 IdType participantId;
00100 QosSeq participantQos;
00101
00102 ParticipantStrt(const DDS::DomainId_t& dId,
00103 long own,
00104 const IdType& pId,
00105 const QosSeq& pQos)
00106 : domainId(dId),
00107 owner(own),
00108 participantId(pId),
00109 participantQos(pQos) {}
00110
00111 ParticipantStrt(const DParticipant& participant,
00112 PersistenceUpdater::ALLOCATOR* allocator)
00113 : domainId(participant.domainId),
00114 owner(participant.owner),
00115 participantId(participant.participantId)
00116 {
00117 participantQos.first = ParticipantQos;
00118 assign(participantQos.second, participant.participantQos.second, allocator);
00119 }
00120
00121 void cleanup(PersistenceUpdater::ALLOCATOR* allocator)
00122 {
00123 allocator->free(participantQos.second.second);
00124 }
00125 };
00126
00127 template<>
00128 struct ActorStrt<QosSeq, QosSeq,
00129 ACE_CString, BinSeq, ContentSubscriptionBin> {
00130 DDS::DomainId_t domainId;
00131 IdType actorId;
00132 IdType topicId;
00133 IdType participantId;
00134 ActorType type;
00135 ACE_CString callback;
00136 QosSeq pubsubQos;
00137 QosSeq drdwQos;
00138 BinSeq transportInterfaceInfo;
00139 ContentSubscriptionBin contentSubscriptionProfile;
00140
00141 ActorStrt(const DActor& actor,
00142 PersistenceUpdater::ALLOCATOR* allocator)
00143 : domainId(actor.domainId),
00144 actorId(actor.actorId),
00145 topicId(actor.topicId),
00146 participantId(actor.participantId), type(actor.type)
00147 {
00148 assign(callback, actor.callback.c_str(), allocator);
00149
00150 pubsubQos.first = actor.pubsubQos.first;
00151 assign(pubsubQos.second, actor.pubsubQos.second, allocator);
00152
00153 drdwQos.first = actor.drdwQos.first;
00154 assign(drdwQos.second, actor.drdwQos.second, allocator);
00155
00156 assign(transportInterfaceInfo, actor.transportInterfaceInfo, allocator);
00157
00158 contentSubscriptionProfile.filterClassName =
00159 ACE_CString(actor.contentSubscriptionProfile.filterClassName.c_str(),
00160 allocator);
00161 contentSubscriptionProfile.filterExpr =
00162 ACE_CString(actor.contentSubscriptionProfile.filterExpr.c_str(),
00163 allocator);
00164 assign(contentSubscriptionProfile.exprParams,
00165 actor.contentSubscriptionProfile.exprParams, allocator);
00166 }
00167
00168 void cleanup(PersistenceUpdater::ALLOCATOR* allocator)
00169 {
00170 if (callback.length() > 0)
00171 {
00172 char* strMemory = const_cast<char*>(callback.fast_rep());
00173 callback.fast_clear();
00174 allocator->free(strMemory);
00175 }
00176
00177 allocator->free(pubsubQos.second.second);
00178 allocator->free(drdwQos.second.second);
00179 allocator->free(transportInterfaceInfo.second);
00180 allocator->free(contentSubscriptionProfile.exprParams.second);
00181 }
00182 };
00183
00184 PersistenceUpdater::IdType_ExtId::IdType_ExtId()
00185 : id_(OpenDDS::DCPS::GUID_UNKNOWN)
00186 {}
00187
00188 PersistenceUpdater::IdType_ExtId::IdType_ExtId(IdType id)
00189 : id_(id)
00190 {}
00191
00192 PersistenceUpdater::IdType_ExtId::IdType_ExtId(const IdType_ExtId& ext)
00193 : id_(ext.id_)
00194 {}
00195
00196 void
00197 PersistenceUpdater::IdType_ExtId::operator= (const IdType_ExtId& ext)
00198 {
00199 id_ = ext.id_;
00200 }
00201
00202 bool
00203 PersistenceUpdater::IdType_ExtId::operator== (const IdType_ExtId& ext) const
00204 {
00205 return (id_ == ext.id_);
00206 }
00207
00208 unsigned long
00209 PersistenceUpdater::IdType_ExtId::hash() const
00210 {
00211 return OpenDDS::DCPS::RepoIdConverter(id_).checksum();
00212 };
00213
00214 PersistenceUpdater::PersistenceUpdater()
00215 : persistence_file_(ACE_TEXT("InforepoPersist"))
00216 , reset_(false)
00217 , um_(0)
00218 , topic_index_(0)
00219 , participant_index_(0)
00220 , actor_index_(0)
00221 , last_part_id_(0)
00222 {}
00223
00224 PersistenceUpdater::~PersistenceUpdater()
00225 {}
00226
00227
00228 void* createIndex(const std::string& tag
00229 , PersistenceUpdater::ALLOCATOR& allocator
00230 , size_t size, bool& exists)
00231 {
00232 void* index = 0;
00233
00234
00235
00236 if (allocator.find(tag.c_str(), index) == 0) {
00237 exists = true;
00238 return index;
00239
00240 } else {
00241 ACE_ALLOCATOR_RETURN(index, allocator.malloc(size), 0);
00242
00243 if (allocator.bind(tag.c_str(), index) == -1) {
00244 allocator.free(index);
00245 index = 0;
00246 }
00247 }
00248
00249 if (!index) {
00250 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: PersistenceUpdater::init "
00251 "Initial allocation/Bind failed for %C.\n"), tag.c_str()));
00252 }
00253
00254 return index;
00255 }
00256
00257 template<typename I> void
00258 index_cleanup(I* index
00259 , PersistenceUpdater::ALLOCATOR* allocator)
00260 {
00261 for (typename I::ITERATOR iter = index->begin()
00262 ; iter != index->end();) {
00263 typename I::ITERATOR current_iter = iter;
00264 iter++;
00265
00266 if (index->unbind((*current_iter).ext_id_, allocator) != 0) {
00267 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: PersistenceUpdater::init:"
00268 "Index unbind failed.\n")));
00269 }
00270 }
00271 }
00272
00273 int
00274 PersistenceUpdater::init(int argc, ACE_TCHAR *argv[])
00275 {
00276
00277 um_ = ACE_Dynamic_Service<Update::Manager>::instance("UpdateManagerSvc");
00278
00279 if (um_ == 0) {
00280 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: PersistenceUpdater::init ")
00281 ACE_TEXT("No UpdateManager discovered.\n")));
00282 return -1;
00283 }
00284
00285 this->parse(argc, argv);
00286
00287 #if defined ACE_HAS_MAC_OSX && defined __x86_64__ && __x86_64__
00288 ACE_MMAP_Memory_Pool::OPTIONS options((void*)0x200000000);
00289 #else
00290 ACE_MMAP_Memory_Pool::OPTIONS options(ACE_DEFAULT_BASE_ADDR);
00291 #endif
00292
00293
00294
00295 ALLOCATOR* allocator;
00296 ACE_NEW_RETURN(allocator,
00297 ALLOCATOR(persistence_file_.c_str(),
00298 persistence_file_.c_str(),
00299 &options),
00300 -1);
00301 allocator_.reset(allocator);
00302
00303 bool exists = false;
00304
00305 char* topic_index = (char*)createIndex("TopicIndex", *allocator_
00306 , sizeof(TopicIndex), exists);
00307 if (!topic_index) {
00308 return -1;
00309 }
00310
00311 char* participant_index = (char*)createIndex("ParticipantIndex", *allocator_
00312 , sizeof(ParticipantIndex), exists);
00313 if (!participant_index) {
00314 return -1;
00315 }
00316
00317 char* actor_index = (char*)createIndex("ActorIndex", *allocator_
00318 , sizeof(ActorIndex), exists);
00319 if (!actor_index) {
00320 return -1;
00321 }
00322
00323 void* last_part_id = createIndex(
00324 "LastParticipantId", *allocator_, sizeof(PartIdType), exists);
00325 if (!last_part_id) {
00326 return -1;
00327 }
00328 last_part_id_ = reinterpret_cast<PartIdType*>(last_part_id);
00329
00330 if (exists) {
00331 topic_index_ = reinterpret_cast<TopicIndex*>(topic_index);
00332 participant_index_ = reinterpret_cast<ParticipantIndex*>(participant_index);
00333 actor_index_ = reinterpret_cast<ActorIndex*>(actor_index);
00334 } else {
00335 topic_index_ = new(topic_index) TopicIndex(allocator_.get());
00336 participant_index_ = new(participant_index) ParticipantIndex(allocator_.get());
00337 actor_index_ = new(actor_index) ActorIndex(allocator_.get());
00338 *last_part_id_ = 0;
00339 }
00340
00341 if (reset_) {
00342 index_cleanup(topic_index_, allocator_.get());
00343 index_cleanup(participant_index_, allocator_.get());
00344 index_cleanup(actor_index_, allocator_.get());
00345 *last_part_id_ = 0;
00346 }
00347
00348
00349 um_->add(this);
00350
00351 return 0;
00352 }
00353
00354 int
00355 PersistenceUpdater::parse(int argc, ACE_TCHAR *argv[])
00356 {
00357 for (ssize_t count = 0; count < argc; count++) {
00358 if (ACE_OS::strcasecmp(argv[count], ACE_TEXT("-file")) == 0) {
00359 if ((count + 1) < argc) {
00360 persistence_file_ = argv[count+1];
00361 count++;
00362 }
00363
00364 } else if (ACE_OS::strcasecmp(argv[count], ACE_TEXT("-reset")) == 0) {
00365 if ((count + 1) < argc) {
00366 int val = ACE_OS::atoi(argv[count+1]);
00367 reset_ = true;
00368
00369 if (val == 0) {
00370 reset_ = false;
00371 }
00372
00373 count++;
00374 }
00375
00376 } else {
00377 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) PersistenceUpdater::parse: Unknown option %s\n")
00378 , argv[count]));
00379 return -1;
00380 }
00381 }
00382
00383 return 0;
00384 }
00385
00386 int
00387 PersistenceUpdater::fini()
00388 {
00389 return 0;
00390 }
00391
00392 int
00393 PersistenceUpdater::svc()
00394 {
00395 return 0;
00396 }
00397
00398 void
00399 PersistenceUpdater::requestImage()
00400 {
00401 if (um_ == NULL) {
00402 return;
00403 }
00404
00405 DImage image;
00406
00407
00408 std::vector<ArrDelAdapter<char> > qos_sequences;
00409
00410 for (ParticipantIndex::ITERATOR iter = participant_index_->begin();
00411 iter != participant_index_->end(); iter++) {
00412 const PersistenceUpdater::Participant* participant
00413 = (*iter).int_id_;
00414
00415 size_t qos_len = participant->participantQos.second.first;
00416 char *buf;
00417 ACE_NEW_NORETURN(buf, char[qos_len]);
00418 qos_sequences.push_back(ArrDelAdapter<char>(buf));
00419
00420 if (buf == 0) {
00421 ACE_ERROR((LM_ERROR,
00422 ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation failed.\n")));
00423 return;
00424 }
00425
00426 ACE_OS::memcpy(buf, participant->participantQos.second.second, qos_len);
00427
00428 BinSeq in_seq(qos_len, buf);
00429 QosSeq qos(ParticipantQos, in_seq);
00430 DParticipant dparticipant(participant->domainId
00431 , participant->owner
00432 , participant->participantId
00433 , qos);
00434 image.participants.push_back(dparticipant);
00435 if (OpenDDS::DCPS::DCPS_debug_level >= 2) {
00436 OpenDDS::DCPS::RepoIdConverter conv(participant->participantId);
00437 ACE_DEBUG((LM_DEBUG,
00438 "(%P|%t) PersistenceUpdater::requestImage(): loaded participant %C\n",
00439 OPENDDS_STRING(conv).c_str()));
00440 }
00441 }
00442
00443 for (TopicIndex::ITERATOR iter = topic_index_->begin();
00444 iter != topic_index_->end(); iter++) {
00445 const PersistenceUpdater::Topic* topic = (*iter).int_id_;
00446
00447 size_t qos_len = topic->topicQos.second.first;
00448 char *buf;
00449 ACE_NEW_NORETURN(buf, char[qos_len]);
00450 qos_sequences.push_back(ArrDelAdapter<char>(buf));
00451
00452 if (buf == 0) {
00453 ACE_ERROR((LM_ERROR,
00454 ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation failed.\n")));
00455 return;
00456 }
00457
00458 ACE_OS::memcpy(buf, topic->topicQos.second.second, qos_len);
00459
00460 BinSeq in_seq(qos_len, buf);
00461 QosSeq qos(TopicQos, in_seq);
00462 DTopic dTopic(topic->domainId, topic->topicId
00463 , topic->participantId, topic->name.c_str()
00464 , topic->dataType.c_str(), qos);
00465 image.topics.push_back(dTopic);
00466 }
00467
00468 for (ActorIndex::ITERATOR iter = actor_index_->begin();
00469 iter != actor_index_->end(); iter++) {
00470 const PersistenceUpdater::RWActor* actor = (*iter).int_id_;
00471
00472 size_t qos_len = actor->pubsubQos.second.first;
00473 char *buf;
00474 ACE_NEW_NORETURN(buf, char[qos_len]);
00475 qos_sequences.push_back(ArrDelAdapter<char>(buf));
00476
00477 if (buf == 0) {
00478 ACE_ERROR((LM_ERROR,
00479 ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation failed.\n")));
00480 return;
00481 }
00482
00483 ACE_OS::memcpy(buf, actor->pubsubQos.second.second, qos_len);
00484
00485 BinSeq in_pubsub_seq(qos_len, buf);
00486 QosSeq pubsub_qos(actor->pubsubQos.first, in_pubsub_seq);
00487
00488 qos_len = actor->drdwQos.second.first;
00489 ACE_NEW_NORETURN(buf, char[qos_len]);
00490 qos_sequences.push_back(ArrDelAdapter<char>(buf));
00491
00492 if (buf == 0) {
00493 ACE_ERROR((LM_ERROR,
00494 ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation failed.\n")));
00495 return;
00496 }
00497
00498 ACE_OS::memcpy(buf, actor->drdwQos.second.second, qos_len);
00499
00500 BinSeq in_drdw_seq(qos_len, buf);
00501 QosSeq drdw_qos(actor->drdwQos.first, in_drdw_seq);
00502
00503 size_t transport_len = actor->transportInterfaceInfo.first;
00504 ACE_NEW_NORETURN(buf, char[transport_len]);
00505 qos_sequences.push_back(ArrDelAdapter<char>(buf));
00506
00507 if (buf == 0) {
00508 ACE_ERROR((LM_ERROR,
00509 ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation failed.\n")));
00510 return;
00511 }
00512
00513 ACE_OS::memcpy(buf, actor->transportInterfaceInfo.second, transport_len);
00514
00515 BinSeq in_transport_seq(transport_len, buf);
00516
00517 ContentSubscriptionBin in_csp_bin;
00518 if (actor->type == DataReader) {
00519 in_csp_bin.filterClassName = actor->contentSubscriptionProfile.filterClassName;
00520 in_csp_bin.filterExpr = actor->contentSubscriptionProfile.filterExpr;
00521 BinSeq& params = in_csp_bin.exprParams;
00522 ACE_NEW_NORETURN(params.second, char[params.first]);
00523 if (params.second == 0) {
00524 ACE_ERROR((LM_ERROR,
00525 ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation ")
00526 ACE_TEXT("failed.\n")));
00527 return;
00528 }
00529 qos_sequences.push_back(ArrDelAdapter<char>(params.second));
00530 ACE_OS::memcpy(params.second,
00531 actor->contentSubscriptionProfile.exprParams.second, params.first);
00532 }
00533
00534 DActor dActor(actor->domainId, actor->actorId, actor->topicId
00535 , actor->participantId
00536 , actor->type, actor->callback.c_str()
00537 , pubsub_qos, drdw_qos, in_transport_seq, in_csp_bin);
00538 image.actors.push_back(dActor);
00539 }
00540
00541 image.lastPartId = *last_part_id_;
00542
00543 um_->pushImage(image);
00544 }
00545
00546 void
00547 PersistenceUpdater::create(const UTopic& topic)
00548 {
00549
00550 TAO_OutputCDR outCdr;
00551 outCdr << topic.topicQos;
00552 ACE_Message_Block dst;
00553 ACE_CDR::consolidate(&dst, outCdr.begin());
00554
00555 size_t len = dst.length();
00556 char *buf;
00557 ACE_NEW_NORETURN(buf, char[len]);
00558 ArrDelAdapter<char> guard(buf);
00559
00560 if (buf == 0) {
00561 ACE_ERROR((LM_ERROR,
00562 ACE_TEXT("(%P|%t) PersistenceUpdater::create( UTopic): allocation failed.\n")));
00563 return;
00564 }
00565
00566 ACE_OS::memcpy(buf, dst.base(), len);
00567
00568 BinSeq qos_bin(len, buf);
00569
00570 QosSeq p(TopicQos, qos_bin);
00571 DTopic topic_data(topic.domainId, topic.topicId, topic.participantId
00572 , topic.name.c_str(), topic.dataType.c_str(), p);
00573
00574
00575 void* buffer;
00576 ACE_ALLOCATOR(buffer, allocator_->malloc
00577 (sizeof(PersistenceUpdater::Topic)));
00578
00579
00580 PersistenceUpdater::Topic* persistent_data
00581 = new(buffer) PersistenceUpdater::Topic(topic_data, allocator_.get());
00582
00583 IdType_ExtId ext(topic_data.topicId);
00584
00585
00586 if (topic_index_->bind(ext, persistent_data, allocator_.get()) != 0) {
00587 allocator_->free((void *) buffer);
00588 return;
00589 }
00590 }
00591
00592 void
00593 PersistenceUpdater::create(const UParticipant& participant)
00594 {
00595
00596 TAO_OutputCDR outCdr;
00597 outCdr << participant.participantQos;
00598 ACE_Message_Block dst;
00599 ACE_CDR::consolidate(&dst, outCdr.begin());
00600
00601 size_t len = dst.length();
00602 char *buf;
00603 ACE_NEW_NORETURN(buf, char[len]);
00604 ArrDelAdapter<char> guard(buf);
00605
00606 if (buf == 0) {
00607 ACE_ERROR((LM_ERROR,
00608 ACE_TEXT("(%P|%t) PersistenceUpdater::create( UParticipant): allocation failed.\n")));
00609 return;
00610 }
00611
00612 ACE_OS::memcpy(buf, dst.base(), len);
00613
00614 BinSeq qos_bin(len, buf);
00615
00616 QosSeq p(ParticipantQos, qos_bin);
00617 DParticipant participant_data
00618 (participant.domainId, participant.owner, participant.participantId, p);
00619
00620
00621 void* buffer;
00622 ACE_ALLOCATOR(buffer, allocator_->malloc
00623 (sizeof(PersistenceUpdater::Participant)));
00624
00625
00626 PersistenceUpdater::Participant* persistent_data
00627 = new(buffer) PersistenceUpdater::Participant(participant_data
00628 , allocator_.get());
00629
00630 IdType_ExtId ext(participant_data.participantId);
00631
00632
00633 if (participant_index_->bind(ext, persistent_data, allocator_.get()) != 0) {
00634 allocator_->free((void *) buffer);
00635 return;
00636 }
00637 }
00638
00639 void
00640 PersistenceUpdater::create(const URActor& actor)
00641 {
00642 TAO_OutputCDR outCdr;
00643 outCdr << actor.pubsubQos;
00644 ACE_Message_Block dst;
00645 ACE_CDR::consolidate(&dst, outCdr.begin());
00646
00647 size_t len = dst.length();
00648 char *buf = new (std::nothrow) char[len];
00649
00650 if (buf == 0) {
00651 ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( subscription): allocation failed.\n"));
00652 return;
00653 }
00654
00655 ArrDelAdapter<char> guard(buf);
00656
00657 ACE_OS::memcpy(buf, dst.base(), len);
00658
00659 BinSeq pubsub_qos_bin(len, buf);
00660 QosSeq pubsub_qos(SubscriberQos, pubsub_qos_bin);
00661
00662 outCdr.reset();
00663 outCdr << actor.drdwQos;
00664 ACE_Message_Block dst2;
00665 ACE_CDR::consolidate(&dst2, outCdr.begin());
00666
00667 len = dst2.length();
00668 char *buf2 = new (std::nothrow) char[len];
00669
00670 if (buf2 == 0) {
00671 ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( subscription): allocation failed.\n"));
00672 return;
00673 }
00674
00675 ArrDelAdapter<char> guard2(buf2);
00676
00677 ACE_OS::memcpy(buf2, dst2.base(), len);
00678
00679 BinSeq dwdr_qos_bin(len, buf2);
00680 QosSeq dwdr_qos(DataReaderQos, dwdr_qos_bin);
00681
00682 outCdr.reset();
00683 outCdr << actor.transportInterfaceInfo;
00684 ACE_Message_Block dst3;
00685 ACE_CDR::consolidate(&dst3, outCdr.begin());
00686
00687 len = dst3.length();
00688 char *buf3 = new (std::nothrow) char[len];
00689
00690 if (buf3 == 0) {
00691 ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( subscription) allocation failed.\n"));
00692 return;
00693 }
00694
00695 ArrDelAdapter<char> guard3(buf3);
00696
00697 ACE_OS::memcpy(buf3, dst3.base(), len);
00698 BinSeq tr_bin(len, buf3);
00699
00700 outCdr.reset();
00701 outCdr << actor.contentSubscriptionProfile.exprParams;
00702 ACE_Message_Block dst4;
00703 ACE_CDR::consolidate(&dst4, outCdr.begin());
00704 len = dst4.length();
00705 char* buf4 = new (std::nothrow) char[len];
00706 if (buf4 == 0) {
00707 ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( subscription) allocation failed.\n"));
00708 return;
00709 }
00710 ArrDelAdapter<char> guard4(buf4);
00711
00712 ACE_OS::memcpy(buf4, dst4.base(), len);
00713 ContentSubscriptionBin csp_bin;
00714 csp_bin.filterClassName = actor.contentSubscriptionProfile.filterClassName;
00715 csp_bin.filterExpr = actor.contentSubscriptionProfile.filterExpr;
00716 csp_bin.exprParams = std::make_pair(len, buf4);
00717
00718 DActor actor_data(actor.domainId, actor.actorId, actor.topicId
00719 , actor.participantId
00720 , DataReader, actor.callback.c_str(), pubsub_qos
00721 , dwdr_qos, tr_bin, csp_bin);
00722
00723
00724 void* buffer;
00725 ACE_ALLOCATOR(buffer, allocator_->malloc
00726 (sizeof(PersistenceUpdater::RWActor)));
00727
00728
00729 PersistenceUpdater::RWActor* persistent_data =
00730 new(buffer) PersistenceUpdater::RWActor(actor_data
00731 , allocator_.get());
00732
00733 IdType_ExtId ext(actor.actorId);
00734
00735
00736 if (actor_index_->bind(ext, persistent_data, allocator_.get()) != 0) {
00737 allocator_->free((void *) buffer);
00738 return;
00739 }
00740 }
00741
00742 void
00743 PersistenceUpdater::create(const UWActor& actor)
00744 {
00745 TAO_OutputCDR outCdr;
00746 outCdr << actor.pubsubQos;
00747 ACE_Message_Block dst;
00748 ACE_CDR::consolidate(&dst, outCdr.begin());
00749
00750 size_t len = dst.length();
00751 char *buf = new (std::nothrow) char[len];
00752
00753 if (buf == 0) {
00754 ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( publication): allocation failed.\n"));
00755 return;
00756 }
00757
00758 ArrDelAdapter<char> guard(buf);
00759
00760 ACE_OS::memcpy(buf, dst.base(), len);
00761
00762 BinSeq pubsub_qos_bin(len, buf);
00763 QosSeq pubsub_qos(PublisherQos, pubsub_qos_bin);
00764
00765 outCdr.reset();
00766 outCdr << actor.drdwQos;
00767 ACE_Message_Block dst2;
00768 ACE_CDR::consolidate(&dst2, outCdr.begin());
00769
00770 len = dst2.length();
00771 char *buf2 = new (std::nothrow) char[len];
00772
00773 if (buf2 == 0) {
00774 ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( publication): allocation failed.\n"));
00775 return;
00776 }
00777
00778 ArrDelAdapter<char> guard2(buf2);
00779
00780 ACE_OS::memcpy(buf2, dst2.base(), len);
00781
00782 BinSeq dwdr_qos_bin(len, buf2);
00783 QosSeq dwdr_qos(DataWriterQos, dwdr_qos_bin);
00784
00785 outCdr.reset();
00786 outCdr << actor.transportInterfaceInfo;
00787 ACE_Message_Block dst3;
00788 ACE_CDR::consolidate(&dst3, outCdr.begin());
00789
00790 len = dst3.length();
00791 char *buf3 = new (std::nothrow) char[len];
00792
00793 if (buf3 == 0) {
00794 ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( publication): allocation failed.\n"));
00795 return;
00796 }
00797
00798 ArrDelAdapter<char> guard3(buf3);
00799
00800 ACE_OS::memcpy(buf3, dst3.base(), len);
00801 BinSeq tr_bin(len, buf3);
00802
00803 DActor actor_data(actor.domainId, actor.actorId, actor.topicId
00804 , actor.participantId
00805 , DataWriter, actor.callback.c_str(), pubsub_qos
00806 , dwdr_qos, tr_bin, ContentSubscriptionBin());
00807
00808
00809 void* buffer;
00810 ACE_ALLOCATOR(buffer, allocator_->malloc
00811 (sizeof(PersistenceUpdater::RWActor)));
00812
00813
00814 PersistenceUpdater::RWActor* persistent_data =
00815 new(buffer) PersistenceUpdater::RWActor(actor_data
00816 , allocator_.get());
00817
00818 IdType_ExtId ext(actor.actorId);
00819
00820
00821 if (actor_index_->bind(ext, persistent_data, allocator_.get()) != 0) {
00822 allocator_->free((void *) buffer);
00823 return;
00824 }
00825 }
00826
00827 void
00828 PersistenceUpdater::create(const OwnershipData& )
00829 {
00830
00831 }
00832
00833 void
00834 PersistenceUpdater::update(const IdPath& id, const DDS::DomainParticipantQos& qos)
00835 {
00836 IdType_ExtId ext(id.id);
00837 PersistenceUpdater::Participant* part_data = 0;
00838
00839 if (this->participant_index_->find(ext, part_data, this->allocator_.get()) == 0) {
00840 TAO_OutputCDR outCdr;
00841 outCdr << qos;
00842 ACE_Message_Block dst;
00843 ACE_CDR::consolidate(&dst, outCdr.begin());
00844
00845 this->storeUpdate(dst, part_data->participantQos.second);
00846
00847 } else {
00848 OpenDDS::DCPS::RepoIdConverter converter(id.id);
00849 ACE_ERROR((LM_ERROR,
00850 ACE_TEXT("(%P|%t) PersistenceUpdater::update: ")
00851 ACE_TEXT("participant %C not found\n"),
00852 std::string(converter).c_str()));
00853 }
00854 }
00855
00856 void
00857 PersistenceUpdater::update(const IdPath& id, const DDS::TopicQos& qos)
00858 {
00859 IdType_ExtId ext(id.id);
00860 PersistenceUpdater::Topic* topic_data = 0;
00861
00862 if (this->topic_index_->find(ext, topic_data, this->allocator_.get()) == 0) {
00863 TAO_OutputCDR outCdr;
00864 outCdr << qos;
00865 ACE_Message_Block dst;
00866 ACE_CDR::consolidate(&dst, outCdr.begin());
00867
00868 this->storeUpdate(dst, topic_data->topicQos.second);
00869
00870 } else {
00871 OpenDDS::DCPS::RepoIdConverter converter(id.id);
00872 ACE_ERROR((LM_ERROR,
00873 ACE_TEXT("(%P|%t) PersistenceUpdater::update: ")
00874 ACE_TEXT("topic %C not found\n"),
00875 std::string(converter).c_str()));
00876 }
00877 }
00878
00879 void
00880 PersistenceUpdater::update(const IdPath& id, const DDS::DataWriterQos& qos)
00881 {
00882 IdType_ExtId ext(id.id);
00883 PersistenceUpdater::RWActor* actor_data = 0;
00884
00885 if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) == 0) {
00886 TAO_OutputCDR outCdr;
00887 outCdr << qos;
00888 ACE_Message_Block dst;
00889 ACE_CDR::consolidate(&dst, outCdr.begin());
00890
00891 this->storeUpdate(dst, actor_data->drdwQos.second);
00892
00893 } else {
00894 OpenDDS::DCPS::RepoIdConverter converter(id.id);
00895 ACE_ERROR((LM_ERROR,
00896 ACE_TEXT("(%P|%t) PersistenceUpdater::update(writerQos): ")
00897 ACE_TEXT("publication %C not found\n"),
00898 std::string(converter).c_str()));
00899 }
00900 }
00901
00902 void
00903 PersistenceUpdater::update(const IdPath& id, const DDS::PublisherQos& qos)
00904 {
00905 IdType_ExtId ext(id.id);
00906 PersistenceUpdater::RWActor* actor_data = 0;
00907
00908 if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) == 0) {
00909 TAO_OutputCDR outCdr;
00910 outCdr << qos;
00911 ACE_Message_Block dst;
00912 ACE_CDR::consolidate(&dst, outCdr.begin());
00913
00914 this->storeUpdate(dst, actor_data->pubsubQos.second);
00915
00916 } else {
00917 OpenDDS::DCPS::RepoIdConverter converter(id.id);
00918 ACE_ERROR((LM_ERROR,
00919 ACE_TEXT("(%P|%t) PersistenceUpdater::update(publisherQos): ")
00920 ACE_TEXT("publication %C not found\n"),
00921 std::string(converter).c_str()));
00922 }
00923 }
00924
00925 void
00926 PersistenceUpdater::update(const IdPath& id, const DDS::DataReaderQos& qos)
00927 {
00928 IdType_ExtId ext(id.id);
00929 PersistenceUpdater::RWActor* actor_data = 0;
00930
00931 if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) == 0) {
00932 TAO_OutputCDR outCdr;
00933 outCdr << qos;
00934 ACE_Message_Block dst;
00935 ACE_CDR::consolidate(&dst, outCdr.begin());
00936
00937 this->storeUpdate(dst, actor_data->drdwQos.second);
00938
00939 } else {
00940 OpenDDS::DCPS::RepoIdConverter converter(id.id);
00941 ACE_ERROR((LM_ERROR,
00942 ACE_TEXT("(%P|%t) PersistenceUpdater::update(readerQos): ")
00943 ACE_TEXT("subscription %C not found\n"),
00944 std::string(converter).c_str()));
00945 }
00946 }
00947
00948 void
00949 PersistenceUpdater::update(const IdPath& id, const DDS::SubscriberQos& qos)
00950 {
00951 IdType_ExtId ext(id.id);
00952 PersistenceUpdater::RWActor* actor_data = 0;
00953
00954 if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) == 0) {
00955 TAO_OutputCDR outCdr;
00956 outCdr << qos;
00957 ACE_Message_Block dst;
00958 ACE_CDR::consolidate(&dst, outCdr.begin());
00959
00960 this->storeUpdate(dst, actor_data->pubsubQos.second);
00961
00962 } else {
00963 OpenDDS::DCPS::RepoIdConverter converter(id.id);
00964 ACE_ERROR((LM_ERROR,
00965 ACE_TEXT("(%P|%t) PersistenceUpdater::update(subscriberQos): ")
00966 ACE_TEXT("subscription %C not found\n"),
00967 std::string(converter).c_str()));
00968 }
00969 }
00970
00971 void
00972 PersistenceUpdater::update(const IdPath& id, const DDS::StringSeq& exprParams)
00973 {
00974 IdType_ExtId ext(id.id);
00975 PersistenceUpdater::RWActor* actor_data = 0;
00976
00977 if (actor_index_->find(ext, actor_data, allocator_.get()) == 0) {
00978 TAO_OutputCDR outCdr;
00979 outCdr << exprParams;
00980 ACE_Message_Block dst;
00981 ACE_CDR::consolidate(&dst, outCdr.begin());
00982
00983 storeUpdate(dst, actor_data->contentSubscriptionProfile.exprParams);
00984
00985 } else {
00986 OpenDDS::DCPS::RepoIdConverter converter(id.id);
00987 ACE_ERROR((LM_ERROR,
00988 ACE_TEXT("(%P|%t) PersistenceUpdater::update(readerQos): ")
00989 ACE_TEXT("subscription %C not found\n"),
00990 std::string(converter).c_str()));
00991 }
00992 }
00993
00994 void
00995 PersistenceUpdater::destroy(const IdPath& id, ItemType type, ActorType)
00996 {
00997 IdType_ExtId ext(id.id);
00998 PersistenceUpdater::Topic* topic = 0;
00999 PersistenceUpdater::Participant* participant = 0;
01000 PersistenceUpdater::RWActor* actor = 0;
01001
01002 switch (type) {
01003 case Update::Topic:
01004
01005 if (topic_index_->unbind(ext, topic, allocator_.get()) == 0) {
01006 topic->cleanup(allocator_.get());
01007 allocator_->free((void *) topic);
01008 }
01009
01010 break;
01011 case Update::Participant:
01012
01013 if (participant_index_->unbind(ext, participant, allocator_.get()) == 0) {
01014 participant->cleanup(allocator_.get());
01015 allocator_->free((void *) participant);
01016 }
01017
01018 break;
01019 case Update::Actor:
01020
01021 if (actor_index_->unbind(ext, actor, allocator_.get()) == 0) {
01022 actor->cleanup(allocator_.get());
01023 allocator_->free((void *) actor);
01024 }
01025
01026 break;
01027 default: {
01028 OpenDDS::DCPS::RepoIdConverter converter(id.id);
01029 ACE_ERROR((LM_ERROR,
01030 ACE_TEXT("(%P|%t) PersistenceUpdater::destroy: ")
01031 ACE_TEXT("unknown entity - %C.\n"),
01032 std::string(converter).c_str()));
01033 }
01034 }
01035 }
01036
01037 void
01038 PersistenceUpdater::storeUpdate(const ACE_Message_Block& data, BinSeq& storage)
01039 {
01040 size_t len = data.length();
01041
01042 void* buffer;
01043 ACE_ALLOCATOR(buffer, this->allocator_->malloc(len));
01044 ACE_OS::memcpy(buffer, data.base(), len);
01045
01046 storage.first = len;
01047 storage.second = static_cast<char*>(buffer);
01048 }
01049
01050 void PersistenceUpdater::updateLastPartId(PartIdType partId)
01051 {
01052 *last_part_id_ = partId;
01053 }
01054
01055 }
01056
01057 int
01058 PersistenceUpdaterSvc_Loader::init()
01059 {
01060 return ACE_Service_Config::process_directive
01061 (ace_svc_desc_PersistenceUpdaterSvc);
01062 return 0;
01063 }
01064
01065
01066 ACE_FACTORY_DEFINE(ACE_Local_Service, PersistenceUpdaterSvc)
01067
01068 ACE_STATIC_SVC_DEFINE(PersistenceUpdaterSvc,
01069 ACE_TEXT("PersistenceUpdaterSvc"),
01070 ACE_SVC_OBJ_T,
01071 &ACE_SVC_NAME(PersistenceUpdaterSvc),
01072 ACE_Service_Type::DELETE_THIS |
01073 ACE_Service_Type::DELETE_OBJ,
01074 0)
01075
01076 ACE_STATIC_SVC_REQUIRE(PersistenceUpdaterSvc)
01077
01078 OPENDDS_END_VERSIONED_NAMESPACE_DECL