OpenDDS  Snapshot(2023/04/28-20:55)
UpdateManager.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DcpsInfo_pch.h"
9 
10 #include "UpdateManager.h"
11 #include "Updater.h"
12 #include "DCPSInfo_i.h"
13 
14 #include "tao/CDR.h"
15 
16 #include <vector>
17 
19 
20 namespace Update {
21 
23  : info_(0)
24 {
25 }
26 
28 {
29 }
30 
31 void
33 {
34  info_ = info;
35 }
36 
37 void
39 {
40  // push new element to the back.
41  updaters_.insert(updater);
42 }
43 
44 void
46 {
47  // Clean the reference to the InfoRepo.
48  info_ = 0;
49 }
50 
51 void
52 Manager::remove(const Updater* updater)
53 {
54  // check if the Updaters is part of the list.
55  Updaters::iterator iter = updaters_.find(const_cast<Updater*>(updater));
56 
57  if (iter == updaters_.end()) {
58  return;
59  }
60 
61  // remove the element
62  updaters_.erase(iter);
63 }
64 
65 int
67 {
68  return 0;
69 }
70 
71 int
73 {
74  return 0;
75 }
76 
77 void
79 {
80  for (Updaters::iterator iter = updaters_.begin();
81  iter != updaters_.end();
82  iter++) {
83  (*iter)->requestImage();
84  }
85 }
86 
// Scope guard owning a sequence of heap-allocated objects.
//
// Callers obtain the underlying vector through seq() and push raw
// pointers into it; every pointer still held in the vector is deleted
// when the guard goes out of scope. Null entries are harmless because
// deleting a null pointer is a no-op.
template <typename T>
class SeqGuard {
public:
  typedef std::vector<T*> Seq;

  // Start with an empty sequence.
  SeqGuard() {}

  // Release every object whose pointer was stored in the sequence.
  ~SeqGuard() {
    for (typename Seq::iterator iter = seq_.begin();
         iter != seq_.end(); ++iter) {
      delete *iter;
    }
  }

  // Access the guarded sequence; ownership of pushed pointers passes
  // to this guard.
  Seq& seq() {
    return seq_;
  }

private:
  // Not copyable: a copy would delete the same pointers a second time.
  // Declared but intentionally left undefined.
  SeqGuard(const SeqGuard&);
  SeqGuard& operator=(const SeqGuard&);

  Seq seq_;
};
110 
111 void
113 {
114  if (info_ == NULL) {
115  return;
116  }
117 
118  // image to be propagated.
119  UImage u_image;
120 
121  /***************************
122  // The downstream image needs to be converted to a
123  // format compatible with the upstream layers (DImage -> UImage).
124  // The UImage contains a lot of references to the complex data
125  // types. These are collected in several buckets (see below) and
126  // passed by reference. Usage of a custom guard class 'SeqGuard'
127  // automates memory cleanup.
128  ***************************/
129 
130  // Participant buckets
132  SeqGuard<DDS::DomainParticipantQos>::Seq& part_qos = part_qos_guard.seq();
133 
134  SeqGuard<UParticipant> part_guard;
135  SeqGuard<UParticipant>::Seq& parts = part_guard.seq();
136 
137  for (DImage::ParticipantSeq::const_iterator iter = image.participants.begin();
138  iter != image.participants.end(); iter++) {
139  const DParticipant& part = *iter;
140 
141  TAO_InputCDR in_cdr(part.participantQos.second.second
142  , part.participantQos.second.first);
143 
146  in_cdr >> *qos;
147  part_qos.push_back(qos);
148 
149  UParticipant* u_part;
151  , part.owner
152  , part.participantId
153  , *qos));
154  parts.push_back(u_part);
155 
156  // push newly created UParticipant into UImage Participant bucket
157  u_image.participants.push_back(u_part);
158  }
159 
160  // Topic buckets
161  SeqGuard<DDS::TopicQos> topics_qos_guard;
162  SeqGuard<DDS::TopicQos>::Seq& topics_qos = topics_qos_guard.seq();
163 
164  SeqGuard<UTopic> topics_guard;
165  SeqGuard<UTopic>::Seq& topics = topics_guard.seq();
166 
167  for (DImage::TopicSeq::const_iterator iter = image.topics.begin();
168  iter != image.topics.end(); iter++) {
169  const DTopic& topic = *iter;
170 
171  TAO_InputCDR in_cdr(topic.topicQos.second.second
172  , topic.topicQos.second.first);
173 
174  DDS::TopicQos* qos;
176  in_cdr >> *qos;
177  topics_qos.push_back(qos);
178 
179  UTopic* u_topic;
180  ACE_NEW_NORETURN(u_topic
181  , UTopic(topic.domainId, topic.topicId
182  , topic.participantId
183  , topic.name.c_str()
184  , topic.dataType.c_str(), *qos));
185  topics.push_back(u_topic);
186 
187  // Push newly created UTopic into UImage Topic bucket
188  u_image.topics.push_back(u_topic);
189  }
190 
191  // Actor buckets
192  SeqGuard<DDS::PublisherQos> pub_qos_guard;
193  SeqGuard<DDS::PublisherQos>::Seq& pub_qos_seq = pub_qos_guard.seq();
194  SeqGuard<DDS::DataWriterQos> dw_qos_guard;
195  SeqGuard<DDS::DataWriterQos>::Seq& dw_qos_seq = dw_qos_guard.seq();
196 
197  SeqGuard<DDS::SubscriberQos> sub_qos_guard;
198  SeqGuard<DDS::SubscriberQos>::Seq& sub_qos_seq = sub_qos_guard.seq();
199  SeqGuard<DDS::DataReaderQos> dr_qos_guard;
200  SeqGuard<DDS::DataReaderQos>::Seq& dr_qos_seq = dr_qos_guard.seq();
201 
203  SeqGuard<OpenDDS::DCPS::TransportLocatorSeq>::Seq& transports = trans_guard.seq();
204 
205  SeqGuard<DDS::OctetSeq> type_info_guard;
206  SeqGuard<DDS::OctetSeq>::Seq& serializedTypeInfoSeq = type_info_guard.seq();
207 
208  SeqGuard<URActor> reader_guard;
209  SeqGuard<URActor>::Seq& readers = reader_guard.seq();
210  SeqGuard<UWActor> writer_guard;
211  SeqGuard<UWActor>::Seq& writers = writer_guard.seq();
212 
214 
215  for (DImage::ReaderSeq::const_iterator iter = image.actors.begin();
216  iter != image.actors.end(); iter++) {
217  const DActor& actor = *iter;
218 
219  TAO_InputCDR in_cdr(actor.transportInterfaceInfo.second
220  , actor.transportInterfaceInfo.first);
223  transports.push_back(trans);
224  in_cdr >> *trans;
225 
226  TAO_InputCDR ti_cdr(actor.serializedTypeInfo.second
227  , actor.serializedTypeInfo.first);
228  DDS::OctetSeq* type_info_ptr;
229  ACE_NEW_NORETURN(type_info_ptr, DDS::OctetSeq);
230  serializedTypeInfoSeq.push_back(type_info_ptr);
231  ti_cdr >> *type_info_ptr;
232 
233  DDS::PublisherQos* pub_qos = 0;
234  DDS::DataWriterQos* writer_qos = 0;
235  DDS::SubscriberQos* sub_qos = 0;
236  DDS::DataReaderQos* reader_qos = 0;
237 
238  if (actor.type == DataReader) {
239  TAO_InputCDR sub_cdr(actor.pubsubQos.second.second
240  , actor.pubsubQos.second.first);
242  sub_qos_seq.push_back(sub_qos);
243  sub_cdr >> *sub_qos;
244 
245  TAO_InputCDR read_cdr(actor.drdwQos.second.second
246  , actor.drdwQos.second.first);
248  dr_qos_seq.push_back(reader_qos);
249  read_cdr >> *reader_qos;
250 
251  csi.filterClassName = actor.contentSubscriptionProfile.filterClassName.c_str();
252  csi.filterExpr = actor.contentSubscriptionProfile.filterExpr.c_str();
253  TAO_InputCDR csp_cdr(actor.contentSubscriptionProfile.exprParams.second,
254  actor.contentSubscriptionProfile.exprParams.first);
255  csp_cdr >> csi.exprParams;
256 
257  URActor* reader;
258  ACE_NEW_NORETURN(reader
259  , URActor(actor.domainId, actor.actorId
260  , actor.topicId, actor.participantId
261  , actor.type, actor.callback.c_str()
262  , *sub_qos, *reader_qos
263  , *trans, actor.transportContext, csi
264  , *type_info_ptr));
265  readers.push_back(reader);
266  u_image.actors.push_back(reader);
267 
268  } else if (actor.type == DataWriter) {
269  TAO_InputCDR pub_cdr(actor.pubsubQos.second.second
270  , actor.pubsubQos.second.first);
272  pub_qos_seq.push_back(pub_qos);
273  pub_cdr >> *pub_qos;
274 
275  TAO_InputCDR write_cdr(actor.drdwQos.second.second
276  , actor.drdwQos.second.first);
278  dw_qos_seq.push_back(writer_qos);
279  write_cdr >> *writer_qos;
280 
281  UWActor* writer;
282  ACE_NEW_NORETURN(writer
283  , UWActor(actor.domainId, actor.actorId
284  , actor.topicId, actor.participantId
285  , actor.type, actor.callback.c_str()
286  , *pub_qos, *writer_qos
287  , *trans, actor.transportContext, csi
288  , *type_info_ptr));
289  writers.push_back(writer);
290  u_image.wActors.push_back(writer);
291 
292  } else {
293  ACE_ERROR((LM_ERROR, "Update::Manager::pushImage> unknown "
294  "actor type.\n"));
295  }
296  }
297 
298  u_image.lastPartId = image.lastPartId;
299 
300  info_->receive_image(u_image);
301 }
302 
303 void
304 Manager::destroy(const IdPath& id, ItemType type, ActorType actor)
305 {
306  // Invoke remove on each of the iterators.
307  for (Updaters::iterator iter = updaters_.begin();
308  iter != updaters_.end();
309  iter++) {
310  (*iter)->destroy(id, type, actor);
311  }
312 }
313 
314 void
315 Manager::add(const DTopic& topic)
316 {
317  if (info_ == NULL) {
318  return;
319  }
320 
321  // Demarshal QOS data
322  TAO_InputCDR in_cdr(topic.topicQos.second.second
323  , topic.topicQos.second.first);
324 
325  DDS::TopicQos qos;
326  in_cdr >> qos;
327 
328  // Pass topic info to infoRepo.
329  info_->add_topic(topic.topicId, topic.domainId
330  , topic.participantId, topic.name.c_str()
331  , topic.dataType.c_str(), qos);
332 }
333 
334 void
335 Manager::add(const DParticipant& participant)
336 {
337  if (info_ == NULL) {
338  return;
339  }
340 
341  // Demarshal QOS data
342  TAO_InputCDR in_cdr(participant.participantQos.second.second
343  , participant.participantQos.second.first);
344 
346  in_cdr >> qos;
347 
348  // Pass participant info to infoRepo.
350  , participant.participantId
351  , qos);
352 }
353 
354 void
355 Manager::add(const DActor& actor)
356 {
357  if (info_ == NULL) {
358  return;
359  }
360 
361  // Demarshal QOS data
362  TAO_InputCDR pubSubCdr(actor.pubsubQos.second.second
363  , actor.pubsubQos.second.first);
364 
365  TAO_InputCDR drdwCdr(actor.drdwQos.second.second
366  , actor.drdwQos.second.first);
367 
368  std::string callback(actor.callback.c_str());
369 
370  TAO_InputCDR transportCdr(actor.transportInterfaceInfo.second
371  , actor.transportInterfaceInfo.first);
372 
373  OpenDDS::DCPS::TransportLocatorSeq transport_info;
374  transportCdr >> transport_info;
375 
376  TAO_InputCDR typeInfoCdr(actor.serializedTypeInfo.second, actor.serializedTypeInfo.first);
377 
378  DDS::OctetSeq serializedTypeInfo;
379  typeInfoCdr >> serializedTypeInfo;
380 
381  if (actor.type == DataReader) {
382  DDS::SubscriberQos sub_qos;
383  DDS::DataReaderQos reader_qos;
384 
385  pubSubCdr >> sub_qos;
386  drdwCdr >> reader_qos;
387 
389  csi.filterClassName = actor.contentSubscriptionProfile.filterClassName.c_str();
390  csi.filterExpr = actor.contentSubscriptionProfile.filterExpr.c_str();
391  TAO_InputCDR cspCdr(actor.contentSubscriptionProfile.exprParams.second,
392  actor.contentSubscriptionProfile.exprParams.first);
393  cspCdr >> csi.exprParams;
394 
395  // Pass actor to InfoRepo.
397  , actor.topicId, actor.actorId
398  , callback.c_str(), reader_qos
399  , transport_info, actor.transportContext, sub_qos
400  , csi.filterClassName, csi.filterExpr, csi.exprParams
401  , serializedTypeInfo);
402 
403  } else if (actor.type == DataWriter) {
404  DDS::PublisherQos pub_qos;
405  DDS::DataWriterQos writer_qos;
406 
407  pubSubCdr >> pub_qos;
408  drdwCdr >> writer_qos;
409 
410  // Pass actor info to infoRepo.
412  , actor.topicId, actor.actorId
413  , callback.c_str(), writer_qos
414  , transport_info, actor.transportContext, pub_qos
415  , serializedTypeInfo);
416  }
417 }
418 
420 {
421  for (Updaters::iterator iter = updaters_.begin();
422  iter != updaters_.end();
423  iter++) {
424  (*iter)->updateLastPartId(partId);
425  }
426 }
427 
428 } // namespace Update
429 
430 
431 int
433 {
435  (ace_svc_desc_UpdateManagerSvc);
436  return 0;
437 }
438 
439 ACE_FACTORY_DEFINE(ACE_Local_Service, UpdateManagerSvc)
440 
442  ACE_TEXT("UpdateManagerSvc"),
447  0)
448 
#define ACE_ERROR(X)
ACE_CDR::ULong transportContext
ACE_FACTORY_DEFINE(OpenDDS_InfoRepoDiscovery, IRDiscoveryLoader)
& ACE_SVC_NAME(TAO_AV_TCP_Factory)
std::vector< T * > Seq
static int process_directive(const ACE_TCHAR directive[])
long PartIdType
void destroy(const IdPath &id, ItemType type, ActorType actor=DataWriter)
bool receive_image(const Update::UImage &image)
sequence< TransportLocator > TransportLocatorSeq
ACE_STATIC_SVC_DEFINE(ACE_Logging_Strategy, ACE_TEXT("Logging_Strategy"), ACE_Service_Type::SERVICE_OBJECT, &ACE_SVC_NAME(ACE_Logging_Strategy), ACE_Service_Type::DELETE_THIS|ACE_Service_Type::DELETE_OBJ, 0) extern "C" int _get_dll_unload_policy()
ACE_SVC_OBJ_T
virtual OpenDDS::DCPS::GUID_t add_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, OpenDDS::DCPS::DataReaderRemote_ptr subscription, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams, const DDS::OctetSeq &serializedTypeInfo)
Definition: DCPSInfo_i.cpp:681
OPENDDS_END_VERSIONED_NAMESPACE_DECL OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL typedef Update::Manager UpdateManagerSvc
void add(TAO_DDS_DCPSInfo_i *info)
char ACE_TCHAR
#define ACE_NEW_NORETURN(POINTER, CONSTRUCTOR)
bool add_topic(const OpenDDS::DCPS::GUID_t &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos)
Add a previously existing topic to the repository.
Definition: DCPSInfo_i.cpp:232
void requestImage()
Force a clean shutdown.
PartIdType lastPartId
What the last participant id is/was.
ParticipantSeq participants
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64
DomainIdType domainId
struct ParticipantStrt< DDS::DomainParticipantQos &> UParticipant
Implementation of the DCPSInfo.
Definition: DCPSInfo_i.h:53
virtual OpenDDS::DCPS::GUID_t add_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, OpenDDS::DCPS::DataWriterRemote_ptr publication, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos, const DDS::OctetSeq &serializedTypeInfo)
Definition: DCPSInfo_i.cpp:374
Updaters updaters_
Definition: UpdateManager.h:89
virtual OpenDDS::DCPS::AddDomainStatus add_domain_participant(DDS::DomainId_t domain, const DDS::DomainParticipantQos &qos)
ACE_TEXT("TCP_Factory")
virtual int fini()
Shared object finalizer.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
virtual ~Manager()
virtual int info(ACE_TCHAR **info_string, size_t length=0) const
void pushImage(const DImage &image)
Downstream request to push image.
struct TopicStrt< DDS::TopicQos &, std::string > UTopic
TAO_DDS_DCPSInfo_i * info_
Definition: UpdateManager.h:88
struct ActorStrt< DDS::SubscriberQos &, DDS::DataReaderQos &, std::string, OpenDDS::DCPS::TransportLocatorSeq &, ContentSubscriptionInfo &, DDS::OctetSeq &> URActor
DomainIdType domainId
virtual int init(int argc, ACE_TCHAR *argv[])
Shared object initializer.
virtual void updateLastPartId(PartIdType partId)
Update Last Participant Id for the repo.
LM_ERROR
struct ActorStrt< DDS::PublisherQos &, DDS::DataWriterQos &, std::string, OpenDDS::DCPS::TransportLocatorSeq &, ContentSubscriptionInfo &, DDS::OctetSeq &> UWActor