UpdateManager.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 
00010 #include "UpdateManager.h"
00011 #include "Updater.h"
00012 #include "ArrDelAdapter.h"
00013 #include "DCPSInfo_i.h"
00014 
00015 #include "tao/CDR.h"
00016 
00017 #include <vector>
00018 
00019 namespace Update {
00020 
00021 Manager::Manager()
00022   : info_(0)
00023 {
00024 }
00025 
00026 Manager::~Manager()
00027 {
00028 }
00029 
00030 void
00031 Manager::add(TAO_DDS_DCPSInfo_i* info)
00032 {
00033   info_ = info;
00034 }
00035 
00036 void
00037 Manager::add(Updater* updater)
00038 {
00039   // push new element to the back.
00040   updaters_.insert(updater);
00041 }
00042 
00043 void
00044 Manager::remove()
00045 {
00046   // Clean the refrence to the InfoRepo.
00047   info_ = 0;
00048 }
00049 
00050 void
00051 Manager::remove(const Updater* updater)
00052 {
00053   // check if the Updaters is part of the list.
00054   Updaters::iterator iter = updaters_.find(const_cast<Updater*>(updater));
00055 
00056   if (iter == updaters_.end()) {
00057     return;
00058   }
00059 
00060   // remove the element
00061   updaters_.erase(iter);
00062 }
00063 
00064 int
00065 Manager::init(int , ACE_TCHAR *[])
00066 {
00067   return 0;
00068 }
00069 
00070 int
00071 Manager::fini()
00072 {
00073   return 0;
00074 }
00075 
00076 void
00077 Manager::requestImage()
00078 {
00079   for (Updaters::iterator iter = updaters_.begin();
00080        iter != updaters_.end();
00081        iter++) {
00082     (*iter)->requestImage();
00083   }
00084 }
00085 
00086 template <typename T>
00087 class SeqGuard {
00088 public:
00089   typedef std::vector<T*> Seq;
00090 
00091 public:
00092   ~SeqGuard() {
00093     for (typename Seq::iterator iter = seq_.begin();
00094          iter != seq_.end();) {
00095       typename Seq::iterator current = iter;
00096       iter++;
00097 
00098       delete(*current);
00099     }
00100   };
00101 
00102   Seq& seq() {
00103     return seq_;
00104   };
00105 
00106 private:
00107   Seq seq_;
00108 };
00109 
00110 void
00111 Manager::pushImage(const DImage& image)
00112 {
00113   if (info_ == NULL) {
00114     return;
00115   }
00116 
00117   // image to be propagated.
00118   UImage u_image;
00119 
00120   /***************************
00121   // The downstream image needs to be converted to a
00122   // format compatible with the upstream layers (Dimage -> UImage)
00123   // The Uimage cotains a lot of refrences to the complex data
00124   // types. These are collecetd in several buckets (see below) and
00125   // passed by reference. Usage of a custom guard class 'SeqGuard'
00126   // automates memory cleanup.
00127   ***************************/
00128 
00129   // Participant buckets
00130   SeqGuard<DDS::DomainParticipantQos> part_qos_guard;
00131   SeqGuard<DDS::DomainParticipantQos>::Seq& part_qos = part_qos_guard.seq();
00132 
00133   SeqGuard<UParticipant> part_guard;
00134   SeqGuard<UParticipant>::Seq& parts = part_guard.seq();
00135 
00136   for (DImage::ParticipantSeq::const_iterator iter = image.participants.begin();
00137        iter != image.participants.end(); iter++) {
00138     const DParticipant& part = *iter;
00139 
00140     TAO_InputCDR in_cdr(part.participantQos.second.second
00141                         , part.participantQos.second.first);
00142 
00143     DDS::DomainParticipantQos* qos;
00144     ACE_NEW_NORETURN(qos, DDS::DomainParticipantQos);
00145     in_cdr >> *qos;
00146     part_qos.push_back(qos);
00147 
00148     UParticipant* u_part;
00149     ACE_NEW_NORETURN(u_part, UParticipant(part.domainId
00150                                           , part.owner
00151                                           , part.participantId
00152                                           , *qos));
00153     parts.push_back(u_part);
00154 
00155     // push newly created UParticipant into UImage Participant bucket
00156     u_image.participants.push_back(u_part);
00157   }
00158 
00159   // Topic buckets
00160   SeqGuard<DDS::TopicQos> topics_qos_guard;
00161   SeqGuard<DDS::TopicQos>::Seq& topics_qos = topics_qos_guard.seq();
00162 
00163   SeqGuard<UTopic> topics_guard;
00164   SeqGuard<UTopic>::Seq& topics = topics_guard.seq();
00165 
00166   for (DImage::TopicSeq::const_iterator iter = image.topics.begin();
00167        iter != image.topics.end(); iter++) {
00168     const DTopic& topic = *iter;
00169 
00170     TAO_InputCDR in_cdr(topic.topicQos.second.second
00171                         , topic.topicQos.second.first);
00172 
00173     DDS::TopicQos* qos;
00174     ACE_NEW_NORETURN(qos, DDS::TopicQos);
00175     in_cdr >> *qos;
00176     topics_qos.push_back(qos);
00177 
00178     UTopic* u_topic;
00179     ACE_NEW_NORETURN(u_topic
00180                      , UTopic(topic.domainId, topic.topicId
00181                               , topic.participantId
00182                               , topic.name.c_str()
00183                               , topic.dataType.c_str(), *qos));
00184     topics.push_back(u_topic);
00185 
00186     // Push newly created UTopic into UImage Topic bucket
00187     u_image.topics.push_back(u_topic);
00188   }
00189 
00190   // Actor buckets
00191   SeqGuard<DDS::PublisherQos> pub_qos_guard;
00192   SeqGuard<DDS::PublisherQos>::Seq& pub_qos_seq = pub_qos_guard.seq();
00193   SeqGuard<DDS::DataWriterQos> dw_qos_guard;
00194   SeqGuard<DDS::DataWriterQos>::Seq& dw_qos_seq = dw_qos_guard.seq();
00195 
00196   SeqGuard<DDS::SubscriberQos> sub_qos_guard;
00197   SeqGuard<DDS::SubscriberQos>::Seq& sub_qos_seq = sub_qos_guard.seq();
00198   SeqGuard<DDS::DataReaderQos> dr_qos_guard;
00199   SeqGuard<DDS::DataReaderQos>::Seq& dr_qos_seq = dr_qos_guard.seq();
00200 
00201   SeqGuard<OpenDDS::DCPS::TransportLocatorSeq> trans_guard;
00202   SeqGuard<OpenDDS::DCPS::TransportLocatorSeq>::Seq& transports = trans_guard.seq();
00203 
00204   SeqGuard<URActor> reader_guard;
00205   SeqGuard<URActor>::Seq& readers = reader_guard.seq();
00206   SeqGuard<UWActor> writer_guard;
00207   SeqGuard<UWActor>::Seq& writers = writer_guard.seq();
00208 
00209   for (DImage::ReaderSeq::const_iterator iter = image.actors.begin();
00210        iter != image.actors.end(); iter++) {
00211     const DActor& actor = *iter;
00212 
00213     TAO_InputCDR in_cdr(actor.transportInterfaceInfo.second
00214                         , actor.transportInterfaceInfo.first);
00215     OpenDDS::DCPS::TransportLocatorSeq* trans;
00216     ACE_NEW_NORETURN(trans, OpenDDS::DCPS::TransportLocatorSeq);
00217     transports.push_back(trans);
00218     in_cdr >> *trans;
00219 
00220     DDS::PublisherQos* pub_qos = 0;
00221     DDS::DataWriterQos* writer_qos = 0;
00222     DDS::SubscriberQos* sub_qos = 0;
00223     DDS::DataReaderQos* reader_qos = 0;
00224 
00225     if (actor.type == DataReader) {
00226       TAO_InputCDR sub_cdr(actor.pubsubQos.second.second
00227                            , actor.pubsubQos.second.first);
00228       ACE_NEW_NORETURN(sub_qos, DDS::SubscriberQos);
00229       sub_qos_seq.push_back(sub_qos);
00230       sub_cdr >> *sub_qos;
00231 
00232       TAO_InputCDR read_cdr(actor.drdwQos.second.second
00233                             , actor.drdwQos.second.first);
00234       ACE_NEW_NORETURN(reader_qos, DDS::DataReaderQos);
00235       dr_qos_seq.push_back(reader_qos);
00236       read_cdr >> *reader_qos;
00237 
00238       ContentSubscriptionInfo* csi = 0;
00239       ACE_NEW_NORETURN(csi, ContentSubscriptionInfo);
00240       csi->filterClassName = actor.contentSubscriptionProfile.filterClassName.c_str();
00241       csi->filterExpr = actor.contentSubscriptionProfile.filterExpr.c_str();
00242       TAO_InputCDR csp_cdr(actor.contentSubscriptionProfile.exprParams.second,
00243                            actor.contentSubscriptionProfile.exprParams.first);
00244       csp_cdr >> csi->exprParams;
00245 
00246       URActor* reader;
00247       ACE_NEW_NORETURN(reader
00248                        , URActor(actor.domainId, actor.actorId
00249                                  , actor.topicId, actor.participantId
00250                                  , actor.type, actor.callback.c_str()
00251                                  , *sub_qos, *reader_qos
00252                                  , *trans, *csi));
00253       readers.push_back(reader);
00254       u_image.actors.push_back(reader);
00255 
00256     } else if (actor.type == DataWriter) {
00257       TAO_InputCDR pub_cdr(actor.pubsubQos.second.second
00258                            , actor.pubsubQos.second.first);
00259       ACE_NEW_NORETURN(pub_qos, DDS::PublisherQos);
00260       pub_qos_seq.push_back(pub_qos);
00261       pub_cdr >> *pub_qos;
00262 
00263       TAO_InputCDR write_cdr(actor.drdwQos.second.second
00264                              , actor.drdwQos.second.first);
00265       ACE_NEW_NORETURN(writer_qos, DDS::DataWriterQos);
00266       dw_qos_seq.push_back(writer_qos);
00267       write_cdr >> *writer_qos;
00268 
00269       ContentSubscriptionInfo csi; //writers have no info
00270 
00271       UWActor* writer;
00272       ACE_NEW_NORETURN(writer
00273                        , UWActor(actor.domainId, actor.actorId
00274                                  , actor.topicId, actor.participantId
00275                                  , actor.type, actor.callback.c_str()
00276                                  , *pub_qos, *writer_qos
00277                                  , *trans, csi));
00278       writers.push_back(writer);
00279       u_image.wActors.push_back(writer);
00280 
00281     } else {
00282       ACE_ERROR((LM_ERROR, "Update::Manager::pushImage> unknown "
00283                  "actor type.\n"));
00284     }
00285   }
00286 
00287   info_->receive_image(u_image);
00288 }
00289 
00290 void
00291 Manager::destroy(const IdPath& id, ItemType type, ActorType actor)
00292 {
00293   // Invoke remove on each of the iterators.
00294   for (Updaters::iterator iter = updaters_.begin();
00295        iter != updaters_.end();
00296        iter++) {
00297     (*iter)->destroy(id, type, actor);
00298   }
00299 }
00300 
00301 void
00302 Manager::add(const DTopic& topic)
00303 {
00304   if (info_ == NULL) {
00305     return;
00306   }
00307 
00308   // Demarshal QOS data
00309   TAO_InputCDR in_cdr(topic.topicQos.second.second
00310                       , topic.topicQos.second.first);
00311 
00312   DDS::TopicQos qos;
00313   in_cdr >> qos;
00314 
00315   // Pass topic info to infoRepo.
00316   info_->add_topic(topic.topicId, topic.domainId
00317                    , topic.participantId, topic.name.c_str()
00318                    , topic.dataType.c_str(), qos);
00319 }
00320 
00321 void
00322 Manager::add(const DParticipant& participant)
00323 {
00324   if (info_ == NULL) {
00325     return;
00326   }
00327 
00328   // Demarshal QOS data
00329   TAO_InputCDR in_cdr(participant.participantQos.second.second
00330                       , participant.participantQos.second.first);
00331 
00332   DDS::DomainParticipantQos qos;
00333   in_cdr >> qos;
00334 
00335   // Pass participant info to infoRepo.
00336   info_->add_domain_participant(participant.domainId
00337                                 , participant.participantId
00338                                 , qos);
00339 }
00340 
00341 void
00342 Manager::add(const DActor& actor)
00343 {
00344   if (info_ == NULL) {
00345     return;
00346   }
00347 
00348   // Demarshal QOS data
00349   TAO_InputCDR pubSubCdr(actor.pubsubQos.second.second
00350                          , actor.pubsubQos.second.first);
00351 
00352   TAO_InputCDR drdwCdr(actor.drdwQos.second.second
00353                        , actor.drdwQos.second.first);
00354 
00355   std::string callback(actor.callback.c_str());
00356 
00357   TAO_InputCDR transportCdr(actor.transportInterfaceInfo.second
00358                             , actor.transportInterfaceInfo.first);
00359 
00360   OpenDDS::DCPS::TransportLocatorSeq transport_info;
00361   transportCdr >> transport_info;
00362 
00363   if (actor.type == DataReader) {
00364     DDS::SubscriberQos sub_qos;
00365     DDS::DataReaderQos reader_qos;
00366 
00367     pubSubCdr >> sub_qos;
00368     drdwCdr >> reader_qos;
00369 
00370     Update::ContentSubscriptionInfo csi;
00371     csi.filterClassName = actor.contentSubscriptionProfile.filterClassName.c_str();
00372     csi.filterExpr = actor.contentSubscriptionProfile.filterExpr.c_str();
00373     TAO_InputCDR cspCdr(actor.contentSubscriptionProfile.exprParams.second,
00374                         actor.contentSubscriptionProfile.exprParams.first);
00375     cspCdr >> csi.exprParams;
00376 
00377     // Pass actor to InfoRepo.
00378     info_->add_subscription(actor.domainId, actor.participantId
00379                             , actor.topicId, actor.actorId
00380                             , callback.c_str(), reader_qos
00381                             , transport_info, sub_qos
00382                             , csi.filterClassName, csi.filterExpr, csi.exprParams);
00383 
00384   } else if (actor.type == DataWriter) {
00385     DDS::PublisherQos pub_qos;
00386     DDS::DataWriterQos writer_qos;
00387 
00388     pubSubCdr >> pub_qos;
00389     drdwCdr >> writer_qos;
00390 
00391     // Pass actor info to infoRepo.
00392     info_->add_publication(actor.domainId, actor.participantId
00393                            , actor.topicId, actor.actorId
00394                            , callback.c_str(), writer_qos
00395                            , transport_info, pub_qos);
00396   }
00397 }
00398 
00399 } // namespace Update
00400 
00401 int
00402 UpdateManagerSvc_Loader::init()
00403 {
00404   return ACE_Service_Config::process_directive
00405          (ace_svc_desc_UpdateManagerSvc);
00406   return 0;
00407 }
00408 
00409 ACE_FACTORY_DEFINE(ACE_Local_Service, UpdateManagerSvc)
00410 
00411 ACE_STATIC_SVC_DEFINE(UpdateManagerSvc,
00412                       ACE_TEXT("UpdateManagerSvc"),
00413                       ACE_SVC_OBJ_T,
00414                       &ACE_SVC_NAME(UpdateManagerSvc),
00415                       ACE_Service_Type::DELETE_THIS
00416                       | ACE_Service_Type::DELETE_OBJ,
00417                       0)
00418 
00419 //ACE_STATIC_SVC_REQUIRE (Update::UpdateManager)

Generated on Fri Feb 12 20:05:29 2016 for OpenDDS by  doxygen 1.4.7