#include <UpdateManager.h>
Collaboration diagram for Update::Manager:
Public Member Functions | |
Manager () | |
virtual | ~Manager () |
virtual int | init (int argc, ACE_TCHAR *argv[]) |
Shared object initializer. | |
virtual int | fini () |
Shared object finalizer. | |
void | add (TAO_DDS_DCPSInfo_i *info) |
void | add (Updater *updater) |
void | remove () |
void | remove (const Updater *updater) |
void | requestImage () |
Force a clean shutdown. | |
void | pushImage (const DImage &image) |
Downstream request to push image. | |
template<class UType> | |
void | create (const UType &info) |
template<class QosType> | |
void | update (const IdPath &id, const QosType &qos) |
void | destroy (const IdPath &id, ItemType type, ActorType actor=DataWriter) |
void | add (const DTopic &topic) |
void | add (const DParticipant &participant) |
void | add (const DActor &actor) |
Private Types | |
typedef std::set< Updater * > | Updaters |
Private Attributes | |
TAO_DDS_DCPSInfo_i * | info_ |
Updaters | updaters_ |
Definition at line 30 of file UpdateManager.h.
typedef std::set<Updater*> Update::Manager::Updaters [private] |
Definition at line 78 of file UpdateManager.h.
Update::Manager::Manager | ( | ) |
Update::Manager::~Manager | ( | ) | [virtual] |
void Update::Manager::add | ( | const DActor & | actor | ) |
Definition at line 342 of file UpdateManager.cpp.
References Update::ActorStrt< PSQ, RWQ, C, T, CSP >::actorId, TAO_DDS_DCPSInfo_i::add_publication(), TAO_DDS_DCPSInfo_i::add_subscription(), Update::ActorStrt< PSQ, RWQ, C, T, CSP >::callback, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::contentSubscriptionProfile, Update::DataReader, Update::DataWriter, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::domainId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::drdwQos, Update::ContentSubscriptionInfo::exprParams, Update::ContentSubscriptionInfo::filterClassName, Update::ContentSubscriptionInfo::filterExpr, info_, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::participantId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::pubsubQos, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::topicId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::transportInterfaceInfo, and Update::ActorStrt< PSQ, RWQ, C, T, CSP >::type.
00343 { 00344 if (info_ == NULL) { 00345 return; 00346 } 00347 00348 // Demarshal QOS data 00349 TAO_InputCDR pubSubCdr(actor.pubsubQos.second.second 00350 , actor.pubsubQos.second.first); 00351 00352 TAO_InputCDR drdwCdr(actor.drdwQos.second.second 00353 , actor.drdwQos.second.first); 00354 00355 std::string callback(actor.callback.c_str()); 00356 00357 TAO_InputCDR transportCdr(actor.transportInterfaceInfo.second 00358 , actor.transportInterfaceInfo.first); 00359 00360 OpenDDS::DCPS::TransportLocatorSeq transport_info; 00361 transportCdr >> transport_info; 00362 00363 if (actor.type == DataReader) { 00364 DDS::SubscriberQos sub_qos; 00365 DDS::DataReaderQos reader_qos; 00366 00367 pubSubCdr >> sub_qos; 00368 drdwCdr >> reader_qos; 00369 00370 Update::ContentSubscriptionInfo csi; 00371 csi.filterClassName = actor.contentSubscriptionProfile.filterClassName.c_str(); 00372 csi.filterExpr = actor.contentSubscriptionProfile.filterExpr.c_str(); 00373 TAO_InputCDR cspCdr(actor.contentSubscriptionProfile.exprParams.second, 00374 actor.contentSubscriptionProfile.exprParams.first); 00375 cspCdr >> csi.exprParams; 00376 00377 // Pass actor to InfoRepo. 00378 info_->add_subscription(actor.domainId, actor.participantId 00379 , actor.topicId, actor.actorId 00380 , callback.c_str(), reader_qos 00381 , transport_info, sub_qos 00382 , csi.filterClassName, csi.filterExpr, csi.exprParams); 00383 00384 } else if (actor.type == DataWriter) { 00385 DDS::PublisherQos pub_qos; 00386 DDS::DataWriterQos writer_qos; 00387 00388 pubSubCdr >> pub_qos; 00389 drdwCdr >> writer_qos; 00390 00391 // Pass actor info to infoRepo. 00392 info_->add_publication(actor.domainId, actor.participantId 00393 , actor.topicId, actor.actorId 00394 , callback.c_str(), writer_qos 00395 , transport_info, pub_qos); 00396 } 00397 }
void Update::Manager::add | ( | const DParticipant & | participant | ) |
Definition at line 322 of file UpdateManager.cpp.
References TAO_DDS_DCPSInfo_i::add_domain_participant(), Update::ParticipantStrt< QosSeq >::domainId, info_, Update::ParticipantStrt< QosSeq >::participantId, and Update::ParticipantStrt< QosSeq >::participantQos.
00323 { 00324 if (info_ == NULL) { 00325 return; 00326 } 00327 00328 // Demarshal QOS data 00329 TAO_InputCDR in_cdr(participant.participantQos.second.second 00330 , participant.participantQos.second.first); 00331 00332 DDS::DomainParticipantQos qos; 00333 in_cdr >> qos; 00334 00335 // Pass participant info to infoRepo. 00336 info_->add_domain_participant(participant.domainId 00337 , participant.participantId 00338 , qos); 00339 }
void Update::Manager::add | ( | const DTopic & | topic | ) |
Definition at line 302 of file UpdateManager.cpp.
References TAO_DDS_DCPSInfo_i::add_topic(), Update::TopicStrt< Q, S >::dataType, Update::TopicStrt< Q, S >::domainId, info_, Update::TopicStrt< Q, S >::name, Update::TopicStrt< Q, S >::participantId, Update::TopicStrt< Q, S >::topicId, and Update::TopicStrt< Q, S >::topicQos.
00303 { 00304 if (info_ == NULL) { 00305 return; 00306 } 00307 00308 // Demarshal QOS data 00309 TAO_InputCDR in_cdr(topic.topicQos.second.second 00310 , topic.topicQos.second.first); 00311 00312 DDS::TopicQos qos; 00313 in_cdr >> qos; 00314 00315 // Pass topic info to infoRepo. 00316 info_->add_topic(topic.topicId, topic.domainId 00317 , topic.participantId, topic.name.c_str() 00318 , topic.dataType.c_str(), qos); 00319 }
void Update::Manager::add | ( | Updater * | updater | ) |
Definition at line 37 of file UpdateManager.cpp.
References updaters_.
00038 { 00039 // push new element to the back. 00040 updaters_.insert(updater); 00041 }
void Update::Manager::add | ( | TAO_DDS_DCPSInfo_i * | info | ) |
Definition at line 31 of file UpdateManager.cpp.
References info_.
Referenced by TAO_DDS_DCPSInfo_i::add(), Update::PersistenceUpdater::init(), and TAO_DDS_DCPSInfo_i::init_persistence().
00032 { 00033 info_ = info; 00034 }
void Update::Manager::create | ( | const UType & | info | ) |
Definition at line 12 of file UpdateManager_T.cpp.
References updaters_.
Referenced by TAO_DDS_DCPSInfo_i::add_domain_participant(), TAO_DDS_DCPSInfo_i::add_publication(), TAO_DDS_DCPSInfo_i::add_subscription(), TAO_DDS_DCPSInfo_i::assert_topic(), and DCPS_IR_Participant::takeOwnership().
00013 { 00014 // Invoke add on each of the iterators. 00015 for (Updaters::iterator iter = updaters_.begin(); 00016 iter != updaters_.end(); 00017 iter++) { 00018 (*iter)->create(info); 00019 } 00020 }
Definition at line 291 of file UpdateManager.cpp.
References updaters_.
Referenced by DCPS_IR_Participant::remove_all_dependents(), TAO_DDS_DCPSInfo_i::remove_domain_participant(), TAO_DDS_DCPSInfo_i::remove_publication(), TAO_DDS_DCPSInfo_i::remove_subscription(), and TAO_DDS_DCPSInfo_i::remove_topic().
00292 { 00293 // Invoke remove on each of the iterators. 00294 for (Updaters::iterator iter = updaters_.begin(); 00295 iter != updaters_.end(); 00296 iter++) { 00297 (*iter)->destroy(id, type, actor); 00298 } 00299 }
int Update::Manager::fini | ( | ) | [virtual] |
int Update::Manager::init | ( | int | argc, | |
ACE_TCHAR * | argv[] | |||
) | [virtual] |
void Update::Manager::pushImage | ( | const DImage & | image | ) |
Downstream request to push image.
Definition at line 111 of file UpdateManager.cpp.
References Update::ImageData< T, P, A, W >::actors, Update::DataReader, Update::DataWriter, Update::ContentSubscriptionInfo::exprParams, Update::ContentSubscriptionInfo::filterClassName, Update::ContentSubscriptionInfo::filterExpr, info_, Update::ImageData< T, P, A, W >::participants, TAO_DDS_DCPSInfo_i::receive_image(), Update::SeqGuard< T >::seq(), Update::ImageData< T, P, A, W >::topics, and Update::ImageData< T, P, A, W >::wActors.
Referenced by Update::PersistenceUpdater::requestImage().
00112 { 00113 if (info_ == NULL) { 00114 return; 00115 } 00116 00117 // image to be propagated. 00118 UImage u_image; 00119 00120 /*************************** 00121 // The downstream image needs to be converted to a 00122 // format compatible with the upstream layers (Dimage -> UImage) 00123 // The Uimage cotains a lot of refrences to the complex data 00124 // types. These are collecetd in several buckets (see below) and 00125 // passed by reference. Usage of a custom guard class 'SeqGuard' 00126 // automates memory cleanup. 00127 ***************************/ 00128 00129 // Participant buckets 00130 SeqGuard<DDS::DomainParticipantQos> part_qos_guard; 00131 SeqGuard<DDS::DomainParticipantQos>::Seq& part_qos = part_qos_guard.seq(); 00132 00133 SeqGuard<UParticipant> part_guard; 00134 SeqGuard<UParticipant>::Seq& parts = part_guard.seq(); 00135 00136 for (DImage::ParticipantSeq::const_iterator iter = image.participants.begin(); 00137 iter != image.participants.end(); iter++) { 00138 const DParticipant& part = *iter; 00139 00140 TAO_InputCDR in_cdr(part.participantQos.second.second 00141 , part.participantQos.second.first); 00142 00143 DDS::DomainParticipantQos* qos; 00144 ACE_NEW_NORETURN(qos, DDS::DomainParticipantQos); 00145 in_cdr >> *qos; 00146 part_qos.push_back(qos); 00147 00148 UParticipant* u_part; 00149 ACE_NEW_NORETURN(u_part, UParticipant(part.domainId 00150 , part.owner 00151 , part.participantId 00152 , *qos)); 00153 parts.push_back(u_part); 00154 00155 // push newly created UParticipant into UImage Participant bucket 00156 u_image.participants.push_back(u_part); 00157 } 00158 00159 // Topic buckets 00160 SeqGuard<DDS::TopicQos> topics_qos_guard; 00161 SeqGuard<DDS::TopicQos>::Seq& topics_qos = topics_qos_guard.seq(); 00162 00163 SeqGuard<UTopic> topics_guard; 00164 SeqGuard<UTopic>::Seq& topics = topics_guard.seq(); 00165 00166 for (DImage::TopicSeq::const_iterator iter = image.topics.begin(); 00167 iter != image.topics.end(); iter++) { 00168 const DTopic& topic = *iter; 00169 00170 TAO_InputCDR in_cdr(topic.topicQos.second.second 00171 , topic.topicQos.second.first); 00172 00173 DDS::TopicQos* qos; 00174 ACE_NEW_NORETURN(qos, DDS::TopicQos); 00175 in_cdr >> *qos; 00176 topics_qos.push_back(qos); 00177 00178 UTopic* u_topic; 00179 ACE_NEW_NORETURN(u_topic 00180 , UTopic(topic.domainId, topic.topicId 00181 , topic.participantId 00182 , topic.name.c_str() 00183 , topic.dataType.c_str(), *qos)); 00184 topics.push_back(u_topic); 00185 00186 // Push newly created UTopic into UImage Topic bucket 00187 u_image.topics.push_back(u_topic); 00188 } 00189 00190 // Actor buckets 00191 SeqGuard<DDS::PublisherQos> pub_qos_guard; 00192 SeqGuard<DDS::PublisherQos>::Seq& pub_qos_seq = pub_qos_guard.seq(); 00193 SeqGuard<DDS::DataWriterQos> dw_qos_guard; 00194 SeqGuard<DDS::DataWriterQos>::Seq& dw_qos_seq = dw_qos_guard.seq(); 00195 00196 SeqGuard<DDS::SubscriberQos> sub_qos_guard; 00197 SeqGuard<DDS::SubscriberQos>::Seq& sub_qos_seq = sub_qos_guard.seq(); 00198 SeqGuard<DDS::DataReaderQos> dr_qos_guard; 00199 SeqGuard<DDS::DataReaderQos>::Seq& dr_qos_seq = dr_qos_guard.seq(); 00200 00201 SeqGuard<OpenDDS::DCPS::TransportLocatorSeq> trans_guard; 00202 SeqGuard<OpenDDS::DCPS::TransportLocatorSeq>::Seq& transports = trans_guard.seq(); 00203 00204 SeqGuard<URActor> reader_guard; 00205 SeqGuard<URActor>::Seq& readers = reader_guard.seq(); 00206 SeqGuard<UWActor> writer_guard; 00207 SeqGuard<UWActor>::Seq& writers = writer_guard.seq(); 00208 00209 for (DImage::ReaderSeq::const_iterator iter = image.actors.begin(); 00210 iter != image.actors.end(); iter++) { 00211 const DActor& actor = *iter; 00212 00213 TAO_InputCDR in_cdr(actor.transportInterfaceInfo.second 00214 , actor.transportInterfaceInfo.first); 00215 OpenDDS::DCPS::TransportLocatorSeq* trans; 00216 ACE_NEW_NORETURN(trans, OpenDDS::DCPS::TransportLocatorSeq); 00217 transports.push_back(trans); 00218 in_cdr >> *trans; 00219 00220 DDS::PublisherQos* pub_qos = 0; 00221 DDS::DataWriterQos* writer_qos = 0; 00222 DDS::SubscriberQos* sub_qos = 0; 00223 DDS::DataReaderQos* reader_qos = 0; 00224 00225 if (actor.type == DataReader) { 00226 TAO_InputCDR sub_cdr(actor.pubsubQos.second.second 00227 , actor.pubsubQos.second.first); 00228 ACE_NEW_NORETURN(sub_qos, DDS::SubscriberQos); 00229 sub_qos_seq.push_back(sub_qos); 00230 sub_cdr >> *sub_qos; 00231 00232 TAO_InputCDR read_cdr(actor.drdwQos.second.second 00233 , actor.drdwQos.second.first); 00234 ACE_NEW_NORETURN(reader_qos, DDS::DataReaderQos); 00235 dr_qos_seq.push_back(reader_qos); 00236 read_cdr >> *reader_qos; 00237 00238 ContentSubscriptionInfo* csi = 0; 00239 ACE_NEW_NORETURN(csi, ContentSubscriptionInfo); 00240 csi->filterClassName = actor.contentSubscriptionProfile.filterClassName.c_str(); 00241 csi->filterExpr = actor.contentSubscriptionProfile.filterExpr.c_str(); 00242 TAO_InputCDR csp_cdr(actor.contentSubscriptionProfile.exprParams.second, 00243 actor.contentSubscriptionProfile.exprParams.first); 00244 csp_cdr >> csi->exprParams; 00245 00246 URActor* reader; 00247 ACE_NEW_NORETURN(reader 00248 , URActor(actor.domainId, actor.actorId 00249 , actor.topicId, actor.participantId 00250 , actor.type, actor.callback.c_str() 00251 , *sub_qos, *reader_qos 00252 , *trans, *csi)); 00253 readers.push_back(reader); 00254 u_image.actors.push_back(reader); 00255 00256 } else if (actor.type == DataWriter) { 00257 TAO_InputCDR pub_cdr(actor.pubsubQos.second.second 00258 , actor.pubsubQos.second.first); 00259 ACE_NEW_NORETURN(pub_qos, DDS::PublisherQos); 00260 pub_qos_seq.push_back(pub_qos); 00261 pub_cdr >> *pub_qos; 00262 00263 TAO_InputCDR write_cdr(actor.drdwQos.second.second 00264 , actor.drdwQos.second.first); 00265 ACE_NEW_NORETURN(writer_qos, DDS::DataWriterQos); 00266 dw_qos_seq.push_back(writer_qos); 00267 write_cdr >> *writer_qos; 00268 00269 ContentSubscriptionInfo csi; //writers have no info 00270 00271 UWActor* writer; 00272 ACE_NEW_NORETURN(writer 00273 , UWActor(actor.domainId, actor.actorId 00274 , actor.topicId, actor.participantId 00275 , actor.type, actor.callback.c_str() 00276 , *pub_qos, *writer_qos 00277 , *trans, csi)); 00278 writers.push_back(writer); 00279 u_image.wActors.push_back(writer); 00280 00281 } else { 00282 ACE_ERROR((LM_ERROR, "Update::Manager::pushImage> unknown " 00283 "actor type.\n")); 00284 } 00285 } 00286 00287 info_->receive_image(u_image); 00288 }
void Update::Manager::remove | ( | const Updater * | updater | ) |
Definition at line 51 of file UpdateManager.cpp.
References updaters_.
00052 { 00053 // check if the Updaters is part of the list. 00054 Updaters::iterator iter = updaters_.find(const_cast<Updater*>(updater)); 00055 00056 if (iter == updaters_.end()) { 00057 return; 00058 } 00059 00060 // remove the element 00061 updaters_.erase(iter); 00062 }
void Update::Manager::remove | ( | ) |
Definition at line 44 of file UpdateManager.cpp.
References info_.
00045 { 00046 // Clean the refrence to the InfoRepo. 00047 info_ = 0; 00048 }
void Update::Manager::requestImage | ( | ) |
Force a clean shutdown.
Upstream request for a fresh image Currently handled synchronously via 'pushImage' TBD: Replace with an asynchronous model.
Definition at line 77 of file UpdateManager.cpp.
References updaters_.
Referenced by TAO_DDS_DCPSInfo_i::init_persistence().
00078 { 00079 for (Updaters::iterator iter = updaters_.begin(); 00080 iter != updaters_.end(); 00081 iter++) { 00082 (*iter)->requestImage(); 00083 } 00084 }
void Update::Manager::update | ( | const IdPath & | id, | |
const QosType & | qos | |||
) |
Definition at line 24 of file UpdateManager_T.cpp.
References updaters_.
Referenced by TAO_DDS_DCPSInfo_i::update_domain_participant_qos(), TAO_DDS_DCPSInfo_i::update_publication_qos(), TAO_DDS_DCPSInfo_i::update_subscription_params(), TAO_DDS_DCPSInfo_i::update_subscription_qos(), and TAO_DDS_DCPSInfo_i::update_topic_qos().
00025 { 00026 // Invoke update on each of the iterators. 00027 for (Updaters::iterator iter = updaters_.begin(); 00028 iter != updaters_.end(); 00029 iter++) { 00030 (*iter)->update(id, qos); 00031 } 00032 }
TAO_DDS_DCPSInfo_i* Update::Manager::info_ [private] |
Updaters Update::Manager::updaters_ [private] |
Definition at line 84 of file UpdateManager.h.
Referenced by add(), create(), destroy(), remove(), requestImage(), and update().