#include <UpdateManager.h>
Public Member Functions | |
Manager () | |
virtual | ~Manager () |
virtual int | init (int argc, ACE_TCHAR *argv[]) |
Shared object initializer. | |
virtual int | fini () |
Shared object finalizer. | |
void | add (TAO_DDS_DCPSInfo_i *info) |
void | add (Updater *updater) |
void | remove () |
void | remove (const Updater *updater) |
void | requestImage () |
Force a clean shutdown. | |
void | pushImage (const DImage &image) |
Downstream request to push image. | |
template<class UType > | |
void | create (const UType &info) |
template<class QosType > | |
void | update (const IdPath &id, const QosType &qos) |
void | destroy (const IdPath &id, ItemType type, ActorType actor=DataWriter) |
void | add (const DTopic &topic) |
void | add (const DParticipant &participant) |
void | add (const DActor &actor) |
virtual void | updateLastPartId (PartIdType partId) |
Update Last Participant Id for the repo. | |
Private Types | |
typedef std::set< Updater * > | Updaters |
Private Attributes | |
TAO_DDS_DCPSInfo_i * | info_ |
Updaters | updaters_ |
Definition at line 32 of file UpdateManager.h.
typedef std::set<Updater*> Update::Manager::Updaters [private] |
Definition at line 83 of file UpdateManager.h.
Update::Manager::Manager | ( | ) |
Definition at line 23 of file UpdateManager.cpp.
00024 : info_(0) 00025 { 00026 }
Update::Manager::~Manager | ( | ) | [virtual] |
Definition at line 28 of file UpdateManager.cpp.
void Update::Manager::add | ( | const DActor & | actor | ) |
Definition at line 344 of file UpdateManager.cpp.
References Update::ActorStrt< PSQ, RWQ, C, T, CSP >::actorId, TAO_DDS_DCPSInfo_i::add_publication(), TAO_DDS_DCPSInfo_i::add_subscription(), Update::ActorStrt< PSQ, RWQ, C, T, CSP >::callback, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::contentSubscriptionProfile, Update::DataReader, Update::DataWriter, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::domainId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::drdwQos, Update::ContentSubscriptionInfo::exprParams, Update::ContentSubscriptionInfo::filterClassName, Update::ContentSubscriptionInfo::filterExpr, info_, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::participantId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::pubsubQos, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::topicId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::transportInterfaceInfo, and Update::ActorStrt< PSQ, RWQ, C, T, CSP >::type.
00345 { 00346 if (info_ == NULL) { 00347 return; 00348 } 00349 00350 // Demarshal QOS data 00351 TAO_InputCDR pubSubCdr(actor.pubsubQos.second.second 00352 , actor.pubsubQos.second.first); 00353 00354 TAO_InputCDR drdwCdr(actor.drdwQos.second.second 00355 , actor.drdwQos.second.first); 00356 00357 std::string callback(actor.callback.c_str()); 00358 00359 TAO_InputCDR transportCdr(actor.transportInterfaceInfo.second 00360 , actor.transportInterfaceInfo.first); 00361 00362 OpenDDS::DCPS::TransportLocatorSeq transport_info; 00363 transportCdr >> transport_info; 00364 00365 if (actor.type == DataReader) { 00366 DDS::SubscriberQos sub_qos; 00367 DDS::DataReaderQos reader_qos; 00368 00369 pubSubCdr >> sub_qos; 00370 drdwCdr >> reader_qos; 00371 00372 Update::ContentSubscriptionInfo csi; 00373 csi.filterClassName = actor.contentSubscriptionProfile.filterClassName.c_str(); 00374 csi.filterExpr = actor.contentSubscriptionProfile.filterExpr.c_str(); 00375 TAO_InputCDR cspCdr(actor.contentSubscriptionProfile.exprParams.second, 00376 actor.contentSubscriptionProfile.exprParams.first); 00377 cspCdr >> csi.exprParams; 00378 00379 // Pass actor to InfoRepo. 00380 info_->add_subscription(actor.domainId, actor.participantId 00381 , actor.topicId, actor.actorId 00382 , callback.c_str(), reader_qos 00383 , transport_info, sub_qos 00384 , csi.filterClassName, csi.filterExpr, csi.exprParams); 00385 00386 } else if (actor.type == DataWriter) { 00387 DDS::PublisherQos pub_qos; 00388 DDS::DataWriterQos writer_qos; 00389 00390 pubSubCdr >> pub_qos; 00391 drdwCdr >> writer_qos; 00392 00393 // Pass actor info to infoRepo. 00394 info_->add_publication(actor.domainId, actor.participantId 00395 , actor.topicId, actor.actorId 00396 , callback.c_str(), writer_qos 00397 , transport_info, pub_qos); 00398 } 00399 }
void Update::Manager::add | ( | const DParticipant & | participant | ) |
Definition at line 324 of file UpdateManager.cpp.
References TAO_DDS_DCPSInfo_i::add_domain_participant(), Update::ParticipantStrt< QosSeq >::domainId, info_, Update::ParticipantStrt< QosSeq >::participantId, and Update::ParticipantStrt< QosSeq >::participantQos.
00325 { 00326 if (info_ == NULL) { 00327 return; 00328 } 00329 00330 // Demarshal QOS data 00331 TAO_InputCDR in_cdr(participant.participantQos.second.second 00332 , participant.participantQos.second.first); 00333 00334 DDS::DomainParticipantQos qos; 00335 in_cdr >> qos; 00336 00337 // Pass participant info to infoRepo. 00338 info_->add_domain_participant(participant.domainId 00339 , participant.participantId 00340 , qos); 00341 }
void Update::Manager::add | ( | const DTopic & | topic | ) |
Definition at line 304 of file UpdateManager.cpp.
References TAO_DDS_DCPSInfo_i::add_topic(), Update::TopicStrt< Q, S >::dataType, Update::TopicStrt< Q, S >::domainId, info_, Update::TopicStrt< Q, S >::name, Update::TopicStrt< Q, S >::participantId, Update::TopicStrt< Q, S >::topicId, and Update::TopicStrt< Q, S >::topicQos.
00305 { 00306 if (info_ == NULL) { 00307 return; 00308 } 00309 00310 // Demarshal QOS data 00311 TAO_InputCDR in_cdr(topic.topicQos.second.second 00312 , topic.topicQos.second.first); 00313 00314 DDS::TopicQos qos; 00315 in_cdr >> qos; 00316 00317 // Pass topic info to infoRepo. 00318 info_->add_topic(topic.topicId, topic.domainId 00319 , topic.participantId, topic.name.c_str() 00320 , topic.dataType.c_str(), qos); 00321 }
void Update::Manager::add | ( | Updater * | updater | ) |
Definition at line 39 of file UpdateManager.cpp.
References updaters_.
00040 { 00041 // push new element to the back. 00042 updaters_.insert(updater); 00043 }
void Update::Manager::add | ( | TAO_DDS_DCPSInfo_i * | info | ) |
Definition at line 33 of file UpdateManager.cpp.
References info_.
Referenced by TAO_DDS_DCPSInfo_i::add(), Update::PersistenceUpdater::init(), and TAO_DDS_DCPSInfo_i::init_persistence().
00034 { 00035 info_ = info; 00036 }
void Update::Manager::create | ( | const UType & | info | ) | [inline] |
Definition at line 12 of file UpdateManager_T.cpp.
References updaters_.
Referenced by TAO_DDS_DCPSInfo_i::add_publication(), TAO_DDS_DCPSInfo_i::add_subscription(), TAO_DDS_DCPSInfo_i::assert_topic(), and DCPS_IR_Participant::takeOwnership().
00013 { 00014 // Invoke add on each of the iterators. 00015 for (Updaters::iterator iter = updaters_.begin(); 00016 iter != updaters_.end(); 00017 iter++) { 00018 (*iter)->create(info); 00019 } 00020 }
Definition at line 293 of file UpdateManager.cpp.
References updaters_.
Referenced by DCPS_IR_Participant::remove_all_dependents(), TAO_DDS_DCPSInfo_i::remove_domain_participant(), TAO_DDS_DCPSInfo_i::remove_publication(), TAO_DDS_DCPSInfo_i::remove_subscription(), and TAO_DDS_DCPSInfo_i::remove_topic().
00294 { 00295 // Invoke remove on each of the iterators. 00296 for (Updaters::iterator iter = updaters_.begin(); 00297 iter != updaters_.end(); 00298 iter++) { 00299 (*iter)->destroy(id, type, actor); 00300 } 00301 }
int Update::Manager::fini | ( | void | ) | [virtual] |
Shared object finalizer.
Reimplemented from ACE_Shared_Object.
Definition at line 73 of file UpdateManager.cpp.
int Update::Manager::init | ( | int | argc, | |
ACE_TCHAR * | argv[] | |||
) | [virtual] |
Shared object initializer.
Reimplemented from ACE_Shared_Object.
Definition at line 67 of file UpdateManager.cpp.
void Update::Manager::pushImage | ( | const DImage & | image | ) |
Downstream request to push image.
Definition at line 113 of file UpdateManager.cpp.
References Update::ActorStrt< PSQ, RWQ, C, T, CSP >::actorId, Update::ImageData< T, P, A, W >::actors, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::callback, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::contentSubscriptionProfile, Update::DataReader, Update::TopicStrt< Q, S >::dataType, Update::DataWriter, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::domainId, Update::TopicStrt< Q, S >::domainId, Update::ParticipantStrt< QosSeq >::domainId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::drdwQos, Update::ContentSubscriptionInfo::exprParams, Update::ContentSubscriptionInfo::filterClassName, Update::ContentSubscriptionInfo::filterExpr, info_, Update::ImageData< T, P, A, W >::lastPartId, LM_ERROR, Update::TopicStrt< Q, S >::name, Update::ParticipantStrt< QosSeq >::owner, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::participantId, Update::TopicStrt< Q, S >::participantId, Update::ParticipantStrt< QosSeq >::participantId, Update::ParticipantStrt< QosSeq >::participantQos, Update::ImageData< T, P, A, W >::participants, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::pubsubQos, TAO_DDS_DCPSInfo_i::receive_image(), Update::SeqGuard< T >::seq(), Update::ActorStrt< PSQ, RWQ, C, T, CSP >::topicId, Update::TopicStrt< Q, S >::topicId, Update::TopicStrt< Q, S >::topicQos, Update::ImageData< T, P, A, W >::topics, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::transportInterfaceInfo, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::type, and Update::ImageData< T, P, A, W >::wActors.
Referenced by Update::PersistenceUpdater::requestImage().
00114 { 00115 if (info_ == NULL) { 00116 return; 00117 } 00118 00119 // image to be propagated. 00120 UImage u_image; 00121 00122 /*************************** 00123 // The downstream image needs to be converted to a 00124 // format compatible with the upstream layers (Dimage -> UImage) 00125 // The Uimage cotains a lot of refrences to the complex data 00126 // types. These are collecetd in several buckets (see below) and 00127 // passed by reference. Usage of a custom guard class 'SeqGuard' 00128 // automates memory cleanup. 00129 ***************************/ 00130 00131 // Participant buckets 00132 SeqGuard<DDS::DomainParticipantQos> part_qos_guard; 00133 SeqGuard<DDS::DomainParticipantQos>::Seq& part_qos = part_qos_guard.seq(); 00134 00135 SeqGuard<UParticipant> part_guard; 00136 SeqGuard<UParticipant>::Seq& parts = part_guard.seq(); 00137 00138 for (DImage::ParticipantSeq::const_iterator iter = image.participants.begin(); 00139 iter != image.participants.end(); iter++) { 00140 const DParticipant& part = *iter; 00141 00142 TAO_InputCDR in_cdr(part.participantQos.second.second 00143 , part.participantQos.second.first); 00144 00145 DDS::DomainParticipantQos* qos; 00146 ACE_NEW_NORETURN(qos, DDS::DomainParticipantQos); 00147 in_cdr >> *qos; 00148 part_qos.push_back(qos); 00149 00150 UParticipant* u_part; 00151 ACE_NEW_NORETURN(u_part, UParticipant(part.domainId 00152 , part.owner 00153 , part.participantId 00154 , *qos)); 00155 parts.push_back(u_part); 00156 00157 // push newly created UParticipant into UImage Participant bucket 00158 u_image.participants.push_back(u_part); 00159 } 00160 00161 // Topic buckets 00162 SeqGuard<DDS::TopicQos> topics_qos_guard; 00163 SeqGuard<DDS::TopicQos>::Seq& topics_qos = topics_qos_guard.seq(); 00164 00165 SeqGuard<UTopic> topics_guard; 00166 SeqGuard<UTopic>::Seq& topics = topics_guard.seq(); 00167 00168 for (DImage::TopicSeq::const_iterator iter = image.topics.begin(); 00169 iter != image.topics.end(); iter++) { 00170 const DTopic& topic = *iter; 00171 00172 TAO_InputCDR in_cdr(topic.topicQos.second.second 00173 , topic.topicQos.second.first); 00174 00175 DDS::TopicQos* qos; 00176 ACE_NEW_NORETURN(qos, DDS::TopicQos); 00177 in_cdr >> *qos; 00178 topics_qos.push_back(qos); 00179 00180 UTopic* u_topic; 00181 ACE_NEW_NORETURN(u_topic 00182 , UTopic(topic.domainId, topic.topicId 00183 , topic.participantId 00184 , topic.name.c_str() 00185 , topic.dataType.c_str(), *qos)); 00186 topics.push_back(u_topic); 00187 00188 // Push newly created UTopic into UImage Topic bucket 00189 u_image.topics.push_back(u_topic); 00190 } 00191 00192 // Actor buckets 00193 SeqGuard<DDS::PublisherQos> pub_qos_guard; 00194 SeqGuard<DDS::PublisherQos>::Seq& pub_qos_seq = pub_qos_guard.seq(); 00195 SeqGuard<DDS::DataWriterQos> dw_qos_guard; 00196 SeqGuard<DDS::DataWriterQos>::Seq& dw_qos_seq = dw_qos_guard.seq(); 00197 00198 SeqGuard<DDS::SubscriberQos> sub_qos_guard; 00199 SeqGuard<DDS::SubscriberQos>::Seq& sub_qos_seq = sub_qos_guard.seq(); 00200 SeqGuard<DDS::DataReaderQos> dr_qos_guard; 00201 SeqGuard<DDS::DataReaderQos>::Seq& dr_qos_seq = dr_qos_guard.seq(); 00202 00203 SeqGuard<OpenDDS::DCPS::TransportLocatorSeq> trans_guard; 00204 SeqGuard<OpenDDS::DCPS::TransportLocatorSeq>::Seq& transports = trans_guard.seq(); 00205 00206 SeqGuard<URActor> reader_guard; 00207 SeqGuard<URActor>::Seq& readers = reader_guard.seq(); 00208 SeqGuard<UWActor> writer_guard; 00209 SeqGuard<UWActor>::Seq& writers = writer_guard.seq(); 00210 00211 ContentSubscriptionInfo csi; 00212 00213 for (DImage::ReaderSeq::const_iterator iter = image.actors.begin(); 00214 iter != image.actors.end(); iter++) { 00215 const DActor& actor = *iter; 00216 00217 TAO_InputCDR in_cdr(actor.transportInterfaceInfo.second 00218 , actor.transportInterfaceInfo.first); 00219 OpenDDS::DCPS::TransportLocatorSeq* trans; 00220 ACE_NEW_NORETURN(trans, OpenDDS::DCPS::TransportLocatorSeq); 00221 transports.push_back(trans); 00222 in_cdr >> *trans; 00223 00224 DDS::PublisherQos* pub_qos = 0; 00225 DDS::DataWriterQos* writer_qos = 0; 00226 DDS::SubscriberQos* sub_qos = 0; 00227 DDS::DataReaderQos* reader_qos = 0; 00228 00229 if (actor.type == DataReader) { 00230 TAO_InputCDR sub_cdr(actor.pubsubQos.second.second 00231 , actor.pubsubQos.second.first); 00232 ACE_NEW_NORETURN(sub_qos, DDS::SubscriberQos); 00233 sub_qos_seq.push_back(sub_qos); 00234 sub_cdr >> *sub_qos; 00235 00236 TAO_InputCDR read_cdr(actor.drdwQos.second.second 00237 , actor.drdwQos.second.first); 00238 ACE_NEW_NORETURN(reader_qos, DDS::DataReaderQos); 00239 dr_qos_seq.push_back(reader_qos); 00240 read_cdr >> *reader_qos; 00241 00242 csi.filterClassName = actor.contentSubscriptionProfile.filterClassName.c_str(); 00243 csi.filterExpr = actor.contentSubscriptionProfile.filterExpr.c_str(); 00244 TAO_InputCDR csp_cdr(actor.contentSubscriptionProfile.exprParams.second, 00245 actor.contentSubscriptionProfile.exprParams.first); 00246 csp_cdr >> csi.exprParams; 00247 00248 URActor* reader; 00249 ACE_NEW_NORETURN(reader 00250 , URActor(actor.domainId, actor.actorId 00251 , actor.topicId, actor.participantId 00252 , actor.type, actor.callback.c_str() 00253 , *sub_qos, *reader_qos 00254 , *trans, csi)); 00255 readers.push_back(reader); 00256 u_image.actors.push_back(reader); 00257 00258 } else if (actor.type == DataWriter) { 00259 TAO_InputCDR pub_cdr(actor.pubsubQos.second.second 00260 , actor.pubsubQos.second.first); 00261 ACE_NEW_NORETURN(pub_qos, DDS::PublisherQos); 00262 pub_qos_seq.push_back(pub_qos); 00263 pub_cdr >> *pub_qos; 00264 00265 TAO_InputCDR write_cdr(actor.drdwQos.second.second 00266 , actor.drdwQos.second.first); 00267 ACE_NEW_NORETURN(writer_qos, DDS::DataWriterQos); 00268 dw_qos_seq.push_back(writer_qos); 00269 write_cdr >> *writer_qos; 00270 00271 UWActor* writer; 00272 ACE_NEW_NORETURN(writer 00273 , UWActor(actor.domainId, actor.actorId 00274 , actor.topicId, actor.participantId 00275 , actor.type, actor.callback.c_str() 00276 , *pub_qos, *writer_qos 00277 , *trans, csi)); 00278 writers.push_back(writer); 00279 u_image.wActors.push_back(writer); 00280 00281 } else { 00282 ACE_ERROR((LM_ERROR, "Update::Manager::pushImage> unknown " 00283 "actor type.\n")); 00284 } 00285 } 00286 00287 u_image.lastPartId = image.lastPartId; 00288 00289 info_->receive_image(u_image); 00290 }
void Update::Manager::remove | ( | const Updater * | updater | ) |
Definition at line 53 of file UpdateManager.cpp.
References updaters_.
void Update::Manager::remove | ( | void | ) |
Definition at line 46 of file UpdateManager.cpp.
References info_.
00047 { 00048 // Clean the refrence to the InfoRepo. 00049 info_ = 0; 00050 }
void Update::Manager::requestImage | ( | ) |
Force a clean shutdown.
Upstream request for a fresh image Currently handled synchronously via 'pushImage' TBD: Replace with an asynchronous model.
Definition at line 79 of file UpdateManager.cpp.
References updaters_.
Referenced by TAO_DDS_DCPSInfo_i::init_persistence().
00080 { 00081 for (Updaters::iterator iter = updaters_.begin(); 00082 iter != updaters_.end(); 00083 iter++) { 00084 (*iter)->requestImage(); 00085 } 00086 }
void Update::Manager::update | ( | const IdPath & | id, | |
const QosType & | qos | |||
) | [inline] |
Definition at line 24 of file UpdateManager_T.cpp.
References updaters_.
Referenced by TAO_DDS_DCPSInfo_i::update_domain_participant_qos(), TAO_DDS_DCPSInfo_i::update_publication_qos(), TAO_DDS_DCPSInfo_i::update_subscription_params(), TAO_DDS_DCPSInfo_i::update_subscription_qos(), and TAO_DDS_DCPSInfo_i::update_topic_qos().
00025 { 00026 // Invoke update on each of the iterators. 00027 for (Updaters::iterator iter = updaters_.begin(); 00028 iter != updaters_.end(); 00029 iter++) { 00030 (*iter)->update(id, qos); 00031 } 00032 }
void Update::Manager::updateLastPartId | ( | PartIdType | partId | ) | [virtual] |
TAO_DDS_DCPSInfo_i* Update::Manager::info_ [private] |
Definition at line 88 of file UpdateManager.h.
Referenced by add(), pushImage(), and remove().
Updaters Update::Manager::updaters_ [private] |
Definition at line 89 of file UpdateManager.h.
Referenced by add(), create(), destroy(), remove(), requestImage(), update(), and updateLastPartId().