00001
00002
00003
00004
00005
00006
00007
00008 #include "DcpsInfo_pch.h"
00009
00010 #include "UpdateManager.h"
00011 #include "Updater.h"
00012 #include "ArrDelAdapter.h"
00013 #include "DCPSInfo_i.h"
00014
00015 #include "tao/CDR.h"
00016
00017 #include <vector>
00018
00019 namespace Update {
00020
00021 Manager::Manager()
00022 : info_(0)
00023 {
00024 }
00025
00026 Manager::~Manager()
00027 {
00028 }
00029
00030 void
00031 Manager::add(TAO_DDS_DCPSInfo_i* info)
00032 {
00033 info_ = info;
00034 }
00035
00036 void
00037 Manager::add(Updater* updater)
00038 {
00039
00040 updaters_.insert(updater);
00041 }
00042
00043 void
00044 Manager::remove()
00045 {
00046
00047 info_ = 0;
00048 }
00049
00050 void
00051 Manager::remove(const Updater* updater)
00052 {
00053
00054 Updaters::iterator iter = updaters_.find(const_cast<Updater*>(updater));
00055
00056 if (iter == updaters_.end()) {
00057 return;
00058 }
00059
00060
00061 updaters_.erase(iter);
00062 }
00063
00064 int
00065 Manager::init(int , ACE_TCHAR *[])
00066 {
00067 return 0;
00068 }
00069
00070 int
00071 Manager::fini()
00072 {
00073 return 0;
00074 }
00075
00076 void
00077 Manager::requestImage()
00078 {
00079 for (Updaters::iterator iter = updaters_.begin();
00080 iter != updaters_.end();
00081 iter++) {
00082 (*iter)->requestImage();
00083 }
00084 }
00085
00086 template <typename T>
00087 class SeqGuard {
00088 public:
00089 typedef std::vector<T*> Seq;
00090
00091 public:
00092 ~SeqGuard() {
00093 for (typename Seq::iterator iter = seq_.begin();
00094 iter != seq_.end();) {
00095 typename Seq::iterator current = iter;
00096 iter++;
00097
00098 delete(*current);
00099 }
00100 };
00101
00102 Seq& seq() {
00103 return seq_;
00104 };
00105
00106 private:
00107 Seq seq_;
00108 };
00109
00110 void
00111 Manager::pushImage(const DImage& image)
00112 {
00113 if (info_ == NULL) {
00114 return;
00115 }
00116
00117
00118 UImage u_image;
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130 SeqGuard<DDS::DomainParticipantQos> part_qos_guard;
00131 SeqGuard<DDS::DomainParticipantQos>::Seq& part_qos = part_qos_guard.seq();
00132
00133 SeqGuard<UParticipant> part_guard;
00134 SeqGuard<UParticipant>::Seq& parts = part_guard.seq();
00135
00136 for (DImage::ParticipantSeq::const_iterator iter = image.participants.begin();
00137 iter != image.participants.end(); iter++) {
00138 const DParticipant& part = *iter;
00139
00140 TAO_InputCDR in_cdr(part.participantQos.second.second
00141 , part.participantQos.second.first);
00142
00143 DDS::DomainParticipantQos* qos;
00144 ACE_NEW_NORETURN(qos, DDS::DomainParticipantQos);
00145 in_cdr >> *qos;
00146 part_qos.push_back(qos);
00147
00148 UParticipant* u_part;
00149 ACE_NEW_NORETURN(u_part, UParticipant(part.domainId
00150 , part.owner
00151 , part.participantId
00152 , *qos));
00153 parts.push_back(u_part);
00154
00155
00156 u_image.participants.push_back(u_part);
00157 }
00158
00159
00160 SeqGuard<DDS::TopicQos> topics_qos_guard;
00161 SeqGuard<DDS::TopicQos>::Seq& topics_qos = topics_qos_guard.seq();
00162
00163 SeqGuard<UTopic> topics_guard;
00164 SeqGuard<UTopic>::Seq& topics = topics_guard.seq();
00165
00166 for (DImage::TopicSeq::const_iterator iter = image.topics.begin();
00167 iter != image.topics.end(); iter++) {
00168 const DTopic& topic = *iter;
00169
00170 TAO_InputCDR in_cdr(topic.topicQos.second.second
00171 , topic.topicQos.second.first);
00172
00173 DDS::TopicQos* qos;
00174 ACE_NEW_NORETURN(qos, DDS::TopicQos);
00175 in_cdr >> *qos;
00176 topics_qos.push_back(qos);
00177
00178 UTopic* u_topic;
00179 ACE_NEW_NORETURN(u_topic
00180 , UTopic(topic.domainId, topic.topicId
00181 , topic.participantId
00182 , topic.name.c_str()
00183 , topic.dataType.c_str(), *qos));
00184 topics.push_back(u_topic);
00185
00186
00187 u_image.topics.push_back(u_topic);
00188 }
00189
00190
00191 SeqGuard<DDS::PublisherQos> pub_qos_guard;
00192 SeqGuard<DDS::PublisherQos>::Seq& pub_qos_seq = pub_qos_guard.seq();
00193 SeqGuard<DDS::DataWriterQos> dw_qos_guard;
00194 SeqGuard<DDS::DataWriterQos>::Seq& dw_qos_seq = dw_qos_guard.seq();
00195
00196 SeqGuard<DDS::SubscriberQos> sub_qos_guard;
00197 SeqGuard<DDS::SubscriberQos>::Seq& sub_qos_seq = sub_qos_guard.seq();
00198 SeqGuard<DDS::DataReaderQos> dr_qos_guard;
00199 SeqGuard<DDS::DataReaderQos>::Seq& dr_qos_seq = dr_qos_guard.seq();
00200
00201 SeqGuard<OpenDDS::DCPS::TransportLocatorSeq> trans_guard;
00202 SeqGuard<OpenDDS::DCPS::TransportLocatorSeq>::Seq& transports = trans_guard.seq();
00203
00204 SeqGuard<URActor> reader_guard;
00205 SeqGuard<URActor>::Seq& readers = reader_guard.seq();
00206 SeqGuard<UWActor> writer_guard;
00207 SeqGuard<UWActor>::Seq& writers = writer_guard.seq();
00208
00209 for (DImage::ReaderSeq::const_iterator iter = image.actors.begin();
00210 iter != image.actors.end(); iter++) {
00211 const DActor& actor = *iter;
00212
00213 TAO_InputCDR in_cdr(actor.transportInterfaceInfo.second
00214 , actor.transportInterfaceInfo.first);
00215 OpenDDS::DCPS::TransportLocatorSeq* trans;
00216 ACE_NEW_NORETURN(trans, OpenDDS::DCPS::TransportLocatorSeq);
00217 transports.push_back(trans);
00218 in_cdr >> *trans;
00219
00220 DDS::PublisherQos* pub_qos = 0;
00221 DDS::DataWriterQos* writer_qos = 0;
00222 DDS::SubscriberQos* sub_qos = 0;
00223 DDS::DataReaderQos* reader_qos = 0;
00224
00225 if (actor.type == DataReader) {
00226 TAO_InputCDR sub_cdr(actor.pubsubQos.second.second
00227 , actor.pubsubQos.second.first);
00228 ACE_NEW_NORETURN(sub_qos, DDS::SubscriberQos);
00229 sub_qos_seq.push_back(sub_qos);
00230 sub_cdr >> *sub_qos;
00231
00232 TAO_InputCDR read_cdr(actor.drdwQos.second.second
00233 , actor.drdwQos.second.first);
00234 ACE_NEW_NORETURN(reader_qos, DDS::DataReaderQos);
00235 dr_qos_seq.push_back(reader_qos);
00236 read_cdr >> *reader_qos;
00237
00238 ContentSubscriptionInfo* csi = 0;
00239 ACE_NEW_NORETURN(csi, ContentSubscriptionInfo);
00240 csi->filterClassName = actor.contentSubscriptionProfile.filterClassName.c_str();
00241 csi->filterExpr = actor.contentSubscriptionProfile.filterExpr.c_str();
00242 TAO_InputCDR csp_cdr(actor.contentSubscriptionProfile.exprParams.second,
00243 actor.contentSubscriptionProfile.exprParams.first);
00244 csp_cdr >> csi->exprParams;
00245
00246 URActor* reader;
00247 ACE_NEW_NORETURN(reader
00248 , URActor(actor.domainId, actor.actorId
00249 , actor.topicId, actor.participantId
00250 , actor.type, actor.callback.c_str()
00251 , *sub_qos, *reader_qos
00252 , *trans, *csi));
00253 readers.push_back(reader);
00254 u_image.actors.push_back(reader);
00255
00256 } else if (actor.type == DataWriter) {
00257 TAO_InputCDR pub_cdr(actor.pubsubQos.second.second
00258 , actor.pubsubQos.second.first);
00259 ACE_NEW_NORETURN(pub_qos, DDS::PublisherQos);
00260 pub_qos_seq.push_back(pub_qos);
00261 pub_cdr >> *pub_qos;
00262
00263 TAO_InputCDR write_cdr(actor.drdwQos.second.second
00264 , actor.drdwQos.second.first);
00265 ACE_NEW_NORETURN(writer_qos, DDS::DataWriterQos);
00266 dw_qos_seq.push_back(writer_qos);
00267 write_cdr >> *writer_qos;
00268
00269 ContentSubscriptionInfo csi;
00270
00271 UWActor* writer;
00272 ACE_NEW_NORETURN(writer
00273 , UWActor(actor.domainId, actor.actorId
00274 , actor.topicId, actor.participantId
00275 , actor.type, actor.callback.c_str()
00276 , *pub_qos, *writer_qos
00277 , *trans, csi));
00278 writers.push_back(writer);
00279 u_image.wActors.push_back(writer);
00280
00281 } else {
00282 ACE_ERROR((LM_ERROR, "Update::Manager::pushImage> unknown "
00283 "actor type.\n"));
00284 }
00285 }
00286
00287 info_->receive_image(u_image);
00288 }
00289
00290 void
00291 Manager::destroy(const IdPath& id, ItemType type, ActorType actor)
00292 {
00293
00294 for (Updaters::iterator iter = updaters_.begin();
00295 iter != updaters_.end();
00296 iter++) {
00297 (*iter)->destroy(id, type, actor);
00298 }
00299 }
00300
00301 void
00302 Manager::add(const DTopic& topic)
00303 {
00304 if (info_ == NULL) {
00305 return;
00306 }
00307
00308
00309 TAO_InputCDR in_cdr(topic.topicQos.second.second
00310 , topic.topicQos.second.first);
00311
00312 DDS::TopicQos qos;
00313 in_cdr >> qos;
00314
00315
00316 info_->add_topic(topic.topicId, topic.domainId
00317 , topic.participantId, topic.name.c_str()
00318 , topic.dataType.c_str(), qos);
00319 }
00320
00321 void
00322 Manager::add(const DParticipant& participant)
00323 {
00324 if (info_ == NULL) {
00325 return;
00326 }
00327
00328
00329 TAO_InputCDR in_cdr(participant.participantQos.second.second
00330 , participant.participantQos.second.first);
00331
00332 DDS::DomainParticipantQos qos;
00333 in_cdr >> qos;
00334
00335
00336 info_->add_domain_participant(participant.domainId
00337 , participant.participantId
00338 , qos);
00339 }
00340
00341 void
00342 Manager::add(const DActor& actor)
00343 {
00344 if (info_ == NULL) {
00345 return;
00346 }
00347
00348
00349 TAO_InputCDR pubSubCdr(actor.pubsubQos.second.second
00350 , actor.pubsubQos.second.first);
00351
00352 TAO_InputCDR drdwCdr(actor.drdwQos.second.second
00353 , actor.drdwQos.second.first);
00354
00355 std::string callback(actor.callback.c_str());
00356
00357 TAO_InputCDR transportCdr(actor.transportInterfaceInfo.second
00358 , actor.transportInterfaceInfo.first);
00359
00360 OpenDDS::DCPS::TransportLocatorSeq transport_info;
00361 transportCdr >> transport_info;
00362
00363 if (actor.type == DataReader) {
00364 DDS::SubscriberQos sub_qos;
00365 DDS::DataReaderQos reader_qos;
00366
00367 pubSubCdr >> sub_qos;
00368 drdwCdr >> reader_qos;
00369
00370 Update::ContentSubscriptionInfo csi;
00371 csi.filterClassName = actor.contentSubscriptionProfile.filterClassName.c_str();
00372 csi.filterExpr = actor.contentSubscriptionProfile.filterExpr.c_str();
00373 TAO_InputCDR cspCdr(actor.contentSubscriptionProfile.exprParams.second,
00374 actor.contentSubscriptionProfile.exprParams.first);
00375 cspCdr >> csi.exprParams;
00376
00377
00378 info_->add_subscription(actor.domainId, actor.participantId
00379 , actor.topicId, actor.actorId
00380 , callback.c_str(), reader_qos
00381 , transport_info, sub_qos
00382 , csi.filterClassName, csi.filterExpr, csi.exprParams);
00383
00384 } else if (actor.type == DataWriter) {
00385 DDS::PublisherQos pub_qos;
00386 DDS::DataWriterQos writer_qos;
00387
00388 pubSubCdr >> pub_qos;
00389 drdwCdr >> writer_qos;
00390
00391
00392 info_->add_publication(actor.domainId, actor.participantId
00393 , actor.topicId, actor.actorId
00394 , callback.c_str(), writer_qos
00395 , transport_info, pub_qos);
00396 }
00397 }
00398
00399 }
00400
00401 int
00402 UpdateManagerSvc_Loader::init()
00403 {
00404 return ACE_Service_Config::process_directive
00405 (ace_svc_desc_UpdateManagerSvc);
00406 return 0;
00407 }
00408
00409 ACE_FACTORY_DEFINE(ACE_Local_Service, UpdateManagerSvc)
00410
00411 ACE_STATIC_SVC_DEFINE(UpdateManagerSvc,
00412 ACE_TEXT("UpdateManagerSvc"),
00413 ACE_SVC_OBJ_T,
00414 &ACE_SVC_NAME(UpdateManagerSvc),
00415 ACE_Service_Type::DELETE_THIS
00416 | ACE_Service_Type::DELETE_OBJ,
00417 0)
00418
00419