PersistenceUpdater.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 
00010 #include "PersistenceUpdater.h"
00011 #include "UpdateManager.h"
00012 #include "ArrDelAdapter.h"
00013 
00014 #include "dds/DCPS/RepoIdConverter.h"
00015 #include "dds/DCPS/GuidUtils.h"
00016 #include "dds/DCPS/debug.h"
00017 
00018 #include "ace/Malloc_T.h"
00019 #include "ace/MMAP_Memory_Pool.h"
00020 #include "ace/OS_NS_strings.h"
00021 #include "ace/Svc_Handler.h"
00022 #include "ace/Dynamic_Service.h"
00023 
00024 #include <algorithm>
00025 
00026 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00027 
00028 namespace {
00029   void assign(Update::BinSeq& to, const Update::BinSeq& from,
00030               Update::PersistenceUpdater::ALLOCATOR* allocator)
00031   {
00032     const size_t len = from.first;
00033     void* out_buf;
00034     ACE_ALLOCATOR(out_buf, allocator->malloc(len));
00035     ACE_OS::memcpy(out_buf, from.second, len);
00036     to = std::make_pair(len, static_cast<char*>(out_buf));
00037   }
00038 
00039   void assign(ACE_CString& to, const char* from,
00040               Update::PersistenceUpdater::ALLOCATOR* allocator)
00041   {
00042     const size_t len = ACE_OS::strlen (from) + 1;
00043     void* out_buf;
00044     ACE_ALLOCATOR(out_buf, allocator->malloc(len));
00045     ACE_OS::memcpy(out_buf, from, len);
00046     to.set(static_cast<char*>(out_buf), len - 1, false);
00047   }
00048 }
00049 
00050 namespace Update {
00051 
00052 // Template specializations with custom constructors
00053 // and cleanup methods
00054 template<>
00055 struct TopicStrt<QosSeq, ACE_CString> {
00056   DDS::DomainId_t   domainId;
00057   IdType            topicId;
00058   IdType            participantId;
00059   ACE_CString       name;
00060   ACE_CString       dataType;
00061   QosSeq            topicQos;
00062 
00063   TopicStrt(const DTopic& topic,
00064             PersistenceUpdater::ALLOCATOR* allocator)
00065     : domainId(topic.domainId),
00066       topicId(topic.topicId),
00067       participantId(topic.participantId)
00068   {
00069     assign(name, topic.name.c_str(), allocator);
00070     assign(dataType, topic.dataType.c_str(), allocator);
00071 
00072     topicQos.first = TopicQos;
00073     assign(topicQos.second, topic.topicQos.second, allocator);
00074   }
00075 
00076   void cleanup(PersistenceUpdater::ALLOCATOR* allocator)
00077   {
00078     if (name.length() > 0)
00079     {
00080       char* strMemory = const_cast<char*>(name.fast_rep());
00081       name.fast_clear();
00082       allocator->free(strMemory);
00083     }
00084     if (dataType.length() > 0)
00085     {
00086       char* strMemory = const_cast<char*>(dataType.fast_rep());
00087       dataType.fast_clear();
00088       allocator->free(strMemory);
00089     }
00090 
00091     allocator->free(topicQos.second.second);
00092   }
00093 };
00094 
00095 template<>
00096 struct ParticipantStrt<QosSeq> {
00097   DDS::DomainId_t   domainId;
00098   long              owner;
00099   IdType            participantId;
00100   QosSeq            participantQos;
00101 
00102   ParticipantStrt(const DDS::DomainId_t& dId,
00103                   long                   own,
00104                   const IdType&          pId,
00105                   const QosSeq&          pQos)
00106     : domainId(dId),
00107       owner(own),
00108       participantId(pId),
00109       participantQos(pQos) {}
00110 
00111   ParticipantStrt(const DParticipant& participant,
00112                   PersistenceUpdater::ALLOCATOR* allocator)
00113     : domainId(participant.domainId),
00114       owner(participant.owner),
00115       participantId(participant.participantId)
00116   {
00117     participantQos.first = ParticipantQos;
00118     assign(participantQos.second, participant.participantQos.second, allocator);
00119   }
00120 
00121   void cleanup(PersistenceUpdater::ALLOCATOR* allocator)
00122   {
00123     allocator->free(participantQos.second.second);
00124   }
00125 };
00126 
00127 template<>
00128 struct ActorStrt<QosSeq, QosSeq,
00129                  ACE_CString, BinSeq, ContentSubscriptionBin> {
00130   DDS::DomainId_t   domainId;
00131   IdType            actorId;
00132   IdType            topicId;
00133   IdType            participantId;
00134   ActorType         type;
00135   ACE_CString       callback;
00136   QosSeq            pubsubQos;
00137   QosSeq            drdwQos;
00138   BinSeq            transportInterfaceInfo;
00139   ContentSubscriptionBin contentSubscriptionProfile;
00140 
00141   ActorStrt(const DActor& actor,
00142             PersistenceUpdater::ALLOCATOR* allocator)
00143     : domainId(actor.domainId),
00144       actorId(actor.actorId),
00145       topicId(actor.topicId),
00146       participantId(actor.participantId), type(actor.type)
00147   {
00148     assign(callback, actor.callback.c_str(), allocator);
00149 
00150     pubsubQos.first = actor.pubsubQos.first;
00151     assign(pubsubQos.second, actor.pubsubQos.second, allocator);
00152 
00153     drdwQos.first = actor.drdwQos.first;
00154     assign(drdwQos.second, actor.drdwQos.second, allocator);
00155 
00156     assign(transportInterfaceInfo, actor.transportInterfaceInfo, allocator);
00157 
00158     contentSubscriptionProfile.filterClassName =
00159       ACE_CString(actor.contentSubscriptionProfile.filterClassName.c_str(),
00160                   allocator);
00161     contentSubscriptionProfile.filterExpr =
00162       ACE_CString(actor.contentSubscriptionProfile.filterExpr.c_str(),
00163                   allocator);
00164     assign(contentSubscriptionProfile.exprParams,
00165       actor.contentSubscriptionProfile.exprParams, allocator);
00166   }
00167 
00168   void cleanup(PersistenceUpdater::ALLOCATOR* allocator)
00169   {
00170     if (callback.length() > 0)
00171     {
00172       char* strMemory = const_cast<char*>(callback.fast_rep());
00173       callback.fast_clear();
00174       allocator->free(strMemory);
00175     }
00176 
00177     allocator->free(pubsubQos.second.second);
00178     allocator->free(drdwQos.second.second);
00179     allocator->free(transportInterfaceInfo.second);
00180     allocator->free(contentSubscriptionProfile.exprParams.second);
00181   }
00182 };
00183 
00184 PersistenceUpdater::IdType_ExtId::IdType_ExtId()
00185   : id_(OpenDDS::DCPS::GUID_UNKNOWN)
00186 {}
00187 
00188 PersistenceUpdater::IdType_ExtId::IdType_ExtId(IdType id)
00189   : id_(id)
00190 {}
00191 
00192 PersistenceUpdater::IdType_ExtId::IdType_ExtId(const IdType_ExtId& ext)
00193   : id_(ext.id_)
00194 {}
00195 
00196 void
00197 PersistenceUpdater::IdType_ExtId::operator= (const IdType_ExtId& ext)
00198 {
00199   id_ = ext.id_;
00200 }
00201 
00202 bool
00203 PersistenceUpdater::IdType_ExtId::operator== (const IdType_ExtId& ext) const
00204 {
00205   return (id_ == ext.id_);
00206 }
00207 
00208 unsigned long
00209 PersistenceUpdater::IdType_ExtId::hash() const
00210 {
00211   return OpenDDS::DCPS::RepoIdConverter(id_).checksum();
00212 };
00213 
00214 PersistenceUpdater::PersistenceUpdater()
00215   : persistence_file_(ACE_TEXT("InforepoPersist"))
00216   , reset_(false)
00217   , um_(0)
00218   , topic_index_(0)
00219   , participant_index_(0)
00220   , actor_index_(0)
00221   , last_part_id_(0)
00222 {}
00223 
00224 PersistenceUpdater::~PersistenceUpdater()
00225 {}
00226 
00227 // utility functions
00228 void* createIndex(const std::string& tag
00229                   , PersistenceUpdater::ALLOCATOR& allocator
00230                   , size_t size, bool& exists)
00231 {
00232   void* index = 0;
00233 
00234   // This is the easy case since if we find hash table in the
00235   // memory-mapped file we know it's already initialized.
00236   if (allocator.find(tag.c_str(), index) == 0) {
00237     exists = true;
00238     return index;
00239 
00240   } else {
00241     ACE_ALLOCATOR_RETURN(index, allocator.malloc(size), 0);
00242 
00243     if (allocator.bind(tag.c_str(), index) == -1) {
00244       allocator.free(index);
00245       index = 0;
00246     }
00247   }
00248 
00249   if (!index) {
00250     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: PersistenceUpdater::init "
00251       "Initial allocation/Bind failed for %C.\n"), tag.c_str()));
00252   }
00253 
00254   return index;
00255 }
00256 
00257 template<typename I> void
00258 index_cleanup(I* index
00259               , PersistenceUpdater::ALLOCATOR* allocator)
00260 {
00261   for (typename I::ITERATOR iter = index->begin()
00262                                    ; iter != index->end();) {
00263     typename I::ITERATOR current_iter = iter;
00264     iter++;
00265 
00266     if (index->unbind((*current_iter).ext_id_, allocator) != 0) {
00267       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: PersistenceUpdater::init:"
00268         "Index unbind failed.\n")));
00269     }
00270   }
00271 }
00272 
00273 int
00274 PersistenceUpdater::init(int argc, ACE_TCHAR *argv[])
00275 {
00276   // discover the UpdateManager
00277   um_ = ACE_Dynamic_Service<Update::Manager>::instance("UpdateManagerSvc");
00278 
00279   if (um_ == 0) {
00280     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: PersistenceUpdater::init ")
00281                ACE_TEXT("No UpdateManager discovered.\n")));
00282     return -1;
00283   }
00284 
00285   this->parse(argc, argv);
00286 
00287 #if defined ACE_HAS_MAC_OSX && defined __x86_64__ && __x86_64__
00288   ACE_MMAP_Memory_Pool::OPTIONS options((void*)0x200000000);
00289 #else
00290   ACE_MMAP_Memory_Pool::OPTIONS options(ACE_DEFAULT_BASE_ADDR);
00291 #endif
00292 
00293   // Create the allocator with the appropriate options.  The name used
00294   // for  the lock is the same as one used for the file.
00295   ALLOCATOR* allocator;
00296   ACE_NEW_RETURN(allocator,
00297                  ALLOCATOR(persistence_file_.c_str(),
00298                            persistence_file_.c_str(),
00299                            &options),
00300                  -1);
00301   allocator_.reset(allocator);
00302 
00303   bool exists = false;
00304 
00305   char* topic_index = (char*)createIndex("TopicIndex", *allocator_
00306                                          , sizeof(TopicIndex), exists);
00307   if (!topic_index) {
00308     return -1;
00309   }
00310 
00311   char* participant_index = (char*)createIndex("ParticipantIndex", *allocator_
00312                                                , sizeof(ParticipantIndex), exists);
00313   if (!participant_index) {
00314     return -1;
00315   }
00316 
00317   char* actor_index = (char*)createIndex("ActorIndex", *allocator_
00318                                          , sizeof(ActorIndex), exists);
00319   if (!actor_index) {
00320     return -1;
00321   }
00322 
00323   void* last_part_id = createIndex(
00324     "LastParticipantId", *allocator_, sizeof(PartIdType), exists);
00325   if (!last_part_id) {
00326     return -1;
00327   }
00328   last_part_id_ = reinterpret_cast<PartIdType*>(last_part_id);
00329 
00330   if (exists) {
00331     topic_index_ = reinterpret_cast<TopicIndex*>(topic_index);
00332     participant_index_ = reinterpret_cast<ParticipantIndex*>(participant_index);
00333     actor_index_ = reinterpret_cast<ActorIndex*>(actor_index);
00334   } else {
00335     topic_index_ = new(topic_index) TopicIndex(allocator_.get());
00336     participant_index_ = new(participant_index) ParticipantIndex(allocator_.get());
00337     actor_index_ = new(actor_index) ActorIndex(allocator_.get());
00338     *last_part_id_ = 0;
00339   }
00340 
00341   if (reset_) {
00342     index_cleanup(topic_index_, allocator_.get());
00343     index_cleanup(participant_index_, allocator_.get());
00344     index_cleanup(actor_index_, allocator_.get());
00345     *last_part_id_ = 0;
00346   }
00347 
00348   // lastly register the callback
00349   um_->add(this);
00350 
00351   return 0;
00352 }
00353 
00354 int
00355 PersistenceUpdater::parse(int argc, ACE_TCHAR *argv[])
00356 {
00357   for (ssize_t count = 0; count < argc; count++) {
00358     if (ACE_OS::strcasecmp(argv[count], ACE_TEXT("-file")) == 0) {
00359       if ((count + 1) < argc) {
00360         persistence_file_ = argv[count+1];
00361         count++;
00362       }
00363 
00364     } else if (ACE_OS::strcasecmp(argv[count], ACE_TEXT("-reset")) == 0) {
00365       if ((count + 1) < argc) {
00366         int val = ACE_OS::atoi(argv[count+1]);
00367         reset_ = true;
00368 
00369         if (val == 0) {
00370           reset_ = false;
00371         }
00372 
00373         count++;
00374       }
00375 
00376     } else {
00377       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) PersistenceUpdater::parse: Unknown option %s\n")
00378                  , argv[count]));
00379       return -1;
00380     }
00381   }
00382 
00383   return 0;
00384 }
00385 
00386 int
00387 PersistenceUpdater::fini()
00388 {
00389   return 0;
00390 }
00391 
00392 int
00393 PersistenceUpdater::svc()
00394 {
00395   return 0;
00396 }
00397 
00398 void
00399 PersistenceUpdater::requestImage()
00400 {
00401   if (um_ == NULL) {
00402     return;
00403   }
00404 
00405   DImage image;
00406 
00407   // Allocate space to hold the QOS sequences.
00408   std::vector<ArrDelAdapter<char> > qos_sequences;
00409 
00410   for (ParticipantIndex::ITERATOR iter = participant_index_->begin();
00411        iter != participant_index_->end(); iter++) {
00412     const PersistenceUpdater::Participant* participant
00413     = (*iter).int_id_;
00414 
00415     size_t qos_len = participant->participantQos.second.first;
00416     char *buf;
00417     ACE_NEW_NORETURN(buf, char[qos_len]);
00418     qos_sequences.push_back(ArrDelAdapter<char>(buf));
00419 
00420     if (buf == 0) {
00421       ACE_ERROR((LM_ERROR,
00422                  ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation failed.\n")));
00423       return;
00424     }
00425 
00426     ACE_OS::memcpy(buf, participant->participantQos.second.second, qos_len);
00427 
00428     BinSeq in_seq(qos_len, buf);
00429     QosSeq qos(ParticipantQos, in_seq);
00430     DParticipant dparticipant(participant->domainId
00431                               , participant->owner
00432                               , participant->participantId
00433                               , qos);
00434     image.participants.push_back(dparticipant);
00435     if (OpenDDS::DCPS::DCPS_debug_level >= 2)  {
00436       OpenDDS::DCPS::RepoIdConverter conv(participant->participantId);
00437       ACE_DEBUG((LM_DEBUG,
00438         "(%P|%t) PersistenceUpdater::requestImage(): loaded participant %C\n",
00439         OPENDDS_STRING(conv).c_str()));
00440     }
00441   }
00442 
00443   for (TopicIndex::ITERATOR iter = topic_index_->begin();
00444        iter != topic_index_->end(); iter++) {
00445     const PersistenceUpdater::Topic* topic = (*iter).int_id_;
00446 
00447     size_t qos_len = topic->topicQos.second.first;
00448     char *buf;
00449     ACE_NEW_NORETURN(buf, char[qos_len]);
00450     qos_sequences.push_back(ArrDelAdapter<char>(buf));
00451 
00452     if (buf == 0) {
00453       ACE_ERROR((LM_ERROR,
00454                  ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation failed.\n")));
00455       return;
00456     }
00457 
00458     ACE_OS::memcpy(buf, topic->topicQos.second.second, qos_len);
00459 
00460     BinSeq in_seq(qos_len, buf);
00461     QosSeq qos(TopicQos, in_seq);
00462     DTopic dTopic(topic->domainId, topic->topicId
00463                   , topic->participantId, topic->name.c_str()
00464                   , topic->dataType.c_str(), qos);
00465     image.topics.push_back(dTopic);
00466   }
00467 
00468   for (ActorIndex::ITERATOR iter = actor_index_->begin();
00469        iter != actor_index_->end(); iter++) {
00470     const PersistenceUpdater::RWActor* actor = (*iter).int_id_;
00471 
00472     size_t qos_len = actor->pubsubQos.second.first;
00473     char *buf;
00474     ACE_NEW_NORETURN(buf, char[qos_len]);
00475     qos_sequences.push_back(ArrDelAdapter<char>(buf));
00476 
00477     if (buf == 0) {
00478       ACE_ERROR((LM_ERROR,
00479                  ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation failed.\n")));
00480       return;
00481     }
00482 
00483     ACE_OS::memcpy(buf, actor->pubsubQos.second.second, qos_len);
00484 
00485     BinSeq in_pubsub_seq(qos_len, buf);
00486     QosSeq pubsub_qos(actor->pubsubQos.first, in_pubsub_seq);
00487 
00488     qos_len = actor->drdwQos.second.first;
00489     ACE_NEW_NORETURN(buf, char[qos_len]);
00490     qos_sequences.push_back(ArrDelAdapter<char>(buf));
00491 
00492     if (buf == 0) {
00493       ACE_ERROR((LM_ERROR,
00494                  ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation failed.\n")));
00495       return;
00496     }
00497 
00498     ACE_OS::memcpy(buf, actor->drdwQos.second.second, qos_len);
00499 
00500     BinSeq in_drdw_seq(qos_len, buf);
00501     QosSeq drdw_qos(actor->drdwQos.first, in_drdw_seq);
00502 
00503     size_t transport_len = actor->transportInterfaceInfo.first;
00504     ACE_NEW_NORETURN(buf, char[transport_len]);
00505     qos_sequences.push_back(ArrDelAdapter<char>(buf));
00506 
00507     if (buf == 0) {
00508       ACE_ERROR((LM_ERROR,
00509                  ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation failed.\n")));
00510       return;
00511     }
00512 
00513     ACE_OS::memcpy(buf, actor->transportInterfaceInfo.second, transport_len);
00514 
00515     BinSeq in_transport_seq(transport_len, buf);
00516 
00517     ContentSubscriptionBin in_csp_bin;
00518     if (actor->type == DataReader) {
00519       in_csp_bin.filterClassName = actor->contentSubscriptionProfile.filterClassName;
00520       in_csp_bin.filterExpr = actor->contentSubscriptionProfile.filterExpr;
00521       BinSeq& params = in_csp_bin.exprParams;
00522       ACE_NEW_NORETURN(params.second, char[params.first]);
00523       if (params.second == 0) {
00524         ACE_ERROR((LM_ERROR,
00525                    ACE_TEXT("(%P|%t) PersistenceUpdater::requestImage(): allocation ")
00526                    ACE_TEXT("failed.\n")));
00527         return;
00528       }
00529       qos_sequences.push_back(ArrDelAdapter<char>(params.second));
00530       ACE_OS::memcpy(params.second,
00531         actor->contentSubscriptionProfile.exprParams.second, params.first);
00532     }
00533 
00534     DActor dActor(actor->domainId, actor->actorId, actor->topicId
00535                   , actor->participantId
00536                   , actor->type, actor->callback.c_str()
00537                   , pubsub_qos, drdw_qos, in_transport_seq, in_csp_bin);
00538     image.actors.push_back(dActor);
00539   }
00540 
00541   image.lastPartId = *last_part_id_;
00542 
00543   um_->pushImage(image);
00544 }
00545 
00546 void
00547 PersistenceUpdater::create(const UTopic& topic)
00548 {
00549   // serialize the Topic QOS
00550   TAO_OutputCDR outCdr;
00551   outCdr << topic.topicQos;
00552   ACE_Message_Block dst;
00553   ACE_CDR::consolidate(&dst, outCdr.begin());
00554 
00555   size_t len = dst.length();
00556   char *buf;
00557   ACE_NEW_NORETURN(buf, char[len]);
00558   ArrDelAdapter<char> guard(buf);
00559 
00560   if (buf == 0) {
00561     ACE_ERROR((LM_ERROR,
00562                ACE_TEXT("(%P|%t) PersistenceUpdater::create( UTopic): allocation failed.\n")));
00563     return;
00564   }
00565 
00566   ACE_OS::memcpy(buf, dst.base(), len);
00567 
00568   BinSeq qos_bin(len, buf);
00569 
00570   QosSeq p(TopicQos, qos_bin);
00571   DTopic topic_data(topic.domainId, topic.topicId, topic.participantId
00572                     , topic.name.c_str(), topic.dataType.c_str(), p);
00573 
00574   // allocate memory for TopicData
00575   void* buffer;
00576   ACE_ALLOCATOR(buffer, allocator_->malloc
00577                 (sizeof(PersistenceUpdater::Topic)));
00578 
00579   // Initialize TopicData
00580   PersistenceUpdater::Topic* persistent_data
00581   = new(buffer) PersistenceUpdater::Topic(topic_data, allocator_.get());
00582 
00583   IdType_ExtId ext(topic_data.topicId);
00584 
00585   // bind TopicData with the topicId
00586   if (topic_index_->bind(ext, persistent_data, allocator_.get()) != 0) {
00587     allocator_->free((void *) buffer);
00588     return;
00589   }
00590 }
00591 
00592 void
00593 PersistenceUpdater::create(const UParticipant& participant)
00594 {
00595   // serialize the Topic QOS
00596   TAO_OutputCDR outCdr;
00597   outCdr << participant.participantQos;
00598   ACE_Message_Block dst;
00599   ACE_CDR::consolidate(&dst, outCdr.begin());
00600 
00601   size_t len = dst.length();
00602   char *buf;
00603   ACE_NEW_NORETURN(buf, char[len]);
00604   ArrDelAdapter<char> guard(buf);
00605 
00606   if (buf == 0) {
00607     ACE_ERROR((LM_ERROR,
00608                ACE_TEXT("(%P|%t) PersistenceUpdater::create( UParticipant): allocation failed.\n")));
00609     return;
00610   }
00611 
00612   ACE_OS::memcpy(buf, dst.base(), len);
00613 
00614   BinSeq qos_bin(len, buf);
00615 
00616   QosSeq p(ParticipantQos, qos_bin);
00617   DParticipant participant_data
00618   (participant.domainId, participant.owner, participant.participantId, p);
00619 
00620   // allocate memory for ParticipantData
00621   void* buffer;
00622   ACE_ALLOCATOR(buffer, allocator_->malloc
00623                 (sizeof(PersistenceUpdater::Participant)));
00624 
00625   // Initialize ParticipantData
00626   PersistenceUpdater::Participant* persistent_data
00627   = new(buffer) PersistenceUpdater::Participant(participant_data
00628                                                 , allocator_.get());
00629 
00630   IdType_ExtId ext(participant_data.participantId);
00631 
00632   // bind ParticipantData with the participantId
00633   if (participant_index_->bind(ext, persistent_data, allocator_.get()) != 0) {
00634     allocator_->free((void *) buffer);
00635     return;
00636   }
00637 }
00638 
00639 void
00640 PersistenceUpdater::create(const URActor& actor)
00641 {
00642   TAO_OutputCDR outCdr;
00643   outCdr << actor.pubsubQos;
00644   ACE_Message_Block dst;
00645   ACE_CDR::consolidate(&dst, outCdr.begin());
00646 
00647   size_t len = dst.length();
00648   char *buf = new (std::nothrow) char[len];
00649 
00650   if (buf == 0) {
00651     ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( subscription): allocation failed.\n"));
00652     return;
00653   }
00654 
00655   ArrDelAdapter<char> guard(buf);
00656 
00657   ACE_OS::memcpy(buf, dst.base(), len);
00658 
00659   BinSeq pubsub_qos_bin(len, buf);
00660   QosSeq pubsub_qos(SubscriberQos, pubsub_qos_bin);
00661 
00662   outCdr.reset();
00663   outCdr << actor.drdwQos;
00664   ACE_Message_Block dst2;
00665   ACE_CDR::consolidate(&dst2, outCdr.begin());
00666 
00667   len = dst2.length();
00668   char *buf2 = new (std::nothrow) char[len];
00669 
00670   if (buf2 == 0) {
00671     ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( subscription): allocation failed.\n"));
00672     return;
00673   }
00674 
00675   ArrDelAdapter<char> guard2(buf2);
00676 
00677   ACE_OS::memcpy(buf2, dst2.base(), len);
00678 
00679   BinSeq dwdr_qos_bin(len, buf2);
00680   QosSeq dwdr_qos(DataReaderQos, dwdr_qos_bin);
00681 
00682   outCdr.reset();
00683   outCdr << actor.transportInterfaceInfo;
00684   ACE_Message_Block dst3;
00685   ACE_CDR::consolidate(&dst3, outCdr.begin());
00686 
00687   len = dst3.length();
00688   char *buf3 = new (std::nothrow) char[len];
00689 
00690   if (buf3 == 0) {
00691     ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( subscription) allocation failed.\n"));
00692     return;
00693   }
00694 
00695   ArrDelAdapter<char> guard3(buf3);
00696 
00697   ACE_OS::memcpy(buf3, dst3.base(), len);
00698   BinSeq tr_bin(len, buf3);
00699 
00700   outCdr.reset();
00701   outCdr << actor.contentSubscriptionProfile.exprParams;
00702   ACE_Message_Block dst4;
00703   ACE_CDR::consolidate(&dst4, outCdr.begin());
00704   len = dst4.length();
00705   char* buf4 = new (std::nothrow) char[len];
00706   if (buf4 == 0) {
00707     ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( subscription) allocation failed.\n"));
00708     return;
00709   }
00710   ArrDelAdapter<char> guard4(buf4);
00711 
00712   ACE_OS::memcpy(buf4, dst4.base(), len);
00713   ContentSubscriptionBin csp_bin;
00714   csp_bin.filterClassName = actor.contentSubscriptionProfile.filterClassName;
00715   csp_bin.filterExpr = actor.contentSubscriptionProfile.filterExpr;
00716   csp_bin.exprParams = std::make_pair(len, buf4);
00717 
00718   DActor actor_data(actor.domainId, actor.actorId, actor.topicId
00719                     , actor.participantId
00720                     , DataReader, actor.callback.c_str(), pubsub_qos
00721                     , dwdr_qos, tr_bin, csp_bin);
00722 
00723   // allocate memory for ActorData
00724   void* buffer;
00725   ACE_ALLOCATOR(buffer, allocator_->malloc
00726                 (sizeof(PersistenceUpdater::RWActor)));
00727 
00728   // Initialize ActorData
00729   PersistenceUpdater::RWActor* persistent_data =
00730     new(buffer) PersistenceUpdater::RWActor(actor_data
00731                                             , allocator_.get());
00732 
00733   IdType_ExtId ext(actor.actorId);
00734 
00735   // bind ActorData with the actorId
00736   if (actor_index_->bind(ext, persistent_data, allocator_.get()) != 0) {
00737     allocator_->free((void *) buffer);
00738     return;
00739   }
00740 }
00741 
00742 void
00743 PersistenceUpdater::create(const UWActor& actor)
00744 {
00745   TAO_OutputCDR outCdr;
00746   outCdr << actor.pubsubQos;
00747   ACE_Message_Block dst;
00748   ACE_CDR::consolidate(&dst, outCdr.begin());
00749 
00750   size_t len = dst.length();
00751   char *buf = new (std::nothrow) char[len];
00752 
00753   if (buf == 0) {
00754     ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( publication): allocation failed.\n"));
00755     return;
00756   }
00757 
00758   ArrDelAdapter<char> guard(buf);
00759 
00760   ACE_OS::memcpy(buf, dst.base(), len);
00761 
00762   BinSeq pubsub_qos_bin(len, buf);
00763   QosSeq pubsub_qos(PublisherQos, pubsub_qos_bin);
00764 
00765   outCdr.reset();
00766   outCdr << actor.drdwQos;
00767   ACE_Message_Block dst2;
00768   ACE_CDR::consolidate(&dst2, outCdr.begin());
00769 
00770   len = dst2.length();
00771   char *buf2 = new (std::nothrow) char[len];
00772 
00773   if (buf2 == 0) {
00774     ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( publication): allocation failed.\n"));
00775     return;
00776   }
00777 
00778   ArrDelAdapter<char> guard2(buf2);
00779 
00780   ACE_OS::memcpy(buf2, dst2.base(), len);
00781 
00782   BinSeq dwdr_qos_bin(len, buf2);
00783   QosSeq dwdr_qos(DataWriterQos, dwdr_qos_bin);
00784 
00785   outCdr.reset();
00786   outCdr << actor.transportInterfaceInfo;
00787   ACE_Message_Block dst3;
00788   ACE_CDR::consolidate(&dst3, outCdr.begin());
00789 
00790   len = dst3.length();
00791   char *buf3 = new (std::nothrow) char[len];
00792 
00793   if (buf3 == 0) {
00794     ACE_ERROR((LM_ERROR, "(%P|%t) PersistenceUpdater::create( publication): allocation failed.\n"));
00795     return;
00796   }
00797 
00798   ArrDelAdapter<char> guard3(buf3);
00799 
00800   ACE_OS::memcpy(buf3, dst3.base(), len);
00801   BinSeq tr_bin(len, buf3);
00802 
00803   DActor actor_data(actor.domainId, actor.actorId, actor.topicId
00804                     , actor.participantId
00805                     , DataWriter, actor.callback.c_str(), pubsub_qos
00806                     , dwdr_qos, tr_bin, ContentSubscriptionBin());
00807 
00808   // allocate memory for ActorData
00809   void* buffer;
00810   ACE_ALLOCATOR(buffer, allocator_->malloc
00811                 (sizeof(PersistenceUpdater::RWActor)));
00812 
00813   // Initialize ActorData
00814   PersistenceUpdater::RWActor* persistent_data =
00815     new(buffer) PersistenceUpdater::RWActor(actor_data
00816                                             , allocator_.get());
00817 
00818   IdType_ExtId ext(actor.actorId);
00819 
00820   // bind ActorData with the actorId
00821   if (actor_index_->bind(ext, persistent_data, allocator_.get()) != 0) {
00822     allocator_->free((void *) buffer);
00823     return;
00824   }
00825 }
00826 
00827 void
00828 PersistenceUpdater::create(const OwnershipData& /* data */)
00829 {
00830   /* This method intentionally left unimplemented. */
00831 }
00832 
00833 void
00834 PersistenceUpdater::update(const IdPath& id, const DDS::DomainParticipantQos& qos)
00835 {
00836   IdType_ExtId ext(id.id);
00837   PersistenceUpdater::Participant* part_data = 0;
00838 
00839   if (this->participant_index_->find(ext, part_data, this->allocator_.get()) ==  0) {
00840     TAO_OutputCDR outCdr;
00841     outCdr << qos;
00842     ACE_Message_Block dst;
00843     ACE_CDR::consolidate(&dst, outCdr.begin());
00844 
00845     this->storeUpdate(dst, part_data->participantQos.second);
00846 
00847   } else {
00848     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00849     ACE_ERROR((LM_ERROR,
00850                ACE_TEXT("(%P|%t) PersistenceUpdater::update: ")
00851                ACE_TEXT("participant %C not found\n"),
00852                std::string(converter).c_str()));
00853   }
00854 }
00855 
00856 void
00857 PersistenceUpdater::update(const IdPath& id, const DDS::TopicQos& qos)
00858 {
00859   IdType_ExtId ext(id.id);
00860   PersistenceUpdater::Topic* topic_data = 0;
00861 
00862   if (this->topic_index_->find(ext, topic_data, this->allocator_.get()) ==  0) {
00863     TAO_OutputCDR outCdr;
00864     outCdr << qos;
00865     ACE_Message_Block dst;
00866     ACE_CDR::consolidate(&dst, outCdr.begin());
00867 
00868     this->storeUpdate(dst, topic_data->topicQos.second);
00869 
00870   } else {
00871     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00872     ACE_ERROR((LM_ERROR,
00873                ACE_TEXT("(%P|%t) PersistenceUpdater::update: ")
00874                ACE_TEXT("topic %C not found\n"),
00875                std::string(converter).c_str()));
00876   }
00877 }
00878 
00879 void
00880 PersistenceUpdater::update(const IdPath& id, const DDS::DataWriterQos& qos)
00881 {
00882   IdType_ExtId ext(id.id);
00883   PersistenceUpdater::RWActor* actor_data = 0;
00884 
00885   if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) ==  0) {
00886     TAO_OutputCDR outCdr;
00887     outCdr << qos;
00888     ACE_Message_Block dst;
00889     ACE_CDR::consolidate(&dst, outCdr.begin());
00890 
00891     this->storeUpdate(dst, actor_data->drdwQos.second);
00892 
00893   } else {
00894     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00895     ACE_ERROR((LM_ERROR,
00896                ACE_TEXT("(%P|%t) PersistenceUpdater::update(writerQos): ")
00897                ACE_TEXT("publication %C not found\n"),
00898                std::string(converter).c_str()));
00899   }
00900 }
00901 
00902 void
00903 PersistenceUpdater::update(const IdPath& id, const DDS::PublisherQos& qos)
00904 {
00905   IdType_ExtId ext(id.id);
00906   PersistenceUpdater::RWActor* actor_data = 0;
00907 
00908   if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) ==  0) {
00909     TAO_OutputCDR outCdr;
00910     outCdr << qos;
00911     ACE_Message_Block dst;
00912     ACE_CDR::consolidate(&dst, outCdr.begin());
00913 
00914     this->storeUpdate(dst, actor_data->pubsubQos.second);
00915 
00916   } else {
00917     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00918     ACE_ERROR((LM_ERROR,
00919                ACE_TEXT("(%P|%t) PersistenceUpdater::update(publisherQos): ")
00920                ACE_TEXT("publication %C not found\n"),
00921                std::string(converter).c_str()));
00922   }
00923 }
00924 
00925 void
00926 PersistenceUpdater::update(const IdPath& id, const DDS::DataReaderQos& qos)
00927 {
00928   IdType_ExtId ext(id.id);
00929   PersistenceUpdater::RWActor* actor_data = 0;
00930 
00931   if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) ==  0) {
00932     TAO_OutputCDR outCdr;
00933     outCdr << qos;
00934     ACE_Message_Block dst;
00935     ACE_CDR::consolidate(&dst, outCdr.begin());
00936 
00937     this->storeUpdate(dst, actor_data->drdwQos.second);
00938 
00939   } else {
00940     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00941     ACE_ERROR((LM_ERROR,
00942                ACE_TEXT("(%P|%t) PersistenceUpdater::update(readerQos): ")
00943                ACE_TEXT("subscription %C not found\n"),
00944                std::string(converter).c_str()));
00945   }
00946 }
00947 
00948 void
00949 PersistenceUpdater::update(const IdPath& id, const DDS::SubscriberQos& qos)
00950 {
00951   IdType_ExtId ext(id.id);
00952   PersistenceUpdater::RWActor* actor_data = 0;
00953 
00954   if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) ==  0) {
00955     TAO_OutputCDR outCdr;
00956     outCdr << qos;
00957     ACE_Message_Block dst;
00958     ACE_CDR::consolidate(&dst, outCdr.begin());
00959 
00960     this->storeUpdate(dst, actor_data->pubsubQos.second);
00961 
00962   } else {
00963     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00964     ACE_ERROR((LM_ERROR,
00965                ACE_TEXT("(%P|%t) PersistenceUpdater::update(subscriberQos): ")
00966                ACE_TEXT("subscription %C not found\n"),
00967                std::string(converter).c_str()));
00968   }
00969 }
00970 
00971 void
00972 PersistenceUpdater::update(const IdPath& id, const DDS::StringSeq& exprParams)
00973 {
00974   IdType_ExtId ext(id.id);
00975   PersistenceUpdater::RWActor* actor_data = 0;
00976 
00977   if (actor_index_->find(ext, actor_data, allocator_.get()) ==  0) {
00978     TAO_OutputCDR outCdr;
00979     outCdr << exprParams;
00980     ACE_Message_Block dst;
00981     ACE_CDR::consolidate(&dst, outCdr.begin());
00982 
00983     storeUpdate(dst, actor_data->contentSubscriptionProfile.exprParams);
00984 
00985   } else {
00986     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00987     ACE_ERROR((LM_ERROR,
00988                ACE_TEXT("(%P|%t) PersistenceUpdater::update(readerQos): ")
00989                ACE_TEXT("subscription %C not found\n"),
00990                std::string(converter).c_str()));
00991   }
00992 }
00993 
00994 void
00995 PersistenceUpdater::destroy(const IdPath& id, ItemType type, ActorType)
00996 {
00997   IdType_ExtId ext(id.id);
00998   PersistenceUpdater::Topic* topic = 0;
00999   PersistenceUpdater::Participant* participant = 0;
01000   PersistenceUpdater::RWActor* actor = 0;
01001 
01002   switch (type) {
01003   case Update::Topic:
01004 
01005     if (topic_index_->unbind(ext, topic, allocator_.get()) == 0) {
01006       topic->cleanup(allocator_.get());
01007       allocator_->free((void *) topic);
01008     }
01009 
01010     break;
01011   case Update::Participant:
01012 
01013     if (participant_index_->unbind(ext, participant, allocator_.get()) == 0) {
01014       participant->cleanup(allocator_.get());
01015       allocator_->free((void *) participant);
01016     }
01017 
01018     break;
01019   case Update::Actor:
01020 
01021     if (actor_index_->unbind(ext, actor, allocator_.get()) == 0) {
01022       actor->cleanup(allocator_.get());
01023       allocator_->free((void *) actor);
01024     }
01025 
01026     break;
01027   default: {
01028     OpenDDS::DCPS::RepoIdConverter converter(id.id);
01029     ACE_ERROR((LM_ERROR,
01030                ACE_TEXT("(%P|%t) PersistenceUpdater::destroy: ")
01031                ACE_TEXT("unknown entity - %C.\n"),
01032                std::string(converter).c_str()));
01033   }
01034   }
01035 }
01036 
01037 void
01038 PersistenceUpdater::storeUpdate(const ACE_Message_Block& data, BinSeq& storage)
01039 {
01040   size_t len = data.length();
01041 
01042   void* buffer;
01043   ACE_ALLOCATOR(buffer, this->allocator_->malloc(len));
01044   ACE_OS::memcpy(buffer, data.base(), len);
01045 
01046   storage.first  = len;
01047   storage.second = static_cast<char*>(buffer);
01048 }
01049 
01050 void PersistenceUpdater::updateLastPartId(PartIdType partId)
01051 {
01052   *last_part_id_ = partId;
01053 }
01054 
01055 } // namespace Update
01056 
01057 int
01058 PersistenceUpdaterSvc_Loader::init()
01059 {
01060   return ACE_Service_Config::process_directive
01061          (ace_svc_desc_PersistenceUpdaterSvc);
01062   return 0;
01063 }
01064 
01065 // from the "ACE Programmers Guide (P. 424)
01066 ACE_FACTORY_DEFINE(ACE_Local_Service, PersistenceUpdaterSvc)
01067 
01068 ACE_STATIC_SVC_DEFINE(PersistenceUpdaterSvc,
01069                       ACE_TEXT("PersistenceUpdaterSvc"),
01070                       ACE_SVC_OBJ_T,
01071                       &ACE_SVC_NAME(PersistenceUpdaterSvc),
01072                       ACE_Service_Type::DELETE_THIS |
01073                       ACE_Service_Type::DELETE_OBJ,
01074                       0)
01075 
01076 ACE_STATIC_SVC_REQUIRE(PersistenceUpdaterSvc)
01077 
01078 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1