OpenDDS  Snapshot(2023/04/28-20:55)
Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes | List of all members
Update::PersistenceUpdater Class Reference

#include <PersistenceUpdater.h>

Inheritance diagram for Update::PersistenceUpdater:
Inheritance graph
[legend]
Collaboration diagram for Update::PersistenceUpdater:
Collaboration graph
[legend]

Classes

class  IdType_ExtId
 

Public Types

typedef ACE_Allocator_Adapter< ACE_Malloc< ACE_MMAP_MEMORY_POOL, TAO_SYNCH_MUTEX > > ALLOCATOR
 
typedef struct TopicStrt< QosSeq, ACE_CStringTopic
 Persisted topic data structure. More...
 
typedef struct ParticipantStrt< QosSeqParticipant
 Persisted participant data structure. More...
 
typedef struct ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeqRWActor
 Persisted actor data structure. More...
 
typedef ACE_Hash_Map_With_Allocator< IdType_ExtId, Topic * > TopicIndex
 
typedef ACE_Hash_Map_With_Allocator< IdType_ExtId, Participant * > ParticipantIndex
 
typedef ACE_Hash_Map_With_Allocator< IdType_ExtId, RWActor * > ActorIndex
 
- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 

Public Member Functions

 PersistenceUpdater ()
 
virtual ~PersistenceUpdater ()
 
virtual int init (int argc, ACE_TCHAR *argv[])
 Service object initialization. More...
 
virtual int fini ()
 ACE_Task_Base finish method. More...
 
virtual int svc ()
 ACE_Task_Base start method. More...
 
virtual void requestImage ()
 
virtual void create (const UTopic &topic)
 Add topic to be persisted. More...
 
virtual void create (const UParticipant &participant)
 Add participant to be persisted. More...
 
virtual void create (const URActor &actor)
 Add DataReader to be persisted. More...
 
virtual void create (const UWActor &actor)
 Add DataWriter to be persisted. More...
 
virtual void create (const OwnershipData &data)
 Add ownership data to be persisted. More...
 
virtual void update (const IdPath &id, const DDS::DomainParticipantQos &qos)
 Persist updated Qos parameters for a Participant. More...
 
virtual void update (const IdPath &id, const DDS::TopicQos &qos)
 Persist updated Qos parameters for a Topic. More...
 
virtual void update (const IdPath &id, const DDS::DataWriterQos &qos)
 Persist updated Qos parameters for a DataWriter. More...
 
virtual void update (const IdPath &id, const DDS::PublisherQos &qos)
 Persist updated Qos parameters for a Publisher. More...
 
virtual void update (const IdPath &id, const DDS::DataReaderQos &qos)
 Persist updated Qos parameters for a DataReader. More...
 
virtual void update (const IdPath &id, const DDS::SubscriberQos &qos)
 Persist updated Qos parameters for a Subscriber. More...
 
virtual void update (const IdPath &id, const DDS::StringSeq &exprParams)
 Persist updated subscription exprParams. More...
 
virtual void destroy (const IdPath &id, ItemType type, ActorType actor)
 Remove an entity (but not children) from persistence. More...
 
virtual void updateLastPartId (PartIdType partId)
 Update Last Participant Id for repo. More...
 
- Public Member Functions inherited from Update::Updater
virtual ~Updater ()
 
- Public Member Functions inherited from ACE_Task_Base
 ACE_Task_Base (ACE_Thread_Manager *=0)
 
virtual ~ACE_Task_Base (void)
 
virtual int open (void *args=0)
 
virtual int close (u_long flags=0)
 
virtual int module_closed (void)
 
virtual int put (ACE_Message_Block *, ACE_Time_Value *=0)
 
virtual int activate (long flags=THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, int n_threads=1, int force_active=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, int grp_id=-1, ACE_Task_Base *task=0, ACE_hthread_t thread_handles[]=0, void *stack[]=0, size_t stack_size[]=0, ACE_thread_t thread_ids[]=0, const char *thr_name[]=0)
 
virtual int wait (void)
 
virtual int suspend (void)
 
virtual int resume (void)
 
int grp_id (void) const
 
void grp_id (int)
 
ACE_Thread_Managerthr_mgr (void) const
 
void thr_mgr (ACE_Thread_Manager *)
 
int is_reader (void) const
 
int is_writer (void) const
 
size_t thr_count (void) const
 
ACE_thread_t last_thread (void) const
 
- Public Member Functions inherited from ACE_Service_Object
 ACE_Service_Object (ACE_Reactor *=0)
 
virtual ~ACE_Service_Object (void)
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_input (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exception (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_timeout (const ACE_Time_Value &current_time, const void *act=0)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactorreactor (void) const
 
virtual ACE_Reactor_Timer_Interfacereactor_timer_interface (void) const
 
virtual Reference_Count add_reference (void)
 
virtual Reference_Count remove_reference (void)
 
Reference_Counting_Policyreference_counting_policy (void)
 
- Public Member Functions inherited from ACE_Shared_Object
 ACE_Shared_Object (void)
 
virtual ~ACE_Shared_Object (void)
 
virtual int info (ACE_TCHAR **info_string, size_t length=0) const
 

Private Member Functions

int parse (int argc, ACE_TCHAR *argv[])
 
void storeUpdate (const ACE_Message_Block &data, BinSeq &storage)
 

Private Attributes

ACE_TString persistence_file_
 
bool reset_
 
Managerum_
 
OpenDDS::DCPS::unique_ptr< ALLOCATORallocator_
 
TopicIndextopic_index_
 Persisted Topics. More...
 
ParticipantIndexparticipant_index_
 Persisted Participants. More...
 
ActorIndexactor_index_
 Persisted Readers and Writers. More...
 
PartIdTypelast_part_id_
 What the last participant id is/was. More...
 

Additional Inherited Members

- Static Public Member Functions inherited from ACE_Task_Base
static ACE_THR_FUNC_RETURN svc_run (void *)
 
static void cleanup (void *object, void *params)
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 
- Public Attributes inherited from ACE_Service_Object
 ACE_ALLOC_HOOK_DECLARE
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_CountAtomic_Reference_Count
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Attributes inherited from ACE_Task_Base
size_t thr_count_
 
ACE_Thread_Managerthr_mgr_
 
u_long flags_
 
int grp_id_
 
ACE_thread_t last_thread_id_
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

Definition at line 33 of file PersistenceUpdater.h.

Member Typedef Documentation

◆ ActorIndex

Definition at line 70 of file PersistenceUpdater.h.

◆ ALLOCATOR

Definition at line 55 of file PersistenceUpdater.h.

◆ Participant

Persisted participant data structure.

Definition at line 61 of file PersistenceUpdater.h.

◆ ParticipantIndex

Definition at line 69 of file PersistenceUpdater.h.

◆ RWActor

Persisted actor data structure.

Definition at line 64 of file PersistenceUpdater.h.

◆ Topic

Persisted topic data structure.

Definition at line 58 of file PersistenceUpdater.h.

◆ TopicIndex

Definition at line 68 of file PersistenceUpdater.h.

Constructor & Destructor Documentation

◆ PersistenceUpdater()

Update::PersistenceUpdater::PersistenceUpdater ( )

Definition at line 217 of file PersistenceUpdater.cpp.

References ACE_TEXT().

218  : persistence_file_(ACE_TEXT("InforepoPersist"))
219  , reset_(false)
220  , um_(0)
221  , topic_index_(0)
222  , participant_index_(0)
223  , actor_index_(0)
224  , last_part_id_(0)
225 {}
TopicIndex * topic_index_
Persisted Topics.
PartIdType * last_part_id_
What the last participant id is/was.
ParticipantIndex * participant_index_
Persisted Participants.
ACE_TEXT("TCP_Factory")
ActorIndex * actor_index_
Persisted Readers and Writers.

◆ ~PersistenceUpdater()

Update::PersistenceUpdater::~PersistenceUpdater ( )
virtual

Definition at line 227 of file PersistenceUpdater.cpp.

228 {}

Member Function Documentation

◆ create() [1/5]

void Update::PersistenceUpdater::create ( const UTopic topic)
virtual

Add topic to be persisted.

Implements Update::Updater.

Definition at line 478 of file PersistenceUpdater.cpp.

References ACE_ALLOCATOR, allocator_, ACE_Message_Block::base(), ACE_OutputCDR::begin(), ACE_Hash_Map_With_Allocator< class, class >::bind(), ACE_CDR::consolidate(), Update::TopicStrt< Q, S >::dataType, Update::TopicStrt< Q, S >::domainId, ACE_Message_Block::length(), Update::TopicStrt< Q, S >::name, Update::TopicStrt< Q, S >::participantId, topic_index_, Update::TopicStrt< Q, S >::topicId, Update::TopicQos, and Update::TopicStrt< Q, S >::topicQos.

Referenced by create().

479 {
480  // serialize the Topic QOS
481  TAO_OutputCDR outCdr;
482  outCdr << topic.topicQos;
483  ACE_Message_Block dst;
484  ACE_CDR::consolidate(&dst, outCdr.begin());
485 
486  const BinSeq qos_bin(dst.length(), dst.base());
487 
488  const QosSeq p(TopicQos, qos_bin);
489  const DTopic topic_data(topic.domainId, topic.topicId, topic.participantId
490  , topic.name.c_str(), topic.dataType.c_str(), p);
491 
492  // allocate memory for TopicData
493  void* buffer;
494  ACE_ALLOCATOR(buffer, allocator_->malloc
495  (sizeof(PersistenceUpdater::Topic)));
496 
497  // Initialize TopicData
498  PersistenceUpdater::Topic* persistent_data
499  = new(buffer) PersistenceUpdater::Topic(topic_data, allocator_.get());
500 
501  IdType_ExtId ext(topic_data.topicId);
502 
503  // bind TopicData with the topicId
504  if (topic_index_->bind(ext, persistent_data, allocator_.get()) != 0) {
505  allocator_->free((void *) buffer);
506  return;
507  }
508 }
struct TopicStrt< QosSeq, std::string > DTopic
size_t length(void) const
TopicIndex * topic_index_
Persisted Topics.
int bind(const EXT_ID &, const INT_ID &, ACE_Allocator *alloc)
std::pair< size_t, char * > BinSeq
std::pair< SpecificQos, BinSeq > QosSeq
static int consolidate(ACE_Message_Block *dst, const ACE_Message_Block *src)
#define ACE_ALLOCATOR(POINTER, ALLOCATOR)
OpenDDS::DCPS::unique_ptr< ALLOCATOR > allocator_
const ACE_Message_Block * begin(void) const
char * base(void) const
struct TopicStrt< QosSeq, ACE_CString > Topic
Persisted topic data structure.

◆ create() [2/5]

void Update::PersistenceUpdater::create ( const UParticipant participant)
virtual

Add participant to be persisted.

Implements Update::Updater.

Definition at line 511 of file PersistenceUpdater.cpp.

References ACE_ALLOCATOR, allocator_, ACE_Message_Block::base(), ACE_OutputCDR::begin(), ACE_Hash_Map_With_Allocator< class, class >::bind(), ACE_CDR::consolidate(), Update::ParticipantStrt< Q >::domainId, ACE_Message_Block::length(), Update::ParticipantStrt< Q >::owner, participant_index_, Update::ParticipantStrt< Q >::participantId, Update::ParticipantStrt< QosSeq >::participantId, Update::ParticipantQos, and Update::ParticipantStrt< Q >::participantQos.

512 {
513  // serialize the Topic QOS
514  TAO_OutputCDR outCdr;
515  outCdr << participant.participantQos;
516  ACE_Message_Block dst;
517  ACE_CDR::consolidate(&dst, outCdr.begin());
518 
519  const BinSeq qos_bin(dst.length(), dst.base());
520 
521  QosSeq p(ParticipantQos, qos_bin);
522  DParticipant participant_data
523  (participant.domainId, participant.owner, participant.participantId, p);
524 
525  // allocate memory for ParticipantData
526  void* buffer;
527  ACE_ALLOCATOR(buffer, allocator_->malloc
529 
530  // Initialize ParticipantData
531  PersistenceUpdater::Participant* persistent_data
532  = new(buffer) PersistenceUpdater::Participant(participant_data
533  , allocator_.get());
534 
535  IdType_ExtId ext(participant_data.participantId);
536 
537  // bind ParticipantData with the participantId
538  if (participant_index_->bind(ext, persistent_data, allocator_.get()) != 0) {
539  allocator_->free((void *) buffer);
540  return;
541  }
542 }
size_t length(void) const
struct ParticipantStrt< QosSeq > DParticipant
int bind(const EXT_ID &, const INT_ID &, ACE_Allocator *alloc)
std::pair< size_t, char * > BinSeq
std::pair< SpecificQos, BinSeq > QosSeq
static int consolidate(ACE_Message_Block *dst, const ACE_Message_Block *src)
ParticipantIndex * participant_index_
Persisted Participants.
struct ParticipantStrt< QosSeq > Participant
Persisted participant data structure.
#define ACE_ALLOCATOR(POINTER, ALLOCATOR)
OpenDDS::DCPS::unique_ptr< ALLOCATOR > allocator_
const ACE_Message_Block * begin(void) const
char * base(void) const

◆ create() [3/5]

void Update::PersistenceUpdater::create ( const URActor actor)
virtual

Add DataReader to be persisted.

Implements Update::Updater.

Definition at line 545 of file PersistenceUpdater.cpp.

References ACE_ALLOCATOR, actor_index_, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::actorId, allocator_, ACE_Message_Block::base(), ACE_OutputCDR::begin(), ACE_Hash_Map_With_Allocator< class, class >::bind(), Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::callback, ACE_CDR::consolidate(), Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::contentSubscriptionProfile, create(), Update::DataReader, Update::DataReaderQos, Update::DataWriter, Update::DataWriterQos, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::domainId, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::drdwQos, Update::ContentSubscriptionBin::filterClassName, ACE_Message_Block::length(), Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::participantId, Update::PublisherQos, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::pubsubQos, ACE_OutputCDR::reset(), Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::serializedTypeInfo, Update::SubscriberQos, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::topicId, Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::transportContext, and Update::ActorStrt< PSQ, RWQ, C, T, CSP, STI >::transportInterfaceInfo.

546 {
547  TAO_OutputCDR outCdr;
548  outCdr << actor.pubsubQos;
549  ACE_Message_Block dst;
550  ACE_CDR::consolidate(&dst, outCdr.begin());
551 
552  const BinSeq pubsub_qos_bin(dst.length(), dst.base());
553  const QosSeq pubsub_qos(SubscriberQos, pubsub_qos_bin);
554 
555  outCdr.reset();
556  outCdr << actor.drdwQos;
557  ACE_Message_Block dst2;
558  ACE_CDR::consolidate(&dst2, outCdr.begin());
559 
560  const BinSeq dwdr_qos_bin(dst2.length(), dst2.base());
561  const QosSeq dwdr_qos(DataReaderQos, dwdr_qos_bin);
562 
563  outCdr.reset();
564  outCdr << actor.transportInterfaceInfo;
565  ACE_Message_Block dst3;
566  ACE_CDR::consolidate(&dst3, outCdr.begin());
567 
568  const BinSeq tr_bin(dst3.length(), dst3.base());
569 
570  outCdr.reset();
571  outCdr << actor.contentSubscriptionProfile.exprParams;
572  ACE_Message_Block dst4;
573  ACE_CDR::consolidate(&dst4, outCdr.begin());
574 
575  ContentSubscriptionBin csp_bin;
576  csp_bin.filterClassName = actor.contentSubscriptionProfile.filterClassName;
577  csp_bin.filterExpr = actor.contentSubscriptionProfile.filterExpr;
578  csp_bin.exprParams = std::make_pair(dst4.length(), dst4.base());
579 
580  outCdr.reset();
581  outCdr << actor.serializedTypeInfo;
582  ACE_Message_Block dst5;
583  ACE_CDR::consolidate(&dst5, outCdr.begin());
584 
585  const BinSeq ti_seq(dst5.length(), dst5.base());
586 
587 
588  const DActor actor_data(actor.domainId, actor.actorId, actor.topicId
589  , actor.participantId
590  , DataReader, actor.callback.c_str(), pubsub_qos
591  , dwdr_qos, tr_bin, actor.transportContext, csp_bin
592  , ti_seq);
593 
594  // allocate memory for ActorData
595  void* buffer;
596  ACE_ALLOCATOR(buffer, allocator_->malloc
597  (sizeof(PersistenceUpdater::RWActor)));
598 
599  // Initialize ActorData
600  PersistenceUpdater::RWActor* persistent_data =
601  new(buffer) PersistenceUpdater::RWActor(actor_data
602  , allocator_.get());
603 
604  IdType_ExtId ext(actor.actorId);
605 
606  // bind ActorData with the actorId
607  if (actor_index_->bind(ext, persistent_data, allocator_.get()) != 0) {
608  allocator_->free((void *) buffer);
609  return;
610  }
611 }
void reset(void)
size_t length(void) const
int bind(const EXT_ID &, const INT_ID &, ACE_Allocator *alloc)
std::pair< size_t, char * > BinSeq
struct ActorStrt< QosSeq, QosSeq, std::string, BinSeq, ContentSubscriptionBin, BinSeq > DActor
std::pair< SpecificQos, BinSeq > QosSeq
static int consolidate(ACE_Message_Block *dst, const ACE_Message_Block *src)
struct ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq > RWActor
Persisted actor data structure.
#define ACE_ALLOCATOR(POINTER, ALLOCATOR)
ActorIndex * actor_index_
Persisted Readers and Writers.
OpenDDS::DCPS::unique_ptr< ALLOCATOR > allocator_
const ACE_Message_Block * begin(void) const
char * base(void) const

◆ create() [4/5]

virtual void Update::PersistenceUpdater::create ( const UWActor actor)
virtual

Add DataWriter to be persisted.

Implements Update::Updater.

◆ create() [5/5]

void Update::PersistenceUpdater::create ( const OwnershipData data)
virtual

Add ownership data to be persisted.

Implements Update::Updater.

Definition at line 672 of file PersistenceUpdater.cpp.

673 {
674  /* This method intentionally left unimplemented. */
675 }

◆ destroy()

void Update::PersistenceUpdater::destroy ( const IdPath id,
ItemType  type,
ActorType  actor 
)
virtual

Remove an entity (but not children) from persistence.

Implements Update::Updater.

Definition at line 839 of file PersistenceUpdater.cpp.

References ACE_ERROR, ACE_TEXT(), Update::Actor, actor_index_, allocator_, Update::TopicStrt< QosSeq, ACE_CString >::cleanup(), Update::ParticipantStrt< QosSeq >::cleanup(), Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq >::cleanup(), LM_ERROR, Update::Participant, participant_index_, Update::Topic, topic_index_, and ACE_Hash_Map_With_Allocator< class, class >::unbind().

840 {
841  IdType_ExtId ext(id.id);
842  PersistenceUpdater::Topic* topic = 0;
843  PersistenceUpdater::Participant* participant = 0;
844  PersistenceUpdater::RWActor* actor = 0;
845 
846  switch (type) {
847  case Update::Topic:
848 
849  if (topic_index_->unbind(ext, topic, allocator_.get()) == 0) {
850  topic->cleanup(allocator_.get());
851  allocator_->free((void *) topic);
852  }
853 
854  break;
855  case Update::Participant:
856 
857  if (participant_index_->unbind(ext, participant, allocator_.get()) == 0) {
858  participant->cleanup(allocator_.get());
859  allocator_->free((void *) participant);
860  }
861 
862  break;
863  case Update::Actor:
864 
865  if (actor_index_->unbind(ext, actor, allocator_.get()) == 0) {
866  actor->cleanup(allocator_.get());
867  allocator_->free((void *) actor);
868  }
869 
870  break;
871  default: {
872  OpenDDS::DCPS::RepoIdConverter converter(id.id);
873  ACE_ERROR((LM_ERROR,
874  ACE_TEXT("(%P|%t) PersistenceUpdater::destroy: ")
875  ACE_TEXT("unknown entity - %C.\n"),
876  std::string(converter).c_str()));
877  }
878  }
879 }
#define ACE_ERROR(X)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
int unbind(const EXT_ID &, INT_ID &, ACE_Allocator *alloc)
TopicIndex * topic_index_
Persisted Topics.
struct ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq > RWActor
Persisted actor data structure.
ParticipantIndex * participant_index_
Persisted Participants.
struct ParticipantStrt< QosSeq > Participant
Persisted participant data structure.
ACE_TEXT("TCP_Factory")
ActorIndex * actor_index_
Persisted Readers and Writers.
OpenDDS::DCPS::unique_ptr< ALLOCATOR > allocator_
struct TopicStrt< QosSeq, ACE_CString > Topic
Persisted topic data structure.

◆ fini()

int Update::PersistenceUpdater::fini ( void  )
virtual

ACE_Task_Base finish method.

Reimplemented from ACE_Shared_Object.

Definition at line 393 of file PersistenceUpdater.cpp.

394 {
395  return 0;
396 }

◆ init()

int Update::PersistenceUpdater::init ( int  argc,
ACE_TCHAR argv[] 
)
virtual

Service object initialization.

Reimplemented from ACE_Shared_Object.

Definition at line 280 of file PersistenceUpdater.cpp.

References ACE_DEFAULT_BASE_ADDR, ACE_ERROR, ACE_NEW_RETURN, ACE_TEXT(), actor_index_, Update::Manager::add(), allocator_, ACE_String_Base< char >::c_str(), Update::createIndex(), Update::index_cleanup(), ACE_Dynamic_Service< class >::instance(), last_part_id_, LM_ERROR, parse(), participant_index_, persistence_file_, reset_, topic_index_, and um_.

281 {
282  // discover the UpdateManager
284 
285  if (um_ == 0) {
286  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: PersistenceUpdater::init ")
287  ACE_TEXT("No UpdateManager discovered.\n")));
288  return -1;
289  }
290 
291  this->parse(argc, argv);
292 
293 #if defined ACE_HAS_MAC_OSX && defined __x86_64__ && __x86_64__
294  ACE_MMAP_Memory_Pool::OPTIONS options((void*)0x200000000);
295 #else
296  ACE_MMAP_Memory_Pool::OPTIONS options(ACE_DEFAULT_BASE_ADDR);
297 #endif
298 
299  // Create the allocator with the appropriate options. The name used
300  // for the lock is the same as one used for the file.
301  ALLOCATOR* allocator;
302  ACE_NEW_RETURN(allocator,
305  &options),
306  -1);
307  allocator_.reset(allocator);
308 
309  bool exists = false;
310 
311  char* topic_index = (char*)createIndex("TopicIndex", *allocator_
312  , sizeof(TopicIndex), exists);
313  if (!topic_index) {
314  return -1;
315  }
316 
317  char* participant_index = (char*)createIndex("ParticipantIndex", *allocator_
318  , sizeof(ParticipantIndex), exists);
319  if (!participant_index) {
320  return -1;
321  }
322 
323  char* actor_index = (char*)createIndex("ActorIndex", *allocator_
324  , sizeof(ActorIndex), exists);
325  if (!actor_index) {
326  return -1;
327  }
328 
329  void* last_part_id = createIndex(
330  "LastParticipantId", *allocator_, sizeof(PartIdType), exists);
331  if (!last_part_id) {
332  return -1;
333  }
334  last_part_id_ = reinterpret_cast<PartIdType*>(last_part_id);
335 
336  if (exists) {
337  topic_index_ = reinterpret_cast<TopicIndex*>(topic_index);
338  participant_index_ = reinterpret_cast<ParticipantIndex*>(participant_index);
339  actor_index_ = reinterpret_cast<ActorIndex*>(actor_index);
340  } else {
341  topic_index_ = new(topic_index) TopicIndex(allocator_.get());
342  participant_index_ = new(participant_index) ParticipantIndex(allocator_.get());
343  actor_index_ = new(actor_index) ActorIndex(allocator_.get());
344  *last_part_id_ = 0;
345  }
346 
347  if (reset_) {
351  *last_part_id_ = 0;
352  }
353 
354  // lastly register the callback
355  um_->add(this);
356 
357  return 0;
358 }
#define ACE_ERROR(X)
const char * c_str(void) const
TopicIndex * topic_index_
Persisted Topics.
void index_cleanup(I *index, PersistenceUpdater::ALLOCATOR *allocator)
PartIdType * last_part_id_
What the last participant id is/was.
long PartIdType
ACE_Hash_Map_With_Allocator< IdType_ExtId, Topic * > TopicIndex
static TYPE * instance(const ACE_TCHAR *name)
void * createIndex(const std::string &tag, PersistenceUpdater::ALLOCATOR &allocator, size_t size, bool &exists)
ACE_Allocator_Adapter< ACE_Malloc< ACE_MMAP_MEMORY_POOL, TAO_SYNCH_MUTEX > > ALLOCATOR
ACE_Hash_Map_With_Allocator< IdType_ExtId, Participant * > ParticipantIndex
void add(TAO_DDS_DCPSInfo_i *info)
ParticipantIndex * participant_index_
Persisted Participants.
ACE_Hash_Map_With_Allocator< IdType_ExtId, RWActor * > ActorIndex
ACE_TEXT("TCP_Factory")
ActorIndex * actor_index_
Persisted Readers and Writers.
OpenDDS::DCPS::unique_ptr< ALLOCATOR > allocator_
#define ACE_NEW_RETURN(POINTER, CONSTRUCTOR, RET_VAL)
int parse(int argc, ACE_TCHAR *argv[])

◆ parse()

int Update::PersistenceUpdater::parse ( int  argc,
ACE_TCHAR argv[] 
)
private

Definition at line 361 of file PersistenceUpdater.cpp.

References ACE_DEBUG, ACE_TEXT(), ACE_OS::atoi(), LM_DEBUG, persistence_file_, reset_, and ACE_OS::strcasecmp().

Referenced by init().

362 {
363  for (ssize_t count = 0; count < argc; count++) {
364  if (ACE_OS::strcasecmp(argv[count], ACE_TEXT("-file")) == 0) {
365  if ((count + 1) < argc) {
366  persistence_file_ = argv[count+1];
367  count++;
368  }
369 
370  } else if (ACE_OS::strcasecmp(argv[count], ACE_TEXT("-reset")) == 0) {
371  if ((count + 1) < argc) {
372  int val = ACE_OS::atoi(argv[count+1]);
373  reset_ = true;
374 
375  if (val == 0) {
376  reset_ = false;
377  }
378 
379  count++;
380  }
381 
382  } else {
383  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) PersistenceUpdater::parse: Unknown option %s\n")
384  , argv[count]));
385  return -1;
386  }
387  }
388 
389  return 0;
390 }
#define ACE_DEBUG(X)
int ssize_t
ACE_TEXT("TCP_Factory")
int atoi(const char *s)
int strcasecmp(const char *s, const char *t)

◆ requestImage()

void Update::PersistenceUpdater::requestImage ( )
virtual

Request an image refresh to be sent upstream. This is currently done synchronously. TBD: Move to an asynchronous model

Implements Update::Updater.

Definition at line 405 of file PersistenceUpdater.cpp.

References ACE_DEBUG, actor_index_, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq >::actorId, Update::ImageData< T, P, A, W >::actors, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_Null_Mutex >::begin(), ACE_String_Base< char >::c_str(), Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq >::callback, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq >::contentSubscriptionProfile, Update::DataReader, Update::TopicStrt< QosSeq, ACE_CString >::dataType, OpenDDS::DCPS::DCPS_debug_level, Update::TopicStrt< QosSeq, ACE_CString >::domainId, Update::ParticipantStrt< QosSeq >::domainId, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq >::domainId, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq >::drdwQos, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_Null_Mutex >::end(), Update::ContentSubscriptionBin::exprParams, Update::ContentSubscriptionBin::filterClassName, Update::ContentSubscriptionBin::filterExpr, last_part_id_, Update::ImageData< T, P, A, W >::lastPartId, LM_DEBUG, Update::TopicStrt< QosSeq, ACE_CString >::name, OPENDDS_STRING, Update::ParticipantStrt< QosSeq >::owner, participant_index_, Update::TopicStrt< QosSeq, ACE_CString >::participantId, Update::ParticipantStrt< QosSeq >::participantId, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq >::participantId, Update::ParticipantQos, Update::ParticipantStrt< QosSeq >::participantQos, Update::ImageData< T, P, A, W >::participants, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq >::pubsubQos, Update::Manager::pushImage(), Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq >::serializedTypeInfo, topic_index_, Update::TopicStrt< QosSeq, ACE_CString >::topicId, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq >::topicId, Update::TopicQos, Update::TopicStrt< QosSeq, ACE_CString >::topicQos, Update::ImageData< T, P, A, W >::topics, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq >::transportContext, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq >::transportInterfaceInfo, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq >::type, and um_.

406 {
407  if (um_ == NULL) {
408  return;
409  }
410 
411  DImage image;
412 
414  iter != participant_index_->end(); iter++) {
415  const PersistenceUpdater::Participant* participant = iter->int_id_;
416 
417  const BinSeq& in_seq = participant->participantQos.second;
418  const QosSeq qos(ParticipantQos, in_seq);
419  const DParticipant dparticipant(participant->domainId
420  , participant->owner
421  , participant->participantId
422  , qos);
423  image.participants.push_back(dparticipant);
425  OpenDDS::DCPS::RepoIdConverter conv(participant->participantId);
426  ACE_DEBUG((LM_DEBUG,
427  "(%P|%t) PersistenceUpdater::requestImage(): loaded participant %C\n",
428  OPENDDS_STRING(conv).c_str()));
429  }
430  }
431 
432  for (TopicIndex::ITERATOR iter = topic_index_->begin();
433  iter != topic_index_->end(); iter++) {
434  const PersistenceUpdater::Topic* topic = iter->int_id_;
435 
436  const BinSeq& in_seq = topic->topicQos.second;
437  const QosSeq qos(TopicQos, in_seq);
438  const DTopic dTopic(topic->domainId, topic->topicId
439  , topic->participantId, topic->name.c_str()
440  , topic->dataType.c_str(), qos);
441  image.topics.push_back(dTopic);
442  }
443 
444  for (ActorIndex::ITERATOR iter = actor_index_->begin();
445  iter != actor_index_->end(); iter++) {
446  const PersistenceUpdater::RWActor* actor = iter->int_id_;
447 
448  const BinSeq& in_pubsub_seq = actor->pubsubQos.second;
449  const QosSeq pubsub_qos(actor->pubsubQos.first, in_pubsub_seq);
450 
451  const BinSeq& in_drdw_seq = actor->drdwQos.second;
452  const QosSeq drdw_qos(actor->drdwQos.first, in_drdw_seq);
453 
454  const BinSeq& in_transport_seq = actor->transportInterfaceInfo;
455  const BinSeq& in_type_info = actor->serializedTypeInfo;
456 
457  ContentSubscriptionBin in_csp_bin;
458  if (actor->type == DataReader) {
459  in_csp_bin.filterClassName = actor->contentSubscriptionProfile.filterClassName;
460  in_csp_bin.filterExpr = actor->contentSubscriptionProfile.filterExpr;
461  in_csp_bin.exprParams = actor->contentSubscriptionProfile.exprParams;
462  }
463 
464  const DActor dActor(actor->domainId, actor->actorId, actor->topicId
465  , actor->participantId
466  , actor->type, actor->callback.c_str()
467  , pubsub_qos, drdw_qos, in_transport_seq, actor->transportContext, in_csp_bin
468  , in_type_info);
469  image.actors.push_back(dActor);
470  }
471 
472  image.lastPartId = *last_part_id_;
473 
474  um_->pushImage(image);
475 }
#define ACE_DEBUG(X)
struct TopicStrt< QosSeq, std::string > DTopic
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
struct ParticipantStrt< QosSeq > DParticipant
TopicIndex * topic_index_
Persisted Topics.
PartIdType * last_part_id_
What the last participant id is/was.
std::pair< size_t, char * > BinSeq
struct ActorStrt< QosSeq, QosSeq, std::string, BinSeq, ContentSubscriptionBin, BinSeq > DActor
#define OPENDDS_STRING
std::pair< SpecificQos, BinSeq > QosSeq
struct ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq > RWActor
Persisted actor data structure.
ParticipantIndex * participant_index_
Persisted Participants.
struct ParticipantStrt< QosSeq > Participant
Persisted participant data structure.
ActorIndex * actor_index_
Persisted Readers and Writers.
struct ImageData< DTopic, DParticipant, DActor, DActor > DImage
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_Hash_Map_Iterator_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_Null_Mutex > ITERATOR
void pushImage(const DImage &image)
Downstream request to push image.
struct TopicStrt< QosSeq, ACE_CString > Topic
Persisted topic data structure.

◆ storeUpdate()

void Update::PersistenceUpdater::storeUpdate ( const ACE_Message_Block data,
BinSeq storage 
)
private

Definition at line 882 of file PersistenceUpdater.cpp.

References ACE_ALLOCATOR, allocator_, ACE_Message_Block::base(), ACE_Message_Block::length(), and ACE_OS::memcpy().

Referenced by update().

883 {
884  size_t len = data.length();
885 
886  void* buffer;
887  ACE_ALLOCATOR(buffer, this->allocator_->malloc(len));
888  ACE_OS::memcpy(buffer, data.base(), len);
889 
890  storage.first = len;
891  storage.second = static_cast<char*>(buffer);
892 }
size_t length(void) const
void * memcpy(void *t, const void *s, size_t len)
#define ACE_ALLOCATOR(POINTER, ALLOCATOR)
OpenDDS::DCPS::unique_ptr< ALLOCATOR > allocator_
char * base(void) const

◆ svc()

int Update::PersistenceUpdater::svc ( void  )
virtual

ACE_Task_Base start method.

Reimplemented from ACE_Task_Base.

Definition at line 399 of file PersistenceUpdater.cpp.

400 {
401  return 0;
402 }

◆ update() [1/7]

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::DomainParticipantQos qos 
)
virtual

Persist updated Qos parameters for a Participant.

Implements Update::Updater.

Definition at line 678 of file PersistenceUpdater.cpp.

References ACE_ERROR, ACE_TEXT(), ACE_OutputCDR::begin(), ACE_CDR::consolidate(), ACE_Hash_Map_With_Allocator< class, class >::find(), LM_ERROR, participant_index_, Update::ParticipantStrt< QosSeq >::participantQos, and storeUpdate().

679 {
680  IdType_ExtId ext(id.id);
681  PersistenceUpdater::Participant* part_data = 0;
682 
683  if (this->participant_index_->find(ext, part_data, this->allocator_.get()) == 0) {
684  TAO_OutputCDR outCdr;
685  outCdr << qos;
686  ACE_Message_Block dst;
687  ACE_CDR::consolidate(&dst, outCdr.begin());
688 
689  this->storeUpdate(dst, part_data->participantQos.second);
690 
691  } else {
692  OpenDDS::DCPS::RepoIdConverter converter(id.id);
693  ACE_ERROR((LM_ERROR,
694  ACE_TEXT("(%P|%t) PersistenceUpdater::update: ")
695  ACE_TEXT("participant %C not found\n"),
696  std::string(converter).c_str()));
697  }
698 }
#define ACE_ERROR(X)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
int find(const EXT_ID &, INT_ID &, ACE_Allocator *alloc)
static int consolidate(ACE_Message_Block *dst, const ACE_Message_Block *src)
ParticipantIndex * participant_index_
Persisted Participants.
struct ParticipantStrt< QosSeq > Participant
Persisted participant data structure.
ACE_TEXT("TCP_Factory")
const ACE_Message_Block * begin(void) const
void storeUpdate(const ACE_Message_Block &data, BinSeq &storage)

◆ update() [2/7]

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::TopicQos qos 
)
virtual

Persist updated Qos parameters for a Topic.

Implements Update::Updater.

Definition at line 701 of file PersistenceUpdater.cpp.

References ACE_ERROR, ACE_TEXT(), ACE_OutputCDR::begin(), ACE_CDR::consolidate(), ACE_Hash_Map_With_Allocator< class, class >::find(), LM_ERROR, storeUpdate(), topic_index_, and Update::TopicStrt< QosSeq, ACE_CString >::topicQos.

702 {
703  IdType_ExtId ext(id.id);
704  PersistenceUpdater::Topic* topic_data = 0;
705 
706  if (this->topic_index_->find(ext, topic_data, this->allocator_.get()) == 0) {
707  TAO_OutputCDR outCdr;
708  outCdr << qos;
709  ACE_Message_Block dst;
710  ACE_CDR::consolidate(&dst, outCdr.begin());
711 
712  this->storeUpdate(dst, topic_data->topicQos.second);
713 
714  } else {
715  OpenDDS::DCPS::RepoIdConverter converter(id.id);
716  ACE_ERROR((LM_ERROR,
717  ACE_TEXT("(%P|%t) PersistenceUpdater::update: ")
718  ACE_TEXT("topic %C not found\n"),
719  std::string(converter).c_str()));
720  }
721 }
#define ACE_ERROR(X)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
TopicIndex * topic_index_
Persisted Topics.
int find(const EXT_ID &, INT_ID &, ACE_Allocator *alloc)
static int consolidate(ACE_Message_Block *dst, const ACE_Message_Block *src)
ACE_TEXT("TCP_Factory")
const ACE_Message_Block * begin(void) const
void storeUpdate(const ACE_Message_Block &data, BinSeq &storage)
struct TopicStrt< QosSeq, ACE_CString > Topic
Persisted topic data structure.

◆ update() [3/7]

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::DataWriterQos qos 
)
virtual

Persist updated Qos parameters for a DataWriter.

Implements Update::Updater.

Definition at line 724 of file PersistenceUpdater.cpp.

References ACE_ERROR, ACE_TEXT(), actor_index_, ACE_OutputCDR::begin(), ACE_CDR::consolidate(), Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq >::drdwQos, ACE_Hash_Map_With_Allocator< class, class >::find(), LM_ERROR, and storeUpdate().

725 {
726  IdType_ExtId ext(id.id);
727  PersistenceUpdater::RWActor* actor_data = 0;
728 
729  if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) == 0) {
730  TAO_OutputCDR outCdr;
731  outCdr << qos;
732  ACE_Message_Block dst;
733  ACE_CDR::consolidate(&dst, outCdr.begin());
734 
735  this->storeUpdate(dst, actor_data->drdwQos.second);
736 
737  } else {
738  OpenDDS::DCPS::RepoIdConverter converter(id.id);
739  ACE_ERROR((LM_ERROR,
740  ACE_TEXT("(%P|%t) PersistenceUpdater::update(writerQos): ")
741  ACE_TEXT("publication %C not found\n"),
742  std::string(converter).c_str()));
743  }
744 }
#define ACE_ERROR(X)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
int find(const EXT_ID &, INT_ID &, ACE_Allocator *alloc)
static int consolidate(ACE_Message_Block *dst, const ACE_Message_Block *src)
struct ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq > RWActor
Persisted actor data structure.
ACE_TEXT("TCP_Factory")
ActorIndex * actor_index_
Persisted Readers and Writers.
const ACE_Message_Block * begin(void) const
void storeUpdate(const ACE_Message_Block &data, BinSeq &storage)

◆ update() [4/7]

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::PublisherQos qos 
)
virtual

Persist updated Qos parameters for a Publisher.

Implements Update::Updater.

Definition at line 747 of file PersistenceUpdater.cpp.

References ACE_ERROR, ACE_TEXT(), actor_index_, ACE_OutputCDR::begin(), ACE_CDR::consolidate(), ACE_Hash_Map_With_Allocator< class, class >::find(), LM_ERROR, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq >::pubsubQos, and storeUpdate().

748 {
749  IdType_ExtId ext(id.id);
750  PersistenceUpdater::RWActor* actor_data = 0;
751 
752  if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) == 0) {
753  TAO_OutputCDR outCdr;
754  outCdr << qos;
755  ACE_Message_Block dst;
756  ACE_CDR::consolidate(&dst, outCdr.begin());
757 
758  this->storeUpdate(dst, actor_data->pubsubQos.second);
759 
760  } else {
761  OpenDDS::DCPS::RepoIdConverter converter(id.id);
762  ACE_ERROR((LM_ERROR,
763  ACE_TEXT("(%P|%t) PersistenceUpdater::update(publisherQos): ")
764  ACE_TEXT("publication %C not found\n"),
765  std::string(converter).c_str()));
766  }
767 }
#define ACE_ERROR(X)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
int find(const EXT_ID &, INT_ID &, ACE_Allocator *alloc)
static int consolidate(ACE_Message_Block *dst, const ACE_Message_Block *src)
struct ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq > RWActor
Persisted actor data structure.
ACE_TEXT("TCP_Factory")
ActorIndex * actor_index_
Persisted Readers and Writers.
const ACE_Message_Block * begin(void) const
void storeUpdate(const ACE_Message_Block &data, BinSeq &storage)

◆ update() [5/7]

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::DataReaderQos qos 
)
virtual

Persist updated Qos parameters for a DataReader.

Implements Update::Updater.

Definition at line 770 of file PersistenceUpdater.cpp.

References ACE_ERROR, ACE_TEXT(), actor_index_, ACE_OutputCDR::begin(), ACE_CDR::consolidate(), Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq >::drdwQos, ACE_Hash_Map_With_Allocator< class, class >::find(), LM_ERROR, and storeUpdate().

771 {
772  IdType_ExtId ext(id.id);
773  PersistenceUpdater::RWActor* actor_data = 0;
774 
775  if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) == 0) {
776  TAO_OutputCDR outCdr;
777  outCdr << qos;
778  ACE_Message_Block dst;
779  ACE_CDR::consolidate(&dst, outCdr.begin());
780 
781  this->storeUpdate(dst, actor_data->drdwQos.second);
782 
783  } else {
784  OpenDDS::DCPS::RepoIdConverter converter(id.id);
785  ACE_ERROR((LM_ERROR,
786  ACE_TEXT("(%P|%t) PersistenceUpdater::update(readerQos): ")
787  ACE_TEXT("subscription %C not found\n"),
788  std::string(converter).c_str()));
789  }
790 }
#define ACE_ERROR(X)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
int find(const EXT_ID &, INT_ID &, ACE_Allocator *alloc)
static int consolidate(ACE_Message_Block *dst, const ACE_Message_Block *src)
struct ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq > RWActor
Persisted actor data structure.
ACE_TEXT("TCP_Factory")
ActorIndex * actor_index_
Persisted Readers and Writers.
const ACE_Message_Block * begin(void) const
void storeUpdate(const ACE_Message_Block &data, BinSeq &storage)

◆ update() [6/7]

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::SubscriberQos qos 
)
virtual

Persist updated Qos parameters for a Subscriber.

Implements Update::Updater.

Definition at line 793 of file PersistenceUpdater.cpp.

References ACE_ERROR, ACE_TEXT(), actor_index_, ACE_OutputCDR::begin(), ACE_CDR::consolidate(), ACE_Hash_Map_With_Allocator< class, class >::find(), LM_ERROR, Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq >::pubsubQos, and storeUpdate().

794 {
795  IdType_ExtId ext(id.id);
796  PersistenceUpdater::RWActor* actor_data = 0;
797 
798  if (this->actor_index_->find(ext, actor_data, this->allocator_.get()) == 0) {
799  TAO_OutputCDR outCdr;
800  outCdr << qos;
801  ACE_Message_Block dst;
802  ACE_CDR::consolidate(&dst, outCdr.begin());
803 
804  this->storeUpdate(dst, actor_data->pubsubQos.second);
805 
806  } else {
807  OpenDDS::DCPS::RepoIdConverter converter(id.id);
808  ACE_ERROR((LM_ERROR,
809  ACE_TEXT("(%P|%t) PersistenceUpdater::update(subscriberQos): ")
810  ACE_TEXT("subscription %C not found\n"),
811  std::string(converter).c_str()));
812  }
813 }
#define ACE_ERROR(X)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
int find(const EXT_ID &, INT_ID &, ACE_Allocator *alloc)
static int consolidate(ACE_Message_Block *dst, const ACE_Message_Block *src)
struct ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq > RWActor
Persisted actor data structure.
ACE_TEXT("TCP_Factory")
ActorIndex * actor_index_
Persisted Readers and Writers.
const ACE_Message_Block * begin(void) const
void storeUpdate(const ACE_Message_Block &data, BinSeq &storage)

◆ update() [7/7]

void Update::PersistenceUpdater::update ( const IdPath id,
const DDS::StringSeq exprParams 
)
virtual

Persist updated subscription exprParams.

Implements Update::Updater.

Definition at line 816 of file PersistenceUpdater.cpp.

References ACE_ERROR, ACE_TEXT(), actor_index_, allocator_, ACE_OutputCDR::begin(), ACE_CDR::consolidate(), Update::ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq >::contentSubscriptionProfile, Update::ContentSubscriptionBin::exprParams, ACE_Hash_Map_With_Allocator< class, class >::find(), LM_ERROR, and storeUpdate().

817 {
818  IdType_ExtId ext(id.id);
819  PersistenceUpdater::RWActor* actor_data = 0;
820 
821  if (actor_index_->find(ext, actor_data, allocator_.get()) == 0) {
822  TAO_OutputCDR outCdr;
823  outCdr << exprParams;
824  ACE_Message_Block dst;
825  ACE_CDR::consolidate(&dst, outCdr.begin());
826 
827  storeUpdate(dst, actor_data->contentSubscriptionProfile.exprParams);
828 
829  } else {
830  OpenDDS::DCPS::RepoIdConverter converter(id.id);
831  ACE_ERROR((LM_ERROR,
832  ACE_TEXT("(%P|%t) PersistenceUpdater::update(readerQos): ")
833  ACE_TEXT("subscription %C not found\n"),
834  std::string(converter).c_str()));
835  }
836 }
#define ACE_ERROR(X)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
int find(const EXT_ID &, INT_ID &, ACE_Allocator *alloc)
static int consolidate(ACE_Message_Block *dst, const ACE_Message_Block *src)
struct ActorStrt< QosSeq, QosSeq, ACE_CString, BinSeq, ContentSubscriptionBin, BinSeq > RWActor
Persisted actor data structure.
ACE_TEXT("TCP_Factory")
ActorIndex * actor_index_
Persisted Readers and Writers.
OpenDDS::DCPS::unique_ptr< ALLOCATOR > allocator_
const ACE_Message_Block * begin(void) const
void storeUpdate(const ACE_Message_Block &data, BinSeq &storage)

◆ updateLastPartId()

void Update::PersistenceUpdater::updateLastPartId ( PartIdType  partId)
virtual

Update Last Participant Id for repo.

Reimplemented from Update::Updater.

Definition at line 894 of file PersistenceUpdater.cpp.

References last_part_id_.

895 {
896  *last_part_id_ = partId;
897 }
PartIdType * last_part_id_
What the last participant id is/was.

Member Data Documentation

◆ actor_index_

ActorIndex* Update::PersistenceUpdater::actor_index_
private

Persisted Readers and Writers.

Definition at line 150 of file PersistenceUpdater.h.

Referenced by create(), destroy(), init(), requestImage(), and update().

◆ allocator_

OpenDDS::DCPS::unique_ptr<ALLOCATOR> Update::PersistenceUpdater::allocator_
private

Definition at line 141 of file PersistenceUpdater.h.

Referenced by create(), destroy(), init(), storeUpdate(), and update().

◆ last_part_id_

PartIdType* Update::PersistenceUpdater::last_part_id_
private

What the last participant id is/was.

Definition at line 153 of file PersistenceUpdater.h.

Referenced by init(), requestImage(), and updateLastPartId().

◆ participant_index_

ParticipantIndex* Update::PersistenceUpdater::participant_index_
private

Persisted Participants.

Definition at line 147 of file PersistenceUpdater.h.

Referenced by create(), destroy(), init(), requestImage(), and update().

◆ persistence_file_

ACE_TString Update::PersistenceUpdater::persistence_file_
private

Definition at line 136 of file PersistenceUpdater.h.

Referenced by init(), and parse().

◆ reset_

bool Update::PersistenceUpdater::reset_
private

Definition at line 137 of file PersistenceUpdater.h.

Referenced by init(), and parse().

◆ topic_index_

TopicIndex* Update::PersistenceUpdater::topic_index_
private

Persisted Topics.

Definition at line 144 of file PersistenceUpdater.h.

Referenced by create(), destroy(), init(), requestImage(), and update().

◆ um_

Manager* Update::PersistenceUpdater::um_
private

Definition at line 139 of file PersistenceUpdater.h.

Referenced by init(), and requestImage().


The documentation for this class was generated from the following files: