PersistenceUpdater.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 
00010 #include "PersistenceUpdater.h"
00011 #include "UpdateManager.h"
00012 #include "ArrDelAdapter.h"
00013 
00014 #include "dds/DCPS/RepoIdConverter.h"
00015 #include "dds/DCPS/GuidUtils.h"
00016 
00017 #include "ace/Malloc_T.h"
00018 #include "ace/MMAP_Memory_Pool.h"
00019 #include "ace/OS_NS_strings.h"
00020 #include "ace/Svc_Handler.h"
00021 #include "ace/Dynamic_Service.h"
00022 
00023 #include <algorithm>
00024 
00025 namespace {
00026   void assign(Update::BinSeq& to, const Update::BinSeq& from,
00027               Update::PersistenceUpdater::ALLOCATOR* allocator)
00028   {
00029     const size_t len = from.first;
00030     void* out_buf;
00031     ACE_ALLOCATOR(out_buf, allocator->malloc(len));
00032     ACE_OS::memcpy(out_buf, from.second, len);
00033     to = std::make_pair(len, static_cast<char*>(out_buf));
00034   }
00035 
00036   void assign(ACE_CString& to, const char* from,
00037               Update::PersistenceUpdater::ALLOCATOR* allocator)
00038   {
00039     const size_t len = ACE_OS::strlen (from) + 1;
00040     void* out_buf;
00041     ACE_ALLOCATOR(out_buf, allocator->malloc(len));
00042     ACE_OS::memcpy(out_buf, from, len);
00043     to.set(static_cast<char*>(out_buf), len - 1, false);
00044   }
00045 }
00046 
00047 namespace Update {
00048 
00049 // Template specializations with custom constructors
00050 // and cleanup methods
00051 template<>
00052 struct TopicStrt<QosSeq, ACE_CString> {
00053   DDS::DomainId_t   domainId;
00054   IdType            topicId;
00055   IdType            participantId;
00056   ACE_CString       name;
00057   ACE_CString       dataType;
00058   QosSeq            topicQos;
00059 
00060   TopicStrt(const DTopic& topic,
00061             PersistenceUpdater::ALLOCATOR* allocator)
00062     : domainId(topic.domainId),
00063       topicId(topic.topicId),
00064       participantId(topic.participantId)
00065   {
00066     assign(name, topic.name.c_str(), allocator);
00067     assign(dataType, topic.dataType.c_str(), allocator);
00068 
00069     topicQos.first = TopicQos;
00070     assign(topicQos.second, topic.topicQos.second, allocator);
00071   }
00072 
00073   void cleanup(PersistenceUpdater::ALLOCATOR* allocator)
00074   {
00075     if (name.length() > 0)
00076     {
00077       char* strMemory = const_cast<char*>(name.fast_rep());
00078       name.fast_clear();
00079       allocator->free(strMemory);
00080     }
00081     if (dataType.length() > 0)
00082     {
00083       char* strMemory = const_cast<char*>(dataType.fast_rep());
00084       dataType.fast_clear();
00085       allocator->free(strMemory);
00086     }
00087 
00088     allocator->free(topicQos.second.second);
00089   }
00090 };
00091 
00092 template<>
00093 struct ParticipantStrt<QosSeq> {
00094   DDS::DomainId_t   domainId;
00095   long              owner;
00096   IdType            participantId;
00097   QosSeq            participantQos;
00098 
00099   ParticipantStrt(const DDS::DomainId_t& dId,
00100                   long                   own,
00101                   const IdType&          pId,
00102                   const QosSeq&          pQos)
00103     : domainId(dId),
00104       owner(own),
00105       participantId(pId),
00106       participantQos(pQos) {}
00107 
00108   ParticipantStrt(const DParticipant& participant,
00109                   PersistenceUpdater::ALLOCATOR* allocator)
00110     : domainId(participant.domainId),
00111       owner(participant.owner),
00112       participantId(participant.participantId)
00113   {
00114     participantQos.first = ParticipantQos;
00115     assign(participantQos.second, participant.participantQos.second, allocator);
00116   }
00117 
00118   void cleanup(PersistenceUpdater::ALLOCATOR* allocator)
00119   {
00120     allocator->free(participantQos.second.second);
00121   }
00122 };
00123 
00124 template<>
00125 struct ActorStrt<QosSeq, QosSeq,
00126                  ACE_CString, BinSeq, ContentSubscriptionBin> {
00127   DDS::DomainId_t   domainId;
00128   IdType            actorId;
00129   IdType            topicId;
00130   IdType            participantId;
00131   ActorType         type;
00132   ACE_CString       callback;
00133   QosSeq            pubsubQos;
00134   QosSeq            drdwQos;
00135   BinSeq            transportInterfaceInfo;
00136   ContentSubscriptionBin contentSubscriptionProfile;
00137 
00138   ActorStrt(const DActor& actor,
00139             PersistenceUpdater::ALLOCATOR* allocator)
00140     : domainId(actor.domainId),
00141       actorId(actor.actorId),
00142       topicId(actor.topicId),
00143       participantId(actor.participantId), type(actor.type)
00144   {
00145     assign(callback, actor.callback.c_str(), allocator);
00146 
00147     pubsubQos.first = actor.pubsubQos.first;
00148     assign(pubsubQos.second, actor.pubsubQos.second, allocator);
00149 
00150     drdwQos.first = actor.drdwQos.first;
00151     assign(drdwQos.second, actor.drdwQos.second, allocator);
00152 
00153     assign(transportInterfaceInfo, actor.transportInterfaceInfo, allocator);
00154 
00155     contentSubscriptionProfile.filterClassName =
00156       ACE_CString(actor.contentSubscriptionProfile.filterClassName.c_str(),
00157                   allocator);
00158     contentSubscriptionProfile.filterExpr =
00159       ACE_CString(actor.contentSubscriptionProfile.filterExpr.c_str(),
00160                   allocator);
00161     assign(contentSubscriptionProfile.exprParams,
00162       actor.contentSubscriptionProfile.exprParams, allocator);
00163   }
00164 
00165   void cleanup(PersistenceUpdater::ALLOCATOR* allocator)
00166   {
00167     if (callback.length() > 0)
00168     {
00169       char* strMemory = const_cast<char*>(callback.fast_rep());
00170       callback.fast_clear();
00171       allocator->free(strMemory);
00172     }
00173 
00174     allocator->free(pubsubQos.second.second);
00175     allocator->free(drdwQos.second.second);
00176     allocator->free(transportInterfaceInfo.second);
00177     allocator->free(contentSubscriptionProfile.exprParams.second);
00178   }
00179 };
00180 
00181 PersistenceUpdater::IdType_ExtId::IdType_ExtId()
00182   : id_(OpenDDS::DCPS::GUID_UNKNOWN)
00183 {}
00184 
00185 PersistenceUpdater::IdType_ExtId::IdType_ExtId(IdType id)
00186   : id_(id)
00187 {}
00188 
00189 PersistenceUpdater::IdType_ExtId::IdType_ExtId(const IdType_ExtId& ext)
00190   : id_(ext.id_)
00191 {}
00192 
00193 void
00194 PersistenceUpdater::IdType_ExtId::operator= (const IdType_ExtId& ext)
00195 {
00196   id_ = ext.id_;
00197 }
00198 
00199 bool
00200 PersistenceUpdater::IdType_ExtId::operator== (const IdType_ExtId& ext) const
00201 {
00202   return (id_ == ext.id_);
00203 }
00204 
00205 unsigned long
00206 PersistenceUpdater::IdType_ExtId::hash() const
00207 {
00208   return OpenDDS::DCPS::RepoIdConverter(id_).checksum();
00209 };
00210 
00211 PersistenceUpdater::PersistenceUpdater()
00212   : persistence_file_(ACE_TEXT("InforepoPersist"))
00213   , reset_(false)
00214   , um_(0)
00215   , allocator_(0)
00216   , topic_index_(0)
00217   , participant_index_(0)
00218   , actor_index_(0)
00219 
00220 {}
00221 
00222 PersistenceUpdater::~PersistenceUpdater()
00223 {}
00224 
00225 // utility functions
00226 void* createIndex(const std::string& tag
00227                   , PersistenceUpdater::ALLOCATOR& allocator
00228                   , size_t size, bool& exists)
00229 {
00230   void* index = 0;
00231 
00232   // This is the easy case since if we find hash table in the
00233   // memory-mapped file we know it's already initialized.
00234   if (allocator.find(tag.c_str(), index) == 0) {
00235     exists = true;
00236     return index;
00237 
00238   } else {
00239     exists = false;
00240 
00241     ACE_ALLOCATOR_RETURN(index, allocator.malloc(size), 0);
00242 
00243     if (allocator.bind(tag.c_str(), index) == -1) {
00244       allocator.free(index);
00245       index = 0;
00246     }
00247   }
00248 
00249   return index;
00250 }
00251 
00252 template<typename I> void
00253 index_cleanup(I* index
00254               , PersistenceUpdater::ALLOCATOR* allocator)
00255 {
00256   for (typename I::ITERATOR iter = index->begin()
00257                                    ; iter != index->end();) {
00258     typename I::ITERATOR current_iter = iter;
00259     iter++;
00260 
00261     if (index->unbind((*current_iter).ext_id_, allocator) != 0) {
00262       ACE_ERROR((LM_ERROR, ACE_TEXT("Index unbind failed.\n")));
00263     }
00264   }
00265 }
00266 
00267 int
00268 PersistenceUpdater::init(int argc, ACE_TCHAR *argv[])
00269 {
00270   // discover the UpdateManager
00271   um_ = ACE_Dynamic_Service<Update::Manager>::instance
00272         ("UpdateManagerSvc");
00273 
00274   if (um_ == 0) {
00275     ACE_ERROR((LM_ERROR, ACE_TEXT("PersistenceUpdater initialization failed. ")
00276                ACE_TEXT("No UpdateManager discovered.\n")));
00277     return -1;
00278   }
00279 
00280   this->parse(argc, argv);
00281 
00282 #if defined ACE_HAS_MAC_OSX && defined __x86_64__ && __x86_64__
00283   ACE_MMAP_Memory_Pool::OPTIONS options((void*)0x200000000);
00284 #else
00285   ACE_MMAP_Memory_Pool::OPTIONS options(ACE_DEFAULT_BASE_ADDR);
00286 #endif
00287 
00288   // Create the allocator with the appropriate options.  The name used
00289   // for  the lock is the same as one used for the file.
00290   ACE_NEW_RETURN(allocator_,
00291                  ALLOCATOR(persistence_file_.c_str(),
00292                            persistence_file_.c_str(),
00293                            &options),
00294                  -1);
00295 
00296   std::string topic_tag("TopicIndex");
00297   std::string participant_tag("ParticipantIndex");
00298   std::string actor_tag("ActorIndex");
00299   bool exists = false, ex = false;
00300 
00301   char* topic_index = (char*)createIndex(topic_tag, *allocator_
00302                                          , sizeof(TopicIndex), ex);
00303 
00304   if (topic_index == 0) {
00305     ACE_DEBUG((LM_DEBUG, ACE_TEXT("Initial allocation/Bind failed 1.\n")));
00306     return -1;
00307   }
00308 
00309   exists = exists || ex;
00310 
00311   char* participant_index = (char*)createIndex(participant_tag, *allocator_
00312                                                , sizeof(ParticipantIndex), ex);
00313 
00314   if (participant_index == 0) {
00315     ACE_DEBUG((LM_DEBUG, ACE_TEXT("Initial allocation/Bind failed 2.\n")));
00316     return -1;
00317   }
00318 
00319   exists = exists || ex;
00320 
00321   char* actor_index = (char*)createIndex(actor_tag, *allocator_
00322                                          , sizeof(ActorIndex), ex);
00323 
00324   if (actor_index == 0) {
00325     ACE_DEBUG((LM_DEBUG, ACE_TEXT("Initial allocation/Bind failed 2.\n")));
00326     return -1;
00327   }
00328 
00329   exists = exists || ex;
00330 
00331   if (exists) {
00332     topic_index_ = reinterpret_cast<TopicIndex*>(topic_index);
00333     participant_index_ = reinterpret_cast<ParticipantIndex*>(participant_index);
00334     actor_index_ = reinterpret_cast<ActorIndex*>(actor_index);
00335 
00336     if (!(topic_index_ && participant_index_ && actor_index_)) {
00337       ACE_ERROR((LM_DEBUG, ACE_TEXT("Unable to narrow persistent indexes.\n")));
00338       return -1;
00339     }
00340 
00341   } else {
00342     topic_index_ = new(topic_index) TopicIndex(allocator_);
00343     participant_index_ = new(participant_index) ParticipantIndex(allocator_);
00344     actor_index_ = new(actor_index) ActorIndex(allocator_);
00345   }
00346 
00347   if (reset_) {
00348     index_cleanup(topic_index_, allocator_);
00349     index_cleanup(participant_index_, allocator_);
00350     index_cleanup(actor_index_, allocator_);
00351   }
00352 
00353   // lastly register the callback
00354   um_->add(this);
00355 
00356   return 0;
00357 }
00358 
00359 int
00360 PersistenceUpdater::parse(int argc, ACE_TCHAR *argv[])
00361 {
00362   for (ssize_t count = 0; count < argc; count++) {
00363     if (ACE_OS::strcasecmp(argv[count], ACE_TEXT("-file")) == 0) {
00364       if ((count + 1) < argc) {
00365         persistence_file_ = argv[count+1];
00366         count++;
00367       }
00368 
00369     } else if (ACE_OS::strcasecmp(argv[count], ACE_TEXT("-reset")) == 0) {
00370       if ((count + 1) < argc) {
00371         int val = ACE_OS::atoi(argv[count+1]);
00372         reset_ = true;
00373 
00374         if (val == 0) {
00375           reset_ = false;
00376         }
00377 
00378         count++;
00379       }
00380 
00381     } else {
00382       ACE_DEBUG((LM_DEBUG, ACE_TEXT("Unknown option %s\n")
00383                  , argv[count]));
00384       return -1;
00385     }
00386   }
00387 
00388   return 0;
00389 }
00390 
00391 int
00392 PersistenceUpdater::fini()
00393 {
00394   return 0;
00395 }
00396 
00397 int
00398 PersistenceUpdater::svc()
00399 {
00400   return 0;
00401 }
00402 
00403 void
00404 PersistenceUpdater::requestImage()
00405 {
00406   if (um_ == NULL) {
00407     return;
00408   }
00409 
00410   DImage image;
00411 
00412   // Allocate space to hold the QOS sequences.
00413   std::vector<ArrDelAdapter<char> > qos_sequences;
00414 
00415   for (ParticipantIndex::ITERATOR iter = participant_index_->begin();
00416        iter != participant_index_->end(); iter++) {
00417     const PersistenceUpdater::Participant* participant
00418     = (*iter).int_id_;
00419 
00420     size_t qos_len = participant->participantQos.second.first;
00421     char *buf;
00422     ACE_NEW_NORETURN(buf, char[qos_len]);
00423     qos_sequences.push_back(ArrDelAdapter<char>(buf));
00424 
00425     if (buf == 0) {
00426       ACE_ERROR((LM_ERROR,
00427                  ACE_TEXT("PersistenceUpdater::requestImage(): allocation failed.\n")));
00428       return;
00429     }
00430 
00431     ACE_OS::memcpy(buf, participant->participantQos.second.second, qos_len);
00432 
00433     BinSeq in_seq(qos_len, buf);
00434     QosSeq qos(ParticipantQos, in_seq);
00435     DParticipant dparticipant(participant->domainId
00436                               , participant->owner
00437                               , participant->participantId
00438                               , qos);
00439     image.participants.push_back(dparticipant);
00440   }
00441 
00442   for (TopicIndex::ITERATOR iter = topic_index_->begin();
00443        iter != topic_index_->end(); iter++) {
00444     const PersistenceUpdater::Topic* topic = (*iter).int_id_;
00445 
00446     size_t qos_len = topic->topicQos.second.first;
00447     char *buf;
00448     ACE_NEW_NORETURN(buf, char[qos_len]);
00449     qos_sequences.push_back(ArrDelAdapter<char>(buf));
00450 
00451     if (buf == 0) {
00452       ACE_ERROR((LM_ERROR,
00453                  ACE_TEXT("PersistenceUpdater::requestImage(): allocation failed.\n")));
00454       return;
00455     }
00456 
00457     ACE_OS::memcpy(buf, topic->topicQos.second.second, qos_len);
00458 
00459     BinSeq in_seq(qos_len, buf);
00460     QosSeq qos(TopicQos, in_seq);
00461     DTopic dTopic(topic->domainId, topic->topicId
00462                   , topic->participantId, topic->name.c_str()
00463                   , topic->dataType.c_str(), qos);
00464     image.topics.push_back(dTopic);
00465   }
00466 
00467   for (ActorIndex::ITERATOR iter = actor_index_->begin();
00468        iter != actor_index_->end(); iter++) {
00469     const PersistenceUpdater::RWActor* actor = (*iter).int_id_;
00470 
00471     size_t qos_len = actor->pubsubQos.second.first;
00472     char *buf;
00473     ACE_NEW_NORETURN(buf, char[qos_len]);
00474     qos_sequences.push_back(ArrDelAdapter<char>(buf));
00475 
00476     if (buf == 0) {
00477       ACE_ERROR((LM_ERROR,
00478                  ACE_TEXT("PersistenceUpdater::requestImage(): allocation failed.\n")));
00479       return;
00480     }
00481 
00482     ACE_OS::memcpy(buf, actor->pubsubQos.second.second, qos_len);
00483 
00484     BinSeq in_pubsub_seq(qos_len, buf);
00485     QosSeq pubsub_qos(actor->pubsubQos.first, in_pubsub_seq);
00486 
00487     qos_len = actor->drdwQos.second.first;
00488     ACE_NEW_NORETURN(buf, char[qos_len]);
00489     qos_sequences.push_back(ArrDelAdapter<char>(buf));
00490 
00491     if (buf == 0) {
00492       ACE_ERROR((LM_ERROR,
00493                  ACE_TEXT("PersistenceUpdater::requestImage(): allocation failed.\n")));
00494       return;
00495     }
00496 
00497     ACE_OS::memcpy(buf, actor->drdwQos.second.second, qos_len);
00498 
00499     BinSeq in_drdw_seq(qos_len, buf);
00500     QosSeq drdw_qos(actor->drdwQos.first, in_drdw_seq);
00501 
00502     qos_len = actor->transportInterfaceInfo.first;
00503     ACE_NEW_NORETURN(buf, char[qos_len]);
00504     qos_sequences.push_back(ArrDelAdapter<char>(buf));
00505 
00506     if (buf == 0) {
00507       ACE_ERROR((LM_ERROR,
00508                  ACE_TEXT("PersistenceUpdater::requestImage(): allocation failed.\n")));
00509       return;
00510     }
00511 
00512     ACE_OS::memcpy(buf, actor->transportInterfaceInfo.second, qos_len);
00513 
00514     BinSeq in_transport_seq(qos_len, buf);
00515 
00516     ContentSubscriptionBin in_csp_bin;
00517     if (actor->type == DataReader) {
00518       in_csp_bin.filterClassName = actor->contentSubscriptionProfile.filterClassName;
00519       in_csp_bin.filterExpr = actor->contentSubscriptionProfile.filterExpr;
00520       BinSeq& params = in_csp_bin.exprParams;
00521       ACE_NEW_NORETURN(params.second, char[params.first]);
00522       if (params.second == 0) {
00523         ACE_ERROR((LM_ERROR,
00524                    ACE_TEXT("PersistenceUpdater::requestImage(): allocation ")
00525                    ACE_TEXT("failed.\n")));
00526         return;
00527       }
00528       qos_sequences.push_back(ArrDelAdapter<char>(params.second));
00529       ACE_OS::memcpy(params.second,
00530         actor->contentSubscriptionProfile.exprParams.second, params.first);
00531     }
00532 
00533     DActor dActor(actor->domainId, actor->actorId, actor->topicId
00534                   , actor->participantId
00535                   , actor->type, actor->callback.c_str()
00536                   , pubsub_qos, drdw_qos, in_transport_seq, in_csp_bin);
00537     image.actors.push_back(dActor);
00538   }
00539 
00540   um_->pushImage(image);
00541 }
00542 
00543 void
00544 PersistenceUpdater::create(const UTopic& topic)
00545 {
00546   // serialize the Topic QOS
00547   TAO_OutputCDR outCdr;
00548   outCdr << topic.topicQos;
00549   ACE_Message_Block dst;
00550   ACE_CDR::consolidate(&dst, outCdr.begin());
00551 
00552   size_t len = dst.length();
00553   char *buf;
00554   ACE_NEW_NORETURN(buf, char[len]);
00555   ArrDelAdapter<char> guard(buf);
00556 
00557   if (buf == 0) {
00558     ACE_ERROR((LM_ERROR,
00559                ACE_TEXT("PersistenceUpdater::create( UTopic): allocation failed.\n")));
00560     return;
00561   }
00562 
00563   ACE_OS::memcpy(buf, dst.base(), len);
00564 
00565   BinSeq qos_bin(len, buf);
00566 
00567   QosSeq p(TopicQos, qos_bin);
00568   DTopic topic_data(topic.domainId, topic.topicId, topic.participantId
00569                     , topic.name.c_str(), topic.dataType.c_str(), p);
00570 
00571   // allocate memory for TopicData
00572   void* buffer;
00573   ACE_ALLOCATOR(buffer, allocator_->malloc
00574                 (sizeof(PersistenceUpdater::Topic)));
00575 
00576   // Initialize TopicData
00577   PersistenceUpdater::Topic* persistent_data
00578   = new(buffer) PersistenceUpdater::Topic(topic_data, allocator_);
00579 
00580   IdType_ExtId ext(topic_data.topicId);
00581 
00582   // bind TopicData with the topicId
00583   if (topic_index_->bind(ext, persistent_data, allocator_) != 0) {
00584     allocator_->free((void *) buffer);
00585     return;
00586   }
00587 }
00588 
00589 void
00590 PersistenceUpdater::create(const UParticipant& participant)
00591 {
00592   // serialize the Topic QOS
00593   TAO_OutputCDR outCdr;
00594   outCdr << participant.participantQos;
00595   ACE_Message_Block dst;
00596   ACE_CDR::consolidate(&dst, outCdr.begin());
00597 
00598   size_t len = dst.length();
00599   char *buf;
00600   ACE_NEW_NORETURN(buf, char[len]);
00601   ArrDelAdapter<char> guard(buf);
00602 
00603   if (buf == 0) {
00604     ACE_ERROR((LM_ERROR,
00605                ACE_TEXT("PersistenceUpdater::create( UParticipant): allocation failed.\n")));
00606     return;
00607   }
00608 
00609   ACE_OS::memcpy(buf, dst.base(), len);
00610 
00611   BinSeq qos_bin(len, buf);
00612 
00613   QosSeq p(ParticipantQos, qos_bin);
00614   DParticipant participant_data
00615   (participant.domainId, participant.owner, participant.participantId, p);
00616 
00617   // allocate memory for ParticipantData
00618   void* buffer;
00619   ACE_ALLOCATOR(buffer, allocator_->malloc
00620                 (sizeof(PersistenceUpdater::Participant)));
00621 
00622   // Initialize ParticipantData
00623   PersistenceUpdater::Participant* persistent_data
00624   = new(buffer) PersistenceUpdater::Participant(participant_data
00625                                                 , allocator_);
00626 
00627   IdType_ExtId ext(participant_data.participantId);
00628 
00629   // bind ParticipantData with the participantId
00630   if (participant_index_->bind(ext, persistent_data, allocator_) != 0) {
00631     allocator_->free((void *) buffer);
00632     return;
00633   }
00634 }
00635 
00636 void
00637 PersistenceUpdater::create(const URActor& actor)
00638 {
00639   TAO_OutputCDR outCdr;
00640   outCdr << actor.pubsubQos;
00641   ACE_Message_Block dst;
00642   ACE_CDR::consolidate(&dst, outCdr.begin());
00643 
00644   size_t len = dst.length();
00645   char *buf = new (std::nothrow) char[len];
00646 
00647   if (buf == 0) {
00648     ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( subscription): allocation failed.\n"));
00649     return;
00650   }
00651 
00652   ArrDelAdapter<char> guard(buf);
00653 
00654   ACE_OS::memcpy(buf, dst.base(), len);
00655 
00656   BinSeq pubsub_qos_bin(len, buf);
00657   QosSeq pubsub_qos(SubscriberQos, pubsub_qos_bin);
00658 
00659   outCdr.reset();
00660   outCdr << actor.drdwQos;
00661   ACE_Message_Block dst2;
00662   ACE_CDR::consolidate(&dst2, outCdr.begin());
00663 
00664   len = dst2.length();
00665   char *buf2 = new (std::nothrow) char[len];
00666 
00667   if (buf2 == 0) {
00668     ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( subscription): allocation failed.\n"));
00669     return;
00670   }
00671 
00672   ArrDelAdapter<char> guard2(buf2);
00673 
00674   ACE_OS::memcpy(buf2, dst2.base(), len);
00675 
00676   BinSeq dwdr_qos_bin(len, buf2);
00677   QosSeq dwdr_qos(DataReaderQos, dwdr_qos_bin);
00678 
00679   outCdr.reset();
00680   outCdr << actor.transportInterfaceInfo;
00681   ACE_Message_Block dst3;
00682   ACE_CDR::consolidate(&dst3, outCdr.begin());
00683 
00684   len = dst3.length();
00685   char *buf3 = new (std::nothrow) char[len];
00686 
00687   if (buf3 == 0) {
00688     ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( subscription) allocation failed.\n"));
00689     return;
00690   }
00691 
00692   ArrDelAdapter<char> guard3(buf3);
00693 
00694   ACE_OS::memcpy(buf3, dst3.base(), len);
00695   BinSeq tr_bin(len, buf3);
00696 
00697   outCdr.reset();
00698   outCdr << actor.contentSubscriptionProfile.exprParams;
00699   ACE_Message_Block dst4;
00700   ACE_CDR::consolidate(&dst4, outCdr.begin());
00701   len = dst4.length();
00702   char* buf4 = new (std::nothrow) char[len];
00703   if (buf4 == 0) {
00704     ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( subscription) allocation failed.\n"));
00705     return;
00706   }
00707   ArrDelAdapter<char> guard4(buf4);
00708 
00709   ACE_OS::memcpy(buf4, dst4.base(), len);
00710   ContentSubscriptionBin csp_bin;
00711   csp_bin.filterClassName = actor.contentSubscriptionProfile.filterClassName;
00712   csp_bin.filterExpr = actor.contentSubscriptionProfile.filterExpr;
00713   csp_bin.exprParams = std::make_pair(len, buf4);
00714 
00715   DActor actor_data(actor.domainId, actor.actorId, actor.topicId
00716                     , actor.participantId
00717                     , DataReader, actor.callback.c_str(), pubsub_qos
00718                     , dwdr_qos, tr_bin, csp_bin);
00719 
00720   // allocate memory for ActorData
00721   void* buffer;
00722   ACE_ALLOCATOR(buffer, allocator_->malloc
00723                 (sizeof(PersistenceUpdater::RWActor)));
00724 
00725   // Initialize ActorData
00726   PersistenceUpdater::RWActor* persistent_data =
00727     new(buffer) PersistenceUpdater::RWActor(actor_data
00728                                             , allocator_);
00729 
00730   IdType_ExtId ext(actor.actorId);
00731 
00732   // bind ActorData with the actorId
00733   if (actor_index_->bind(ext, persistent_data, allocator_) != 0) {
00734     allocator_->free((void *) buffer);
00735     return;
00736   }
00737 }
00738 
00739 void
00740 PersistenceUpdater::create(const UWActor& actor)
00741 {
00742   TAO_OutputCDR outCdr;
00743   outCdr << actor.pubsubQos;
00744   ACE_Message_Block dst;
00745   ACE_CDR::consolidate(&dst, outCdr.begin());
00746 
00747   size_t len = dst.length();
00748   char *buf = new (std::nothrow) char[len];
00749 
00750   if (buf == 0) {
00751     ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( publication): allocation failed.\n"));
00752     return;
00753   }
00754 
00755   ArrDelAdapter<char> guard(buf);
00756 
00757   ACE_OS::memcpy(buf, dst.base(), len);
00758 
00759   BinSeq pubsub_qos_bin(len, buf);
00760   QosSeq pubsub_qos(PublisherQos, pubsub_qos_bin);
00761 
00762   outCdr.reset();
00763   outCdr << actor.drdwQos;
00764   ACE_Message_Block dst2;
00765   ACE_CDR::consolidate(&dst2, outCdr.begin());
00766 
00767   len = dst2.length();
00768   char *buf2 = new (std::nothrow) char[len];
00769 
00770   if (buf2 == 0) {
00771     ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( publication): allocation failed.\n"));
00772     return;
00773   }
00774 
00775   ArrDelAdapter<char> guard2(buf2);
00776 
00777   ACE_OS::memcpy(buf2, dst2.base(), len);
00778 
00779   BinSeq dwdr_qos_bin(len, buf2);
00780   QosSeq dwdr_qos(DataWriterQos, dwdr_qos_bin);
00781 
00782   outCdr.reset();
00783   outCdr << actor.transportInterfaceInfo;
00784   ACE_Message_Block dst3;
00785   ACE_CDR::consolidate(&dst3, outCdr.begin());
00786 
00787   len = dst3.length();
00788   char *buf3 = new (std::nothrow) char[len];
00789 
00790   if (buf3 == 0) {
00791     ACE_ERROR((LM_ERROR, "PersistenceUpdater::create( publication): allocation failed.\n"));
00792     return;
00793   }
00794 
00795   ArrDelAdapter<char> guard3(buf3);
00796 
00797   ACE_OS::memcpy(buf3, dst3.base(), len);
00798   BinSeq tr_bin(len, buf3);
00799 
00800   DActor actor_data(actor.domainId, actor.actorId, actor.topicId
00801                     , actor.participantId
00802                     , DataWriter, actor.callback.c_str(), pubsub_qos
00803                     , dwdr_qos, tr_bin, ContentSubscriptionBin());
00804 
00805   // allocate memory for ActorData
00806   void* buffer;
00807   ACE_ALLOCATOR(buffer, allocator_->malloc
00808                 (sizeof(PersistenceUpdater::RWActor)));
00809 
00810   // Initialize ActorData
00811   PersistenceUpdater::RWActor* persistent_data =
00812     new(buffer) PersistenceUpdater::RWActor(actor_data
00813                                             , allocator_);
00814 
00815   IdType_ExtId ext(actor.actorId);
00816 
00817   // bind ActorData with the actorId
00818   if (actor_index_->bind(ext, persistent_data, allocator_) != 0) {
00819     allocator_->free((void *) buffer);
00820     return;
00821   }
00822 }
00823 
00824 void
00825 PersistenceUpdater::create(const OwnershipData& /* data */)
00826 {
00827   /* This method intentionally left unimplemented. */
00828 }
00829 
00830 void
00831 PersistenceUpdater::update(const IdPath& id, const DDS::DomainParticipantQos& qos)
00832 {
00833   IdType_ExtId ext(id.id);
00834   PersistenceUpdater::Participant* part_data = 0;
00835 
00836   if (this->participant_index_->find(ext, part_data, this->allocator_) ==  0) {
00837     TAO_OutputCDR outCdr;
00838     outCdr << qos;
00839     ACE_Message_Block dst;
00840     ACE_CDR::consolidate(&dst, outCdr.begin());
00841 
00842     this->storeUpdate(dst, part_data->participantQos.second);
00843 
00844   } else {
00845     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00846     ACE_ERROR((LM_ERROR,
00847                ACE_TEXT("(%P|%t) PersistenceUpdater::update: ")
00848                ACE_TEXT("participant %C not found\n"),
00849                std::string(converter).c_str()));
00850   }
00851 }
00852 
00853 void
00854 PersistenceUpdater::update(const IdPath& id, const DDS::TopicQos& qos)
00855 {
00856   IdType_ExtId ext(id.id);
00857   PersistenceUpdater::Topic* topic_data = 0;
00858 
00859   if (this->topic_index_->find(ext, topic_data, this->allocator_) ==  0) {
00860     TAO_OutputCDR outCdr;
00861     outCdr << qos;
00862     ACE_Message_Block dst;
00863     ACE_CDR::consolidate(&dst, outCdr.begin());
00864 
00865     this->storeUpdate(dst, topic_data->topicQos.second);
00866 
00867   } else {
00868     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00869     ACE_ERROR((LM_ERROR,
00870                ACE_TEXT("(%P|%t) PersistenceUpdater::update: ")
00871                ACE_TEXT("topic %C not found\n"),
00872                std::string(converter).c_str()));
00873   }
00874 }
00875 
00876 void
00877 PersistenceUpdater::update(const IdPath& id, const DDS::DataWriterQos& qos)
00878 {
00879   IdType_ExtId ext(id.id);
00880   PersistenceUpdater::RWActor* actor_data = 0;
00881 
00882   if (this->actor_index_->find(ext, actor_data, this->allocator_) ==  0) {
00883     TAO_OutputCDR outCdr;
00884     outCdr << qos;
00885     ACE_Message_Block dst;
00886     ACE_CDR::consolidate(&dst, outCdr.begin());
00887 
00888     this->storeUpdate(dst, actor_data->drdwQos.second);
00889 
00890   } else {
00891     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00892     ACE_ERROR((LM_ERROR,
00893                ACE_TEXT("(%P|%t) PersistenceUpdater::update(writerQos): ")
00894                ACE_TEXT("publication %C not found\n"),
00895                std::string(converter).c_str()));
00896   }
00897 }
00898 
00899 void
00900 PersistenceUpdater::update(const IdPath& id, const DDS::PublisherQos& qos)
00901 {
00902   IdType_ExtId ext(id.id);
00903   PersistenceUpdater::RWActor* actor_data = 0;
00904 
00905   if (this->actor_index_->find(ext, actor_data, this->allocator_) ==  0) {
00906     TAO_OutputCDR outCdr;
00907     outCdr << qos;
00908     ACE_Message_Block dst;
00909     ACE_CDR::consolidate(&dst, outCdr.begin());
00910 
00911     this->storeUpdate(dst, actor_data->pubsubQos.second);
00912 
00913   } else {
00914     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00915     ACE_ERROR((LM_ERROR,
00916                ACE_TEXT("(%P|%t) PersistenceUpdater::update(publisherQos): ")
00917                ACE_TEXT("publication %C not found\n"),
00918                std::string(converter).c_str()));
00919   }
00920 }
00921 
00922 void
00923 PersistenceUpdater::update(const IdPath& id, const DDS::DataReaderQos& qos)
00924 {
00925   IdType_ExtId ext(id.id);
00926   PersistenceUpdater::RWActor* actor_data = 0;
00927 
00928   if (this->actor_index_->find(ext, actor_data, this->allocator_) ==  0) {
00929     TAO_OutputCDR outCdr;
00930     outCdr << qos;
00931     ACE_Message_Block dst;
00932     ACE_CDR::consolidate(&dst, outCdr.begin());
00933 
00934     this->storeUpdate(dst, actor_data->drdwQos.second);
00935 
00936   } else {
00937     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00938     ACE_ERROR((LM_ERROR,
00939                ACE_TEXT("(%P|%t) PersistenceUpdater::update(readerQos): ")
00940                ACE_TEXT("subscription %C not found\n"),
00941                std::string(converter).c_str()));
00942   }
00943 }
00944 
00945 void
00946 PersistenceUpdater::update(const IdPath& id, const DDS::SubscriberQos& qos)
00947 {
00948   IdType_ExtId ext(id.id);
00949   PersistenceUpdater::RWActor* actor_data = 0;
00950 
00951   if (this->actor_index_->find(ext, actor_data, this->allocator_) ==  0) {
00952     TAO_OutputCDR outCdr;
00953     outCdr << qos;
00954     ACE_Message_Block dst;
00955     ACE_CDR::consolidate(&dst, outCdr.begin());
00956 
00957     this->storeUpdate(dst, actor_data->pubsubQos.second);
00958 
00959   } else {
00960     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00961     ACE_ERROR((LM_ERROR,
00962                ACE_TEXT("(%P|%t) PersistenceUpdater::update(subscriberQos): ")
00963                ACE_TEXT("subscription %C not found\n"),
00964                std::string(converter).c_str()));
00965   }
00966 }
00967 
00968 void
00969 PersistenceUpdater::update(const IdPath& id, const DDS::StringSeq& exprParams)
00970 {
00971   IdType_ExtId ext(id.id);
00972   PersistenceUpdater::RWActor* actor_data = 0;
00973 
00974   if (actor_index_->find(ext, actor_data, allocator_) ==  0) {
00975     TAO_OutputCDR outCdr;
00976     outCdr << exprParams;
00977     ACE_Message_Block dst;
00978     ACE_CDR::consolidate(&dst, outCdr.begin());
00979 
00980     storeUpdate(dst, actor_data->contentSubscriptionProfile.exprParams);
00981 
00982   } else {
00983     OpenDDS::DCPS::RepoIdConverter converter(id.id);
00984     ACE_ERROR((LM_ERROR,
00985                ACE_TEXT("(%P|%t) PersistenceUpdater::update(readerQos): ")
00986                ACE_TEXT("subscription %C not found\n"),
00987                std::string(converter).c_str()));
00988   }
00989 }
00990 
00991 void
00992 PersistenceUpdater::destroy(const IdPath& id, ItemType type, ActorType)
00993 {
00994   IdType_ExtId ext(id.id);
00995   PersistenceUpdater::Topic* topic = 0;
00996   PersistenceUpdater::Participant* participant = 0;
00997   PersistenceUpdater::RWActor* actor = 0;
00998 
00999   switch (type) {
01000   case Update::Topic:
01001 
01002     if (topic_index_->unbind(ext, topic, allocator_) == 0) {
01003       topic->cleanup(allocator_);
01004       allocator_->free((void *) topic);
01005     }
01006 
01007     break;
01008   case Update::Participant:
01009 
01010     if (participant_index_->unbind(ext, participant, allocator_) == 0) {
01011       participant->cleanup(allocator_);
01012       allocator_->free((void *) participant);
01013     }
01014 
01015     break;
01016   case Update::Actor:
01017 
01018     if (actor_index_->unbind(ext, actor, allocator_) == 0) {
01019       actor->cleanup(allocator_);
01020       allocator_->free((void *) actor);
01021     }
01022 
01023     break;
01024   default: {
01025     OpenDDS::DCPS::RepoIdConverter converter(id.id);
01026     ACE_ERROR((LM_ERROR,
01027                ACE_TEXT("(%P | %t) PersistenceUpdater::destroy: ")
01028                ACE_TEXT("unknown entity - %C.\n"),
01029                std::string(converter).c_str()));
01030   }
01031   }
01032 }
01033 
01034 void
01035 PersistenceUpdater::storeUpdate(const ACE_Message_Block& data, BinSeq& storage)
01036 {
01037   size_t len = data.length();
01038 
01039   void* buffer;
01040   ACE_ALLOCATOR(buffer, this->allocator_->malloc(len));
01041   ACE_OS::memcpy(buffer, data.base(), len);
01042 
01043   storage.first  = len;
01044   storage.second = static_cast<char*>(buffer);
01045 }
01046 
01047 } // namespace Update
01048 
01049 int
01050 PersistenceUpdaterSvc_Loader::init()
01051 {
01052   return ACE_Service_Config::process_directive
01053          (ace_svc_desc_PersistenceUpdaterSvc);
01054   return 0;
01055 }
01056 
01057 // from the "ACE Programmers Guide (P. 424)
01058 ACE_FACTORY_DEFINE(ACE_Local_Service, PersistenceUpdaterSvc)
01059 
01060 ACE_STATIC_SVC_DEFINE(PersistenceUpdaterSvc,
01061                       ACE_TEXT("PersistenceUpdaterSvc"),
01062                       ACE_SVC_OBJ_T,
01063                       &ACE_SVC_NAME(PersistenceUpdaterSvc),
01064                       ACE_Service_Type::DELETE_THIS |
01065                       ACE_Service_Type::DELETE_OBJ,
01066                       0)
01067 
01068 ACE_STATIC_SVC_REQUIRE(PersistenceUpdaterSvc)

Generated on Fri Feb 12 20:05:24 2016 for OpenDDS by  doxygen 1.4.7