UpdateManager.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 
00010 #include "UpdateManager.h"
00011 #include "Updater.h"
00012 #include "ArrDelAdapter.h"
00013 #include "DCPSInfo_i.h"
00014 
00015 #include "tao/CDR.h"
00016 
00017 #include <vector>
00018 
00019 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00020 
00021 namespace Update {
00022 
00023 Manager::Manager()
00024   : info_(0)
00025 {
00026 }
00027 
00028 Manager::~Manager()
00029 {
00030 }
00031 
00032 void
00033 Manager::add(TAO_DDS_DCPSInfo_i* info)
00034 {
00035   info_ = info;
00036 }
00037 
00038 void
00039 Manager::add(Updater* updater)
00040 {
00041   // push new element to the back.
00042   updaters_.insert(updater);
00043 }
00044 
00045 void
00046 Manager::remove()
00047 {
00048   // Clean the refrence to the InfoRepo.
00049   info_ = 0;
00050 }
00051 
00052 void
00053 Manager::remove(const Updater* updater)
00054 {
00055   // check if the Updaters is part of the list.
00056   Updaters::iterator iter = updaters_.find(const_cast<Updater*>(updater));
00057 
00058   if (iter == updaters_.end()) {
00059     return;
00060   }
00061 
00062   // remove the element
00063   updaters_.erase(iter);
00064 }
00065 
00066 int
00067 Manager::init(int , ACE_TCHAR *[])
00068 {
00069   return 0;
00070 }
00071 
00072 int
00073 Manager::fini()
00074 {
00075   return 0;
00076 }
00077 
00078 void
00079 Manager::requestImage()
00080 {
00081   for (Updaters::iterator iter = updaters_.begin();
00082        iter != updaters_.end();
00083        iter++) {
00084     (*iter)->requestImage();
00085   }
00086 }
00087 
00088 template <typename T>
00089 class SeqGuard {
00090 public:
00091   typedef std::vector<T*> Seq;
00092 
00093 public:
00094   ~SeqGuard() {
00095     for (typename Seq::iterator iter = seq_.begin();
00096          iter != seq_.end();) {
00097       typename Seq::iterator current = iter;
00098       iter++;
00099 
00100       delete(*current);
00101     }
00102   };
00103 
00104   Seq& seq() {
00105     return seq_;
00106   };
00107 
00108 private:
00109   Seq seq_;
00110 };
00111 
00112 void
00113 Manager::pushImage(const DImage& image)
00114 {
00115   if (info_ == NULL) {
00116     return;
00117   }
00118 
00119   // image to be propagated.
00120   UImage u_image;
00121 
00122   /***************************
00123   // The downstream image needs to be converted to a
00124   // format compatible with the upstream layers (Dimage -> UImage)
00125   // The Uimage cotains a lot of refrences to the complex data
00126   // types. These are collecetd in several buckets (see below) and
00127   // passed by reference. Usage of a custom guard class 'SeqGuard'
00128   // automates memory cleanup.
00129   ***************************/
00130 
00131   // Participant buckets
00132   SeqGuard<DDS::DomainParticipantQos> part_qos_guard;
00133   SeqGuard<DDS::DomainParticipantQos>::Seq& part_qos = part_qos_guard.seq();
00134 
00135   SeqGuard<UParticipant> part_guard;
00136   SeqGuard<UParticipant>::Seq& parts = part_guard.seq();
00137 
00138   for (DImage::ParticipantSeq::const_iterator iter = image.participants.begin();
00139        iter != image.participants.end(); iter++) {
00140     const DParticipant& part = *iter;
00141 
00142     TAO_InputCDR in_cdr(part.participantQos.second.second
00143                         , part.participantQos.second.first);
00144 
00145     DDS::DomainParticipantQos* qos;
00146     ACE_NEW_NORETURN(qos, DDS::DomainParticipantQos);
00147     in_cdr >> *qos;
00148     part_qos.push_back(qos);
00149 
00150     UParticipant* u_part;
00151     ACE_NEW_NORETURN(u_part, UParticipant(part.domainId
00152                                           , part.owner
00153                                           , part.participantId
00154                                           , *qos));
00155     parts.push_back(u_part);
00156 
00157     // push newly created UParticipant into UImage Participant bucket
00158     u_image.participants.push_back(u_part);
00159   }
00160 
00161   // Topic buckets
00162   SeqGuard<DDS::TopicQos> topics_qos_guard;
00163   SeqGuard<DDS::TopicQos>::Seq& topics_qos = topics_qos_guard.seq();
00164 
00165   SeqGuard<UTopic> topics_guard;
00166   SeqGuard<UTopic>::Seq& topics = topics_guard.seq();
00167 
00168   for (DImage::TopicSeq::const_iterator iter = image.topics.begin();
00169        iter != image.topics.end(); iter++) {
00170     const DTopic& topic = *iter;
00171 
00172     TAO_InputCDR in_cdr(topic.topicQos.second.second
00173                         , topic.topicQos.second.first);
00174 
00175     DDS::TopicQos* qos;
00176     ACE_NEW_NORETURN(qos, DDS::TopicQos);
00177     in_cdr >> *qos;
00178     topics_qos.push_back(qos);
00179 
00180     UTopic* u_topic;
00181     ACE_NEW_NORETURN(u_topic
00182                      , UTopic(topic.domainId, topic.topicId
00183                               , topic.participantId
00184                               , topic.name.c_str()
00185                               , topic.dataType.c_str(), *qos));
00186     topics.push_back(u_topic);
00187 
00188     // Push newly created UTopic into UImage Topic bucket
00189     u_image.topics.push_back(u_topic);
00190   }
00191 
00192   // Actor buckets
00193   SeqGuard<DDS::PublisherQos> pub_qos_guard;
00194   SeqGuard<DDS::PublisherQos>::Seq& pub_qos_seq = pub_qos_guard.seq();
00195   SeqGuard<DDS::DataWriterQos> dw_qos_guard;
00196   SeqGuard<DDS::DataWriterQos>::Seq& dw_qos_seq = dw_qos_guard.seq();
00197 
00198   SeqGuard<DDS::SubscriberQos> sub_qos_guard;
00199   SeqGuard<DDS::SubscriberQos>::Seq& sub_qos_seq = sub_qos_guard.seq();
00200   SeqGuard<DDS::DataReaderQos> dr_qos_guard;
00201   SeqGuard<DDS::DataReaderQos>::Seq& dr_qos_seq = dr_qos_guard.seq();
00202 
00203   SeqGuard<OpenDDS::DCPS::TransportLocatorSeq> trans_guard;
00204   SeqGuard<OpenDDS::DCPS::TransportLocatorSeq>::Seq& transports = trans_guard.seq();
00205 
00206   SeqGuard<URActor> reader_guard;
00207   SeqGuard<URActor>::Seq& readers = reader_guard.seq();
00208   SeqGuard<UWActor> writer_guard;
00209   SeqGuard<UWActor>::Seq& writers = writer_guard.seq();
00210 
00211   ContentSubscriptionInfo csi;
00212 
00213   for (DImage::ReaderSeq::const_iterator iter = image.actors.begin();
00214        iter != image.actors.end(); iter++) {
00215     const DActor& actor = *iter;
00216 
00217     TAO_InputCDR in_cdr(actor.transportInterfaceInfo.second
00218                         , actor.transportInterfaceInfo.first);
00219     OpenDDS::DCPS::TransportLocatorSeq* trans;
00220     ACE_NEW_NORETURN(trans, OpenDDS::DCPS::TransportLocatorSeq);
00221     transports.push_back(trans);
00222     in_cdr >> *trans;
00223 
00224     DDS::PublisherQos* pub_qos = 0;
00225     DDS::DataWriterQos* writer_qos = 0;
00226     DDS::SubscriberQos* sub_qos = 0;
00227     DDS::DataReaderQos* reader_qos = 0;
00228 
00229     if (actor.type == DataReader) {
00230       TAO_InputCDR sub_cdr(actor.pubsubQos.second.second
00231                            , actor.pubsubQos.second.first);
00232       ACE_NEW_NORETURN(sub_qos, DDS::SubscriberQos);
00233       sub_qos_seq.push_back(sub_qos);
00234       sub_cdr >> *sub_qos;
00235 
00236       TAO_InputCDR read_cdr(actor.drdwQos.second.second
00237                             , actor.drdwQos.second.first);
00238       ACE_NEW_NORETURN(reader_qos, DDS::DataReaderQos);
00239       dr_qos_seq.push_back(reader_qos);
00240       read_cdr >> *reader_qos;
00241 
00242       csi.filterClassName = actor.contentSubscriptionProfile.filterClassName.c_str();
00243       csi.filterExpr = actor.contentSubscriptionProfile.filterExpr.c_str();
00244       TAO_InputCDR csp_cdr(actor.contentSubscriptionProfile.exprParams.second,
00245                            actor.contentSubscriptionProfile.exprParams.first);
00246       csp_cdr >> csi.exprParams;
00247 
00248       URActor* reader;
00249       ACE_NEW_NORETURN(reader
00250                        , URActor(actor.domainId, actor.actorId
00251                                  , actor.topicId, actor.participantId
00252                                  , actor.type, actor.callback.c_str()
00253                                  , *sub_qos, *reader_qos
00254                                  , *trans, csi));
00255       readers.push_back(reader);
00256       u_image.actors.push_back(reader);
00257 
00258     } else if (actor.type == DataWriter) {
00259       TAO_InputCDR pub_cdr(actor.pubsubQos.second.second
00260                            , actor.pubsubQos.second.first);
00261       ACE_NEW_NORETURN(pub_qos, DDS::PublisherQos);
00262       pub_qos_seq.push_back(pub_qos);
00263       pub_cdr >> *pub_qos;
00264 
00265       TAO_InputCDR write_cdr(actor.drdwQos.second.second
00266                              , actor.drdwQos.second.first);
00267       ACE_NEW_NORETURN(writer_qos, DDS::DataWriterQos);
00268       dw_qos_seq.push_back(writer_qos);
00269       write_cdr >> *writer_qos;
00270 
00271       UWActor* writer;
00272       ACE_NEW_NORETURN(writer
00273                        , UWActor(actor.domainId, actor.actorId
00274                                  , actor.topicId, actor.participantId
00275                                  , actor.type, actor.callback.c_str()
00276                                  , *pub_qos, *writer_qos
00277                                  , *trans, csi));
00278       writers.push_back(writer);
00279       u_image.wActors.push_back(writer);
00280 
00281     } else {
00282       ACE_ERROR((LM_ERROR, "Update::Manager::pushImage> unknown "
00283                  "actor type.\n"));
00284     }
00285   }
00286 
00287   u_image.lastPartId = image.lastPartId;
00288 
00289   info_->receive_image(u_image);
00290 }
00291 
00292 void
00293 Manager::destroy(const IdPath& id, ItemType type, ActorType actor)
00294 {
00295   // Invoke remove on each of the iterators.
00296   for (Updaters::iterator iter = updaters_.begin();
00297        iter != updaters_.end();
00298        iter++) {
00299     (*iter)->destroy(id, type, actor);
00300   }
00301 }
00302 
00303 void
00304 Manager::add(const DTopic& topic)
00305 {
00306   if (info_ == NULL) {
00307     return;
00308   }
00309 
00310   // Demarshal QOS data
00311   TAO_InputCDR in_cdr(topic.topicQos.second.second
00312                       , topic.topicQos.second.first);
00313 
00314   DDS::TopicQos qos;
00315   in_cdr >> qos;
00316 
00317   // Pass topic info to infoRepo.
00318   info_->add_topic(topic.topicId, topic.domainId
00319                    , topic.participantId, topic.name.c_str()
00320                    , topic.dataType.c_str(), qos);
00321 }
00322 
00323 void
00324 Manager::add(const DParticipant& participant)
00325 {
00326   if (info_ == NULL) {
00327     return;
00328   }
00329 
00330   // Demarshal QOS data
00331   TAO_InputCDR in_cdr(participant.participantQos.second.second
00332                       , participant.participantQos.second.first);
00333 
00334   DDS::DomainParticipantQos qos;
00335   in_cdr >> qos;
00336 
00337   // Pass participant info to infoRepo.
00338   info_->add_domain_participant(participant.domainId
00339                                 , participant.participantId
00340                                 , qos);
00341 }
00342 
00343 void
00344 Manager::add(const DActor& actor)
00345 {
00346   if (info_ == NULL) {
00347     return;
00348   }
00349 
00350   // Demarshal QOS data
00351   TAO_InputCDR pubSubCdr(actor.pubsubQos.second.second
00352                          , actor.pubsubQos.second.first);
00353 
00354   TAO_InputCDR drdwCdr(actor.drdwQos.second.second
00355                        , actor.drdwQos.second.first);
00356 
00357   std::string callback(actor.callback.c_str());
00358 
00359   TAO_InputCDR transportCdr(actor.transportInterfaceInfo.second
00360                             , actor.transportInterfaceInfo.first);
00361 
00362   OpenDDS::DCPS::TransportLocatorSeq transport_info;
00363   transportCdr >> transport_info;
00364 
00365   if (actor.type == DataReader) {
00366     DDS::SubscriberQos sub_qos;
00367     DDS::DataReaderQos reader_qos;
00368 
00369     pubSubCdr >> sub_qos;
00370     drdwCdr >> reader_qos;
00371 
00372     Update::ContentSubscriptionInfo csi;
00373     csi.filterClassName = actor.contentSubscriptionProfile.filterClassName.c_str();
00374     csi.filterExpr = actor.contentSubscriptionProfile.filterExpr.c_str();
00375     TAO_InputCDR cspCdr(actor.contentSubscriptionProfile.exprParams.second,
00376                         actor.contentSubscriptionProfile.exprParams.first);
00377     cspCdr >> csi.exprParams;
00378 
00379     // Pass actor to InfoRepo.
00380     info_->add_subscription(actor.domainId, actor.participantId
00381                             , actor.topicId, actor.actorId
00382                             , callback.c_str(), reader_qos
00383                             , transport_info, sub_qos
00384                             , csi.filterClassName, csi.filterExpr, csi.exprParams);
00385 
00386   } else if (actor.type == DataWriter) {
00387     DDS::PublisherQos pub_qos;
00388     DDS::DataWriterQos writer_qos;
00389 
00390     pubSubCdr >> pub_qos;
00391     drdwCdr >> writer_qos;
00392 
00393     // Pass actor info to infoRepo.
00394     info_->add_publication(actor.domainId, actor.participantId
00395                            , actor.topicId, actor.actorId
00396                            , callback.c_str(), writer_qos
00397                            , transport_info, pub_qos);
00398   }
00399 }
00400 
00401 void Manager::updateLastPartId(PartIdType partId)
00402 {
00403   for (Updaters::iterator iter = updaters_.begin();
00404        iter != updaters_.end();
00405        iter++) {
00406     (*iter)->updateLastPartId(partId);
00407   }
00408 }
00409 
00410 } // namespace Update
00411 
00412 
00413 int
00414 UpdateManagerSvc_Loader::init()
00415 {
00416   return ACE_Service_Config::process_directive
00417          (ace_svc_desc_UpdateManagerSvc);
00418   return 0;
00419 }
00420 
00421 ACE_FACTORY_DEFINE(ACE_Local_Service, UpdateManagerSvc)
00422 
00423 ACE_STATIC_SVC_DEFINE(UpdateManagerSvc,
00424                       ACE_TEXT("UpdateManagerSvc"),
00425                       ACE_SVC_OBJ_T,
00426                       &ACE_SVC_NAME(UpdateManagerSvc),
00427                       ACE_Service_Type::DELETE_THIS
00428                       | ACE_Service_Type::DELETE_OBJ,
00429                       0)
00430 
00431 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1