00001
00002
00003
00004
00005
00006
00007
00008 #include "DcpsInfo_pch.h"
00009
00010 #include "UpdateManager.h"
00011 #include "Updater.h"
00012 #include "ArrDelAdapter.h"
00013 #include "DCPSInfo_i.h"
00014
00015 #include "tao/CDR.h"
00016
00017 #include <vector>
00018
00019 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00020
00021 namespace Update {
00022
00023 Manager::Manager()
00024 : info_(0)
00025 {
00026 }
00027
00028 Manager::~Manager()
00029 {
00030 }
00031
00032 void
00033 Manager::add(TAO_DDS_DCPSInfo_i* info)
00034 {
00035 info_ = info;
00036 }
00037
00038 void
00039 Manager::add(Updater* updater)
00040 {
00041
00042 updaters_.insert(updater);
00043 }
00044
00045 void
00046 Manager::remove()
00047 {
00048
00049 info_ = 0;
00050 }
00051
00052 void
00053 Manager::remove(const Updater* updater)
00054 {
00055
00056 Updaters::iterator iter = updaters_.find(const_cast<Updater*>(updater));
00057
00058 if (iter == updaters_.end()) {
00059 return;
00060 }
00061
00062
00063 updaters_.erase(iter);
00064 }
00065
00066 int
00067 Manager::init(int , ACE_TCHAR *[])
00068 {
00069 return 0;
00070 }
00071
00072 int
00073 Manager::fini()
00074 {
00075 return 0;
00076 }
00077
00078 void
00079 Manager::requestImage()
00080 {
00081 for (Updaters::iterator iter = updaters_.begin();
00082 iter != updaters_.end();
00083 iter++) {
00084 (*iter)->requestImage();
00085 }
00086 }
00087
/// Scope guard owning a sequence of heap-allocated T objects.
///
/// Pointers pushed into seq() are deleted (with plain `delete`) when the
/// guard goes out of scope. Null entries are harmless. The guard is
/// non-copyable: a copy would share the same raw pointers and delete them
/// a second time.
template <typename T>
class SeqGuard {
public:
  typedef std::vector<T*> Seq;

  SeqGuard() {}

  /// Delete every object still referenced by the sequence.
  ~SeqGuard() {
    for (typename Seq::iterator it = seq_.begin(); it != seq_.end(); ++it) {
      delete *it;
    }
  }

  /// Access the owned sequence; push pointers here to hand over ownership.
  Seq& seq() {
    return seq_;
  }

private:
  // Declared but intentionally not defined: copying would double delete.
  SeqGuard(const SeqGuard&);
  SeqGuard& operator=(const SeqGuard&);

  Seq seq_;
};
00111
00112 void
00113 Manager::pushImage(const DImage& image)
00114 {
00115 if (info_ == NULL) {
00116 return;
00117 }
00118
00119
00120 UImage u_image;
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132 SeqGuard<DDS::DomainParticipantQos> part_qos_guard;
00133 SeqGuard<DDS::DomainParticipantQos>::Seq& part_qos = part_qos_guard.seq();
00134
00135 SeqGuard<UParticipant> part_guard;
00136 SeqGuard<UParticipant>::Seq& parts = part_guard.seq();
00137
00138 for (DImage::ParticipantSeq::const_iterator iter = image.participants.begin();
00139 iter != image.participants.end(); iter++) {
00140 const DParticipant& part = *iter;
00141
00142 TAO_InputCDR in_cdr(part.participantQos.second.second
00143 , part.participantQos.second.first);
00144
00145 DDS::DomainParticipantQos* qos;
00146 ACE_NEW_NORETURN(qos, DDS::DomainParticipantQos);
00147 in_cdr >> *qos;
00148 part_qos.push_back(qos);
00149
00150 UParticipant* u_part;
00151 ACE_NEW_NORETURN(u_part, UParticipant(part.domainId
00152 , part.owner
00153 , part.participantId
00154 , *qos));
00155 parts.push_back(u_part);
00156
00157
00158 u_image.participants.push_back(u_part);
00159 }
00160
00161
00162 SeqGuard<DDS::TopicQos> topics_qos_guard;
00163 SeqGuard<DDS::TopicQos>::Seq& topics_qos = topics_qos_guard.seq();
00164
00165 SeqGuard<UTopic> topics_guard;
00166 SeqGuard<UTopic>::Seq& topics = topics_guard.seq();
00167
00168 for (DImage::TopicSeq::const_iterator iter = image.topics.begin();
00169 iter != image.topics.end(); iter++) {
00170 const DTopic& topic = *iter;
00171
00172 TAO_InputCDR in_cdr(topic.topicQos.second.second
00173 , topic.topicQos.second.first);
00174
00175 DDS::TopicQos* qos;
00176 ACE_NEW_NORETURN(qos, DDS::TopicQos);
00177 in_cdr >> *qos;
00178 topics_qos.push_back(qos);
00179
00180 UTopic* u_topic;
00181 ACE_NEW_NORETURN(u_topic
00182 , UTopic(topic.domainId, topic.topicId
00183 , topic.participantId
00184 , topic.name.c_str()
00185 , topic.dataType.c_str(), *qos));
00186 topics.push_back(u_topic);
00187
00188
00189 u_image.topics.push_back(u_topic);
00190 }
00191
00192
00193 SeqGuard<DDS::PublisherQos> pub_qos_guard;
00194 SeqGuard<DDS::PublisherQos>::Seq& pub_qos_seq = pub_qos_guard.seq();
00195 SeqGuard<DDS::DataWriterQos> dw_qos_guard;
00196 SeqGuard<DDS::DataWriterQos>::Seq& dw_qos_seq = dw_qos_guard.seq();
00197
00198 SeqGuard<DDS::SubscriberQos> sub_qos_guard;
00199 SeqGuard<DDS::SubscriberQos>::Seq& sub_qos_seq = sub_qos_guard.seq();
00200 SeqGuard<DDS::DataReaderQos> dr_qos_guard;
00201 SeqGuard<DDS::DataReaderQos>::Seq& dr_qos_seq = dr_qos_guard.seq();
00202
00203 SeqGuard<OpenDDS::DCPS::TransportLocatorSeq> trans_guard;
00204 SeqGuard<OpenDDS::DCPS::TransportLocatorSeq>::Seq& transports = trans_guard.seq();
00205
00206 SeqGuard<URActor> reader_guard;
00207 SeqGuard<URActor>::Seq& readers = reader_guard.seq();
00208 SeqGuard<UWActor> writer_guard;
00209 SeqGuard<UWActor>::Seq& writers = writer_guard.seq();
00210
00211 ContentSubscriptionInfo csi;
00212
00213 for (DImage::ReaderSeq::const_iterator iter = image.actors.begin();
00214 iter != image.actors.end(); iter++) {
00215 const DActor& actor = *iter;
00216
00217 TAO_InputCDR in_cdr(actor.transportInterfaceInfo.second
00218 , actor.transportInterfaceInfo.first);
00219 OpenDDS::DCPS::TransportLocatorSeq* trans;
00220 ACE_NEW_NORETURN(trans, OpenDDS::DCPS::TransportLocatorSeq);
00221 transports.push_back(trans);
00222 in_cdr >> *trans;
00223
00224 DDS::PublisherQos* pub_qos = 0;
00225 DDS::DataWriterQos* writer_qos = 0;
00226 DDS::SubscriberQos* sub_qos = 0;
00227 DDS::DataReaderQos* reader_qos = 0;
00228
00229 if (actor.type == DataReader) {
00230 TAO_InputCDR sub_cdr(actor.pubsubQos.second.second
00231 , actor.pubsubQos.second.first);
00232 ACE_NEW_NORETURN(sub_qos, DDS::SubscriberQos);
00233 sub_qos_seq.push_back(sub_qos);
00234 sub_cdr >> *sub_qos;
00235
00236 TAO_InputCDR read_cdr(actor.drdwQos.second.second
00237 , actor.drdwQos.second.first);
00238 ACE_NEW_NORETURN(reader_qos, DDS::DataReaderQos);
00239 dr_qos_seq.push_back(reader_qos);
00240 read_cdr >> *reader_qos;
00241
00242 csi.filterClassName = actor.contentSubscriptionProfile.filterClassName.c_str();
00243 csi.filterExpr = actor.contentSubscriptionProfile.filterExpr.c_str();
00244 TAO_InputCDR csp_cdr(actor.contentSubscriptionProfile.exprParams.second,
00245 actor.contentSubscriptionProfile.exprParams.first);
00246 csp_cdr >> csi.exprParams;
00247
00248 URActor* reader;
00249 ACE_NEW_NORETURN(reader
00250 , URActor(actor.domainId, actor.actorId
00251 , actor.topicId, actor.participantId
00252 , actor.type, actor.callback.c_str()
00253 , *sub_qos, *reader_qos
00254 , *trans, csi));
00255 readers.push_back(reader);
00256 u_image.actors.push_back(reader);
00257
00258 } else if (actor.type == DataWriter) {
00259 TAO_InputCDR pub_cdr(actor.pubsubQos.second.second
00260 , actor.pubsubQos.second.first);
00261 ACE_NEW_NORETURN(pub_qos, DDS::PublisherQos);
00262 pub_qos_seq.push_back(pub_qos);
00263 pub_cdr >> *pub_qos;
00264
00265 TAO_InputCDR write_cdr(actor.drdwQos.second.second
00266 , actor.drdwQos.second.first);
00267 ACE_NEW_NORETURN(writer_qos, DDS::DataWriterQos);
00268 dw_qos_seq.push_back(writer_qos);
00269 write_cdr >> *writer_qos;
00270
00271 UWActor* writer;
00272 ACE_NEW_NORETURN(writer
00273 , UWActor(actor.domainId, actor.actorId
00274 , actor.topicId, actor.participantId
00275 , actor.type, actor.callback.c_str()
00276 , *pub_qos, *writer_qos
00277 , *trans, csi));
00278 writers.push_back(writer);
00279 u_image.wActors.push_back(writer);
00280
00281 } else {
00282 ACE_ERROR((LM_ERROR, "Update::Manager::pushImage> unknown "
00283 "actor type.\n"));
00284 }
00285 }
00286
00287 u_image.lastPartId = image.lastPartId;
00288
00289 info_->receive_image(u_image);
00290 }
00291
00292 void
00293 Manager::destroy(const IdPath& id, ItemType type, ActorType actor)
00294 {
00295
00296 for (Updaters::iterator iter = updaters_.begin();
00297 iter != updaters_.end();
00298 iter++) {
00299 (*iter)->destroy(id, type, actor);
00300 }
00301 }
00302
00303 void
00304 Manager::add(const DTopic& topic)
00305 {
00306 if (info_ == NULL) {
00307 return;
00308 }
00309
00310
00311 TAO_InputCDR in_cdr(topic.topicQos.second.second
00312 , topic.topicQos.second.first);
00313
00314 DDS::TopicQos qos;
00315 in_cdr >> qos;
00316
00317
00318 info_->add_topic(topic.topicId, topic.domainId
00319 , topic.participantId, topic.name.c_str()
00320 , topic.dataType.c_str(), qos);
00321 }
00322
00323 void
00324 Manager::add(const DParticipant& participant)
00325 {
00326 if (info_ == NULL) {
00327 return;
00328 }
00329
00330
00331 TAO_InputCDR in_cdr(participant.participantQos.second.second
00332 , participant.participantQos.second.first);
00333
00334 DDS::DomainParticipantQos qos;
00335 in_cdr >> qos;
00336
00337
00338 info_->add_domain_participant(participant.domainId
00339 , participant.participantId
00340 , qos);
00341 }
00342
00343 void
00344 Manager::add(const DActor& actor)
00345 {
00346 if (info_ == NULL) {
00347 return;
00348 }
00349
00350
00351 TAO_InputCDR pubSubCdr(actor.pubsubQos.second.second
00352 , actor.pubsubQos.second.first);
00353
00354 TAO_InputCDR drdwCdr(actor.drdwQos.second.second
00355 , actor.drdwQos.second.first);
00356
00357 std::string callback(actor.callback.c_str());
00358
00359 TAO_InputCDR transportCdr(actor.transportInterfaceInfo.second
00360 , actor.transportInterfaceInfo.first);
00361
00362 OpenDDS::DCPS::TransportLocatorSeq transport_info;
00363 transportCdr >> transport_info;
00364
00365 if (actor.type == DataReader) {
00366 DDS::SubscriberQos sub_qos;
00367 DDS::DataReaderQos reader_qos;
00368
00369 pubSubCdr >> sub_qos;
00370 drdwCdr >> reader_qos;
00371
00372 Update::ContentSubscriptionInfo csi;
00373 csi.filterClassName = actor.contentSubscriptionProfile.filterClassName.c_str();
00374 csi.filterExpr = actor.contentSubscriptionProfile.filterExpr.c_str();
00375 TAO_InputCDR cspCdr(actor.contentSubscriptionProfile.exprParams.second,
00376 actor.contentSubscriptionProfile.exprParams.first);
00377 cspCdr >> csi.exprParams;
00378
00379
00380 info_->add_subscription(actor.domainId, actor.participantId
00381 , actor.topicId, actor.actorId
00382 , callback.c_str(), reader_qos
00383 , transport_info, sub_qos
00384 , csi.filterClassName, csi.filterExpr, csi.exprParams);
00385
00386 } else if (actor.type == DataWriter) {
00387 DDS::PublisherQos pub_qos;
00388 DDS::DataWriterQos writer_qos;
00389
00390 pubSubCdr >> pub_qos;
00391 drdwCdr >> writer_qos;
00392
00393
00394 info_->add_publication(actor.domainId, actor.participantId
00395 , actor.topicId, actor.actorId
00396 , callback.c_str(), writer_qos
00397 , transport_info, pub_qos);
00398 }
00399 }
00400
00401 void Manager::updateLastPartId(PartIdType partId)
00402 {
00403 for (Updaters::iterator iter = updaters_.begin();
00404 iter != updaters_.end();
00405 iter++) {
00406 (*iter)->updateLastPartId(partId);
00407 }
00408 }
00409
00410 }
00411
00412
00413 int
00414 UpdateManagerSvc_Loader::init()
00415 {
00416 return ACE_Service_Config::process_directive
00417 (ace_svc_desc_UpdateManagerSvc);
00418 return 0;
00419 }
00420
00421 ACE_FACTORY_DEFINE(ACE_Local_Service, UpdateManagerSvc)
00422
00423 ACE_STATIC_SVC_DEFINE(UpdateManagerSvc,
00424 ACE_TEXT("UpdateManagerSvc"),
00425 ACE_SVC_OBJ_T,
00426 &ACE_SVC_NAME(UpdateManagerSvc),
00427 ACE_Service_Type::DELETE_THIS
00428 | ACE_Service_Type::DELETE_OBJ,
00429 0)
00430
00431 OPENDDS_END_VERSIONED_NAMESPACE_DECL