OpenDDS  Snapshot(2023/04/28-20:55)
PersistenceUpdater.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DcpsInfo_pch.h"
9 
10 #include "PersistenceUpdater.h"
11 #include "UpdateManager.h"
12 
14 #include "dds/DCPS/GuidUtils.h"
15 #include "dds/DCPS/debug.h"
16 
17 #include "ace/Malloc_T.h"
18 #include "ace/MMAP_Memory_Pool.h"
19 #include "ace/OS_NS_strings.h"
20 #include "ace/Svc_Handler.h"
21 #include "ace/Dynamic_Service.h"
22 
23 #include <algorithm>
24 
26 
27 namespace {
28  void assign(Update::BinSeq& to, const Update::BinSeq& from,
30  {
31  const size_t len = from.first;
32  void* out_buf;
33  ACE_ALLOCATOR(out_buf, allocator->malloc(len));
34  ACE_OS::memcpy(out_buf, from.second, len);
35  to = std::make_pair(len, static_cast<char*>(out_buf));
36  }
37 
38  void assign(ACE_CString& to, const char* from,
40  {
41  const size_t len = ACE_OS::strlen (from) + 1;
42  void* out_buf;
43  ACE_ALLOCATOR(out_buf, allocator->malloc(len));
44  ACE_OS::memcpy(out_buf, from, len);
45  to.set(static_cast<char*>(out_buf), len - 1, false);
46  }
47 }
48 
49 namespace Update {
50 
51 // Template specializations with custom constructors
52 // and cleanup methods
53 template<>
61 
62  TopicStrt(const DTopic& topic,
64  : domainId(topic.domainId),
65  topicId(topic.topicId),
66  participantId(topic.participantId)
67  {
68  assign(name, topic.name.c_str(), allocator);
69  assign(dataType, topic.dataType.c_str(), allocator);
70 
71  topicQos.first = TopicQos;
72  assign(topicQos.second, topic.topicQos.second, allocator);
73  }
74 
76  {
77  if (name.length() > 0)
78  {
79  char* strMemory = const_cast<char*>(name.fast_rep());
80  name.fast_clear();
81  allocator->free(strMemory);
82  }
83  if (dataType.length() > 0)
84  {
85  char* strMemory = const_cast<char*>(dataType.fast_rep());
86  dataType.fast_clear();
87  allocator->free(strMemory);
88  }
89 
90  allocator->free(topicQos.second.second);
91  }
92 };
93 
94 template<>
97  long owner;
100 
102  long own,
103  const IdType& pId,
104  const QosSeq& pQos)
105  : domainId(dId),
106  owner(own),
107  participantId(pId),
108  participantQos(pQos) {}
109 
110  ParticipantStrt(const DParticipant& participant,
112  : domainId(participant.domainId),
113  owner(participant.owner),
114  participantId(participant.participantId)
115  {
116  participantQos.first = ParticipantQos;
117  assign(participantQos.second, participant.participantQos.second, allocator);
118  }
119 
121  {
122  allocator->free(participantQos.second.second);
123  }
124 };
125 
126 template<>
129  BinSeq> {
142 
143  ActorStrt(const DActor& actor,
145  : domainId(actor.domainId),
146  actorId(actor.actorId),
147  topicId(actor.topicId),
148  participantId(actor.participantId), type(actor.type)
149  , transportContext(actor.transportContext)
150  {
151  assign(callback, actor.callback.c_str(), allocator);
152 
153  pubsubQos.first = actor.pubsubQos.first;
154  assign(pubsubQos.second, actor.pubsubQos.second, allocator);
155 
156  drdwQos.first = actor.drdwQos.first;
157  assign(drdwQos.second, actor.drdwQos.second, allocator);
158 
159  assign(transportInterfaceInfo, actor.transportInterfaceInfo, allocator);
160 
161  contentSubscriptionProfile.filterClassName =
162  ACE_CString(actor.contentSubscriptionProfile.filterClassName.c_str(),
163  allocator);
164  contentSubscriptionProfile.filterExpr =
165  ACE_CString(actor.contentSubscriptionProfile.filterExpr.c_str(),
166  allocator);
167  assign(contentSubscriptionProfile.exprParams,
168  actor.contentSubscriptionProfile.exprParams, allocator);
169  }
170 
172  {
173  if (callback.length() > 0)
174  {
175  char* strMemory = const_cast<char*>(callback.fast_rep());
176  callback.fast_clear();
177  allocator->free(strMemory);
178  }
179 
180  allocator->free(pubsubQos.second.second);
181  allocator->free(drdwQos.second.second);
182  allocator->free(transportInterfaceInfo.second);
183  allocator->free(contentSubscriptionProfile.exprParams.second);
184  }
185 };
186 
188  : id_(OpenDDS::DCPS::GUID_UNKNOWN)
189 {}
190 
192  : id_(id)
193 {}
194 
196  : id_(ext.id_)
197 {}
198 
199 void
201 {
202  id_ = ext.id_;
203 }
204 
205 bool
207 {
208  return (id_ == ext.id_);
209 }
210 
211 unsigned long
213 {
215 };
216 
218  : persistence_file_(ACE_TEXT("InforepoPersist"))
219  , reset_(false)
220  , um_(0)
221  , topic_index_(0)
222  , participant_index_(0)
223  , actor_index_(0)
224  , last_part_id_(0)
225 {}
226 
228 {}
229 
230 // utility functions
231 void* createIndex(const std::string& tag
232  , PersistenceUpdater::ALLOCATOR& allocator
233  , size_t size, bool& exists)
234 {
235  void* index = 0;
236 
237  // This is the easy case since if we find hash table in the
238  // memory-mapped file we know it's already initialized.
239  if (allocator.find(tag.c_str(), index) == 0) {
240  exists = true;
241  return index;
242 
243  } else {
244  ACE_ALLOCATOR_RETURN(index, allocator.malloc(size), 0);
245 
246  if (allocator.bind(tag.c_str(), index) == -1) {
247  allocator.free(index);
248  index = 0;
249  }
250  }
251 
252  if (!index) {
254  ACE_TEXT("(%P|%t) ERROR: PersistenceUpdater::init: ")
255  ACE_TEXT("Initial allocation/Bind failed for %C.\n"),
256  tag.c_str()));
257  }
258 
259  return index;
260 }
261 
262 template<typename I> void
264  , PersistenceUpdater::ALLOCATOR* allocator)
265 {
266  for (typename I::ITERATOR iter = index->begin()
267  ; iter != index->end();) {
268  typename I::ITERATOR current_iter = iter;
269  iter++;
270 
271  if (index->unbind((*current_iter).ext_id_, allocator) != 0) {
273  ACE_TEXT("(%P|%t) ERROR: PersistenceUpdater::init: ")
274  ACE_TEXT("Index unbind failed.\n")));
275  }
276  }
277 }
278 
279 int
281 {
282  // discover the UpdateManager
284 
285  if (um_ == 0) {
286  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: PersistenceUpdater::init ")
287  ACE_TEXT("No UpdateManager discovered.\n")));
288  return -1;
289  }
290 
291  this->parse(argc, argv);
292 
293 #if defined ACE_HAS_MAC_OSX && defined __x86_64__ && __x86_64__
294  ACE_MMAP_Memory_Pool::OPTIONS options((void*)0x200000000);
295 #else
297 #endif
298 
299  // Create the allocator with the appropriate options. The name used
300  // for the lock is the same as one used for the file.
301  ALLOCATOR* allocator;
302  ACE_NEW_RETURN(allocator,
305  &options),
306  -1);
307  allocator_.reset(allocator);
308 
309  bool exists = false;
310 
311  char* topic_index = (char*)createIndex("TopicIndex", *allocator_
312  , sizeof(TopicIndex), exists);
313  if (!topic_index) {
314  return -1;
315  }
316 
317  char* participant_index = (char*)createIndex("ParticipantIndex", *allocator_
318  , sizeof(ParticipantIndex), exists);
319  if (!participant_index) {
320  return -1;
321  }
322 
323  char* actor_index = (char*)createIndex("ActorIndex", *allocator_
324  , sizeof(ActorIndex), exists);
325  if (!actor_index) {
326  return -1;
327  }
328 
329  void* last_part_id = createIndex(
330  "LastParticipantId", *allocator_, sizeof(PartIdType), exists);
331  if (!last_part_id) {
332  return -1;
333  }
334  last_part_id_ = reinterpret_cast<PartIdType*>(last_part_id);
335 
336  if (exists) {
337  topic_index_ = reinterpret_cast<TopicIndex*>(topic_index);
338  participant_index_ = reinterpret_cast<ParticipantIndex*>(participant_index);
339  actor_index_ = reinterpret_cast<ActorIndex*>(actor_index);
340  } else {
341  topic_index_ = new(topic_index) TopicIndex(allocator_.get());
342  participant_index_ = new(participant_index) ParticipantIndex(allocator_.get());
343  actor_index_ = new(actor_index) ActorIndex(allocator_.get());
344  *last_part_id_ = 0;
345  }
346 
347  if (reset_) {
351  *last_part_id_ = 0;
352  }
353 
354  // lastly register the callback
355  um_->add(this);
356 
357  return 0;
358 }
359 
360 int
362 {
363  for (ssize_t count = 0; count < argc; count++) {
364  if (ACE_OS::strcasecmp(argv[count], ACE_TEXT("-file")) == 0) {
365  if ((count + 1) < argc) {
366  persistence_file_ = argv[count+1];
367  count++;
368  }
369 
370  } else if (ACE_OS::strcasecmp(argv[count], ACE_TEXT("-reset")) == 0) {
371  if ((count + 1) < argc) {
372  int val = ACE_OS::atoi(argv[count+1]);
373  reset_ = true;
374 
375  if (val == 0) {
376  reset_ = false;
377  }
378 
379  count++;
380  }
381 
382  } else {
383  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) PersistenceUpdater::parse: Unknown option %s\n")
384  , argv[count]));
385  return -1;
386  }
387  }
388 
389  return 0;
390 }
391 
392 int
394 {
395  return 0;
396 }
397 
398 int
400 {
401  return 0;
402 }
403 
404 void
406 {
407  if (um_ == NULL) {
408  return;
409  }
410 
411  DImage image;
412 
414  iter != participant_index_->end(); iter++) {
415  const PersistenceUpdater::Participant* participant = iter->int_id_;
416 
417  const BinSeq& in_seq = participant->participantQos.second;
418  const QosSeq qos(ParticipantQos, in_seq);
419  const DParticipant dparticipant(participant->domainId
420  , participant->owner
421  , participant->participantId
422  , qos);
423  image.participants.push_back(dparticipant);
427  "(%P|%t) PersistenceUpdater::requestImage(): loaded participant %C\n",
428  OPENDDS_STRING(conv).c_str()));
429  }
430  }
431 
432  for (TopicIndex::ITERATOR iter = topic_index_->begin();
433  iter != topic_index_->end(); iter++) {
434  const PersistenceUpdater::Topic* topic = iter->int_id_;
435 
436  const BinSeq& in_seq = topic->topicQos.second;
437  const QosSeq qos(TopicQos, in_seq);
438  const DTopic dTopic(topic->domainId, topic->topicId
439  , topic->participantId, topic->name.c_str()
440  , topic->dataType.c_str(), qos);
441  image.topics.push_back(dTopic);
442  }
443 
444  for (ActorIndex::ITERATOR iter = actor_index_->begin();
445  iter != actor_index_->end(); iter++) {
446  const PersistenceUpdater::RWActor* actor = iter->int_id_;
447 
448  const BinSeq& in_pubsub_seq = actor->pubsubQos.second;
449  const QosSeq pubsub_qos(actor->pubsubQos.first, in_pubsub_seq);
450 
451  const BinSeq& in_drdw_seq = actor->drdwQos.second;
452  const QosSeq drdw_qos(actor->drdwQos.first, in_drdw_seq);
453 
454  const BinSeq& in_transport_seq = actor->transportInterfaceInfo;
455  const BinSeq& in_type_info = actor->serializedTypeInfo;
456 
457  ContentSubscriptionBin in_csp_bin;
458  if (actor->type == DataReader) {
462  }
463 
464  const DActor dActor(actor->domainId, actor->actorId, actor->topicId
465  , actor->participantId
466  , actor->type, actor->callback.c_str()
467  , pubsub_qos, drdw_qos, in_transport_seq, actor->transportContext, in_csp_bin
468  , in_type_info);
469  image.actors.push_back(dActor);
470  }
471 
472  image.lastPartId = *last_part_id_;
473 
474  um_->pushImage(image);
475 }
476 
477 void
479 {
480  // serialize the Topic QOS
481  TAO_OutputCDR outCdr;
482  outCdr << topic.topicQos;
483  ACE_Message_Block dst;
484  ACE_CDR::consolidate(&dst, outCdr.begin());
485 
486  const BinSeq qos_bin(dst.length(), dst.base());
487 
488  const QosSeq p(TopicQos, qos_bin);
489  const DTopic topic_data(topic.domainId, topic.topicId, topic.participantId
490  , topic.name.c_str(), topic.dataType.c_str(), p);
491 
492  // allocate memory for TopicData
493  void* buffer;
494  ACE_ALLOCATOR(buffer, allocator_->malloc
495  (sizeof(PersistenceUpdater::Topic)));
496 
497  // Initialize TopicData
498  PersistenceUpdater::Topic* persistent_data
499  = new(buffer) PersistenceUpdater::Topic(topic_data, allocator_.get());
500 
501  IdType_ExtId ext(topic_data.topicId);
502 
503  // bind TopicData with the topicId
504  if (topic_index_->bind(ext, persistent_data, allocator_.get()) != 0) {
505  allocator_->free((void *) buffer);
506  return;
507  }
508 }
509 
510 void
512 {
513  // serialize the Topic QOS
514  TAO_OutputCDR outCdr;
515  outCdr << participant.participantQos;
516  ACE_Message_Block dst;
517  ACE_CDR::consolidate(&dst, outCdr.begin());
518 
519  const BinSeq qos_bin(dst.length(), dst.base());
520 
521  QosSeq p(ParticipantQos, qos_bin);
522  DParticipant participant_data
523  (participant.domainId, participant.owner, participant.participantId, p);
524 
525  // allocate memory for ParticipantData
526  void* buffer;
527  ACE_ALLOCATOR(buffer, allocator_->malloc
529 
530  // Initialize ParticipantData
531  PersistenceUpdater::Participant* persistent_data
532  = new(buffer) PersistenceUpdater::Participant(participant_data
533  , allocator_.get());
534 
535  IdType_ExtId ext(participant_data.participantId);
536 
537  // bind ParticipantData with the participantId
538  if (participant_index_->bind(ext, persistent_data, allocator_.get()) != 0) {
539  allocator_->free((void *) buffer);
540  return;
541  }
542 }
543 
544 void
546 {
547  TAO_OutputCDR outCdr;
548  outCdr << actor.pubsubQos;
549  ACE_Message_Block dst;
550  ACE_CDR::consolidate(&dst, outCdr.begin());
551 
552  const BinSeq pubsub_qos_bin(dst.length(), dst.base());
553  const QosSeq pubsub_qos(SubscriberQos, pubsub_qos_bin);
554 
555  outCdr.reset();
556  outCdr << actor.drdwQos;
557  ACE_Message_Block dst2;
558  ACE_CDR::consolidate(&dst2, outCdr.begin());
559 
560  const BinSeq dwdr_qos_bin(dst2.length(), dst2.base());
561  const QosSeq dwdr_qos(DataReaderQos, dwdr_qos_bin);
562 
563  outCdr.reset();
564  outCdr << actor.transportInterfaceInfo;
565  ACE_Message_Block dst3;
566  ACE_CDR::consolidate(&dst3, outCdr.begin());
567 
568  const BinSeq tr_bin(dst3.length(), dst3.base());
569 
570  outCdr.reset();
571  outCdr << actor.contentSubscriptionProfile.exprParams;
572  ACE_Message_Block dst4;
573  ACE_CDR::consolidate(&dst4, outCdr.begin());
574 
575  ContentSubscriptionBin csp_bin;
576  csp_bin.filterClassName = actor.contentSubscriptionProfile.filterClassName;
577  csp_bin.filterExpr = actor.contentSubscriptionProfile.filterExpr;
578  csp_bin.exprParams = std::make_pair(dst4.length(), dst4.base());
579 
580  outCdr.reset();
581  outCdr << actor.serializedTypeInfo;
582  ACE_Message_Block dst5;
583  ACE_CDR::consolidate(&dst5, outCdr.begin());
584 
585  const BinSeq ti_seq(dst5.length(), dst5.base());
586 
587 
588  const DActor actor_data(actor.domainId, actor.actorId, actor.topicId
589  , actor.participantId
590  , DataReader, actor.callback.c_str(), pubsub_qos
591  , dwdr_qos, tr_bin, actor.transportContext, csp_bin
592  , ti_seq);
593 
594  // allocate memory for ActorData
595  void* buffer;
596  ACE_ALLOCATOR(buffer, allocator_->malloc
597  (sizeof(PersistenceUpdater::RWActor)));
598 
599  // Initialize ActorData
600  PersistenceUpdater::RWActor* persistent_data =
601  new(buffer) PersistenceUpdater::RWActor(actor_data
602  , allocator_.get());
603 
604  IdType_ExtId ext(actor.actorId);
605 
606  // bind ActorData with the actorId
607  if (actor_index_->bind(ext, persistent_data, allocator_.get()) != 0) {
608  allocator_->free((void *) buffer);
609  return;
610  }
611 }
612 
613 void
615 {
616  TAO_OutputCDR outCdr;
617  outCdr << actor.pubsubQos;
618  ACE_Message_Block dst;
619  ACE_CDR::consolidate(&dst, outCdr.begin());
620 
621  const BinSeq pubsub_qos_bin(dst.length(), dst.base());
622  const QosSeq pubsub_qos(PublisherQos, pubsub_qos_bin);
623 
624  outCdr.reset();
625  outCdr << actor.drdwQos;
626  ACE_Message_Block dst2;
627  ACE_CDR::consolidate(&dst2, outCdr.begin());
628 
629  const BinSeq dwdr_qos_bin(dst2.length(), dst2.base());
630  const QosSeq dwdr_qos(DataWriterQos, dwdr_qos_bin);
631 
632  outCdr.reset();
633  outCdr << actor.transportInterfaceInfo;
634  ACE_Message_Block dst3;
635  ACE_CDR::consolidate(&dst3, outCdr.begin());
636 
637  const BinSeq tr_bin(dst3.length(), dst3.base());
638 
639  outCdr.reset();
640  outCdr << actor.serializedTypeInfo;
641  ACE_Message_Block dst4;
642  ACE_CDR::consolidate(&dst4, outCdr.begin());
643 
644  const BinSeq ti_seq(dst4.length(), dst4.base());
645 
646  const DActor actor_data(actor.domainId, actor.actorId, actor.topicId
647  , actor.participantId
648  , DataWriter, actor.callback.c_str(), pubsub_qos
649  , dwdr_qos, tr_bin, actor.transportContext, ContentSubscriptionBin()
650  , ti_seq);
651 
652  // allocate memory for ActorData
653  void* buffer;
654  ACE_ALLOCATOR(buffer, allocator_->malloc
655  (sizeof(PersistenceUpdater::RWActor)));
656 
657  // Initialize ActorData
658  PersistenceUpdater::RWActor* persistent_data =
659  new(buffer) PersistenceUpdater::RWActor(actor_data
660  , allocator_.get());
661 
662  IdType_ExtId ext(actor.actorId);
663 
664  // bind ActorData with the actorId
665  if (actor_index_->bind(ext, persistent_data, allocator_.get()) != 0) {
666  allocator_->free((void *) buffer);
667  return;
668  }
669 }
670 
671 void
673 {
674  /* This method intentionally left unimplemented. */
675 }
676 
677 void
679 {
680  IdType_ExtId ext(id.id);
681  PersistenceUpdater::Participant* part_data = 0;
682 
683  if (this->participant_index_->find(ext, part_data, this->allocator_.get()) == 0) {
684  TAO_OutputCDR outCdr;
685  outCdr << qos;
686  ACE_Message_Block dst;
687  ACE_CDR::consolidate(&dst, outCdr.begin());
688 
689  this->storeUpdate(dst, part_data->participantQos.second);
690 
691  } else {
692  OpenDDS::DCPS::RepoIdConverter converter(id.id);
694  ACE_TEXT("(%P|%t) PersistenceUpdater::update: ")
695  ACE_TEXT("participant %C not found\n"),
696  std::string(converter).c_str()));
697  }
698 }
699 
700 void
702 {
703  IdType_ExtId ext(id.id);
704  PersistenceUpdater::Topic* topic_data = 0;
705 
706  if (this->topic_index_->find(ext, topic_data, this->allocator_.get()) == 0) {
707  TAO_OutputCDR outCdr;
708  outCdr << qos;
709  ACE_Message_Block dst;
710  ACE_CDR::consolidate(&dst, outCdr.begin());
711 
712  this->storeUpdate(dst, topic_data->topicQos.second);
713 
714  } else {
715  OpenDDS::DCPS::RepoIdConverter converter(id.id);
717  ACE_TEXT("(%P|%t) PersistenceUpdater::update: ")
718  ACE_TEXT("topic %C not found\n"),
719  std::string(converter).c_str()));
720  }
721 }
722 
723 void
725 {
726  IdType_ExtId ext(id.id);
727  PersistenceUpdater::RWActor* actor_data = 0;
728 
729  if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) == 0) {
730  TAO_OutputCDR outCdr;
731  outCdr << qos;
732  ACE_Message_Block dst;
733  ACE_CDR::consolidate(&dst, outCdr.begin());
734 
735  this->storeUpdate(dst, actor_data->drdwQos.second);
736 
737  } else {
738  OpenDDS::DCPS::RepoIdConverter converter(id.id);
740  ACE_TEXT("(%P|%t) PersistenceUpdater::update(writerQos): ")
741  ACE_TEXT("publication %C not found\n"),
742  std::string(converter).c_str()));
743  }
744 }
745 
746 void
748 {
749  IdType_ExtId ext(id.id);
750  PersistenceUpdater::RWActor* actor_data = 0;
751 
752  if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) == 0) {
753  TAO_OutputCDR outCdr;
754  outCdr << qos;
755  ACE_Message_Block dst;
756  ACE_CDR::consolidate(&dst, outCdr.begin());
757 
758  this->storeUpdate(dst, actor_data->pubsubQos.second);
759 
760  } else {
761  OpenDDS::DCPS::RepoIdConverter converter(id.id);
763  ACE_TEXT("(%P|%t) PersistenceUpdater::update(publisherQos): ")
764  ACE_TEXT("publication %C not found\n"),
765  std::string(converter).c_str()));
766  }
767 }
768 
769 void
771 {
772  IdType_ExtId ext(id.id);
773  PersistenceUpdater::RWActor* actor_data = 0;
774 
775  if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) == 0) {
776  TAO_OutputCDR outCdr;
777  outCdr << qos;
778  ACE_Message_Block dst;
779  ACE_CDR::consolidate(&dst, outCdr.begin());
780 
781  this->storeUpdate(dst, actor_data->drdwQos.second);
782 
783  } else {
784  OpenDDS::DCPS::RepoIdConverter converter(id.id);
786  ACE_TEXT("(%P|%t) PersistenceUpdater::update(readerQos): ")
787  ACE_TEXT("subscription %C not found\n"),
788  std::string(converter).c_str()));
789  }
790 }
791 
792 void
794 {
795  IdType_ExtId ext(id.id);
796  PersistenceUpdater::RWActor* actor_data = 0;
797 
798  if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) == 0) {
799  TAO_OutputCDR outCdr;
800  outCdr << qos;
801  ACE_Message_Block dst;
802  ACE_CDR::consolidate(&dst, outCdr.begin());
803 
804  this->storeUpdate(dst, actor_data->pubsubQos.second);
805 
806  } else {
807  OpenDDS::DCPS::RepoIdConverter converter(id.id);
809  ACE_TEXT("(%P|%t) PersistenceUpdater::update(subscriberQos): ")
810  ACE_TEXT("subscription %C not found\n"),
811  std::string(converter).c_str()));
812  }
813 }
814 
815 void
816 PersistenceUpdater::update(const IdPath& id, const DDS::StringSeq& exprParams)
817 {
818  IdType_ExtId ext(id.id);
819  PersistenceUpdater::RWActor* actor_data = 0;
820 
821  if (actor_index_->find(ext, actor_data, allocator_.get()) == 0) {
822  TAO_OutputCDR outCdr;
823  outCdr << exprParams;
824  ACE_Message_Block dst;
825  ACE_CDR::consolidate(&dst, outCdr.begin());
826 
828 
829  } else {
830  OpenDDS::DCPS::RepoIdConverter converter(id.id);
832  ACE_TEXT("(%P|%t) PersistenceUpdater::update(readerQos): ")
833  ACE_TEXT("subscription %C not found\n"),
834  std::string(converter).c_str()));
835  }
836 }
837 
838 void
840 {
841  IdType_ExtId ext(id.id);
842  PersistenceUpdater::Topic* topic = 0;
843  PersistenceUpdater::Participant* participant = 0;
844  PersistenceUpdater::RWActor* actor = 0;
845 
846  switch (type) {
847  case Update::Topic:
848 
849  if (topic_index_->unbind(ext, topic, allocator_.get()) == 0) {
850  topic->cleanup(allocator_.get());
851  allocator_->free((void *) topic);
852  }
853 
854  break;
855  case Update::Participant:
856 
857  if (participant_index_->unbind(ext, participant, allocator_.get()) == 0) {
858  participant->cleanup(allocator_.get());
859  allocator_->free((void *) participant);
860  }
861 
862  break;
863  case Update::Actor:
864 
865  if (actor_index_->unbind(ext, actor, allocator_.get()) == 0) {
866  actor->cleanup(allocator_.get());
867  allocator_->free((void *) actor);
868  }
869 
870  break;
871  default: {
872  OpenDDS::DCPS::RepoIdConverter converter(id.id);
874  ACE_TEXT("(%P|%t) PersistenceUpdater::destroy: ")
875  ACE_TEXT("unknown entity - %C.\n"),
876  std::string(converter).c_str()));
877  }
878  }
879 }
880 
881 void
883 {
884  size_t len = data.length();
885 
886  void* buffer;
887  ACE_ALLOCATOR(buffer, this->allocator_->malloc(len));
888  ACE_OS::memcpy(buffer, data.base(), len);
889 
890  storage.first = len;
891  storage.second = static_cast<char*>(buffer);
892 }
893 
895 {
896  *last_part_id_ = partId;
897 }
898 
899 } // namespace Update
900 
901 int
903 {
905  (ace_svc_desc_PersistenceUpdaterSvc);
906 }
907 
908 // from the "ACE Programmers Guide (P. 424)
909 ACE_FACTORY_DEFINE(ACE_Local_Service, PersistenceUpdaterSvc)
910 
912  ACE_TEXT("PersistenceUpdaterSvc"),
917  0)
918 
920 
#define ACE_DEBUG(X)
ParticipantStrt(const DParticipant &participant, PersistenceUpdater::ALLOCATOR *allocator)
#define ACE_ERROR(X)
const char * c_str(void) const
#define ACE_STATIC_SVC_REQUIRE(SERVICE_CLASS)
void reset(void)
ACE_CDR::ULong transportContext
size_t length(void) const
ACE_FACTORY_DEFINE(OpenDDS_InfoRepoDiscovery, IRDiscoveryLoader)
virtual int fini()
ACE_Task_Base finish method.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ActorStrt(const DActor &actor, PersistenceUpdater::ALLOCATOR *allocator)
int unbind(const EXT_ID &, INT_ID &, ACE_Allocator *alloc)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
#define I(x, y, z)
Definition: Hash.cpp:77
& ACE_SVC_NAME(TAO_AV_TCP_Factory)
TopicIndex * topic_index_
Persisted Topics.
static int process_directive(const ACE_TCHAR directive[])
void index_cleanup(I *index, PersistenceUpdater::ALLOCATOR *allocator)
void * memcpy(void *t, const void *s, size_t len)
PartIdType * last_part_id_
What the last participant id is/was.
long PartIdType
ACE_Hash_Map_With_Allocator< IdType_ExtId, Topic * > TopicIndex
ParticipantStrt(const DDS::DomainId_t &dId, long own, const IdType &pId, const QosSeq &pQos)
int bind(const EXT_ID &, const INT_ID &, ACE_Allocator *alloc)
static TYPE * instance(const ACE_TCHAR *name)
int find(const EXT_ID &, INT_ID &, ACE_Allocator *alloc)
void * createIndex(const std::string &tag, PersistenceUpdater::ALLOCATOR &allocator, size_t size, bool &exists)
std::pair< size_t, char * > BinSeq
int ssize_t
void cleanup(PersistenceUpdater::ALLOCATOR *allocator)
ACE_STATIC_SVC_DEFINE(ACE_Logging_Strategy, ACE_TEXT("Logging_Strategy"), ACE_Service_Type::SERVICE_OBJECT, &ACE_SVC_NAME(ACE_Logging_Strategy), ACE_Service_Type::DELETE_THIS|ACE_Service_Type::DELETE_OBJ, 0) extern "C" int _get_dll_unload_policy()
ACE_SVC_OBJ_T
ACE_Allocator_Adapter< ACE_Malloc< ACE_MMAP_MEMORY_POOL, TAO_SYNCH_MUTEX > > ALLOCATOR
virtual void * malloc(size_t nbytes)
virtual void create(const UTopic &topic)
Add topic to be persisted.
TopicStrt(const DTopic &topic, PersistenceUpdater::ALLOCATOR *allocator)
size_t strlen(const char *s)
virtual int find(const char *name, void *&pointer)
void cleanup(PersistenceUpdater::ALLOCATOR *allocator)
virtual int svc()
ACE_Task_Base start method.
long checksum() const
Calculate the CRC32 checksum.
#define OPENDDS_STRING
ACE_Hash_Map_With_Allocator< IdType_ExtId, Participant * > ParticipantIndex
bool operator==(const IdType_ExtId &ext) const
DOMAINID_TYPE_NATIVE DomainId_t
LM_DEBUG
void add(TAO_DDS_DCPSInfo_i *info)
std::pair< SpecificQos, BinSeq > QosSeq
static int consolidate(ACE_Message_Block *dst, const ACE_Message_Block *src)
char ACE_TCHAR
#define ACE_ALLOCATOR_RETURN(POINTER, ALLOCATOR, RET_VAL)
void assign(ACE_CString &to, const char *from, Update::PersistenceUpdater::ALLOCATOR *allocator)
void set(const char *s, bool release=true)
ParticipantIndex * participant_index_
Persisted Participants.
virtual void updateLastPartId(PartIdType partId)
Update Last Participant Id for repo.
PartIdType lastPartId
What the last participant id is/was.
ParticipantSeq participants
virtual void update(const IdPath &id, const DDS::DomainParticipantQos &qos)
Persist updated Qos parameters for a Participant.
ACE_UINT32 ULong
DomainIdType domainId
ACE_Hash_Map_With_Allocator< IdType_ExtId, RWActor * > ActorIndex
#define ACE_ALLOCATOR(POINTER, ALLOCATOR)
ACE_TEXT("TCP_Factory")
int atoi(const char *s)
virtual int bind(const char *name, void *pointer, int duplicates=0)
ActorIndex * actor_index_
Persisted Readers and Writers.
OpenDDS::DCPS::unique_ptr< ALLOCATOR > allocator_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
int strcasecmp(const char *s, const char *t)
const char * fast_rep(void) const
const ACE_Message_Block * begin(void) const
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void storeUpdate(const ACE_Message_Block &data, BinSeq &storage)
virtual void free(void *ptr)
#define ACE_NEW_RETURN(POINTER, CONSTRUCTOR, RET_VAL)
virtual void destroy(const IdPath &id, ItemType type, ActorType actor)
Remove an entity (but not children) from persistence.
#define ACE_DEFAULT_BASE_ADDR
void pushImage(const DImage &image)
Downstream request to push image.
size_type length(void) const
virtual int init(int argc, ACE_TCHAR *argv[])
Service object initialization.
DomainIdType domainId
LM_ERROR
char * base(void) const
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
int parse(int argc, ACE_TCHAR *argv[])
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50