Definition at line 411 of file Sedp.h.
OpenDDS::RTPS::Sedp::Task::Task | ( | Sedp * | sedp | ) | [inline, explicit] |
Definition at line 412 of file Sedp.h.
References ACE_Task_Base::activate().
00413 : spdp_(&sedp->spdp_) 00414 , sedp_(sedp) 00415 , shutting_down_(false) 00416 { 00417 activate(); 00418 }
OpenDDS::RTPS::Sedp::Task::~Task | ( | ) |
Definition at line 3667 of file Sedp.cpp.
References shutdown().
03668 { 03669 shutdown(); 03670 }
void OpenDDS::RTPS::Sedp::Task::acknowledge | ( | ) |
Definition at line 1507 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::Msg::MSG_FINI_BIT, ACE_Task_Ex< ACE_MT_SYNCH, Msg >::putq(), and OpenDDS::DCPS::REQUEST_ACK.
Referenced by OpenDDS::RTPS::Sedp::acknowledge().
01508 { 01509 // id is really a don't care, but just set to REQUEST_ACK 01510 putq(new Msg(Msg::MSG_FINI_BIT, DCPS::REQUEST_ACK, 0)); 01511 }
void OpenDDS::RTPS::Sedp::Task::enqueue | ( | Msg::MsgType | which_bit, | |
const DDS::InstanceHandle_t | bit_ih | |||
) |
Definition at line 3546 of file Sedp.cpp.
References OpenDDS::DCPS::DISPOSE_INSTANCE, ACE_Task_Ex< ACE_MT_SYNCH, Msg >::putq(), OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.
03547 { 03548 #ifndef DDS_HAS_MINIMUM_BIT 03549 if (spdp_->shutting_down()) { return; } 03550 putq(new Msg(which_bit, DCPS::DISPOSE_INSTANCE, bit_ih)); 03551 #else 03552 ACE_UNUSED_ARG(which_bit); 03553 ACE_UNUSED_ARG(bit_ih); 03554 #endif /* DDS_HAS_MINIMUM_BIT */ 03555 }
void OpenDDS::RTPS::Sedp::Task::enqueue | ( | DCPS::MessageId | id, | |
DCPS::unique_ptr< ParticipantMessageData > | data | |||
) |
Definition at line 3539 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::Msg::MSG_PARTICIPANT_DATA, ACE_Task_Ex< ACE_MT_SYNCH, Msg >::putq(), OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.
03540 { 03541 if (spdp_->shutting_down()) { return; } 03542 putq(new Msg(Msg::MSG_PARTICIPANT_DATA, id, data.release())); 03543 }
void OpenDDS::RTPS::Sedp::Task::enqueue | ( | DCPS::MessageId | id, | |
DCPS::unique_ptr< DCPS::DiscoveredReaderData > | rdata | |||
) |
Definition at line 3523 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::Msg::MSG_READER, ACE_Task_Ex< ACE_MT_SYNCH, Msg >::putq(), OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.
03524 { 03525 if (spdp_->shutting_down()) { return; } 03526 putq(new Msg(Msg::MSG_READER, id, rdata.release())); 03527 }
void OpenDDS::RTPS::Sedp::Task::enqueue | ( | DCPS::MessageId | id, | |
DCPS::unique_ptr< DCPS::DiscoveredWriterData > | wdata | |||
) |
Definition at line 3507 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::Msg::MSG_WRITER, ACE_Task_Ex< ACE_MT_SYNCH, Msg >::putq(), OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.
03508 { 03509 if (spdp_->shutting_down()) { return; } 03510 putq(new Msg(Msg::MSG_WRITER, id, wdata.release())); 03511 }
void OpenDDS::RTPS::Sedp::Task::enqueue | ( | DCPS::MessageId | id, | |
DCPS::unique_ptr< Security::SPDPdiscoveredParticipantData > | pdata | |||
) |
Definition at line 3491 of file Sedp.cpp.
References OpenDDS::Security::DPDK_SECURE, OpenDDS::RTPS::Sedp::Msg::MSG_PARTICIPANT, ACE_Task_Ex< ACE_MT_SYNCH, Msg >::putq(), OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.
Referenced by OpenDDS::RTPS::Sedp::associate(), OpenDDS::RTPS::Sedp::Reader::data_received(), and OpenDDS::RTPS::Sedp::remove_from_bit_i().
03492 { 03493 if (spdp_->shutting_down()) { return; } 03494 03495 Msg::MsgType type = Msg::MSG_PARTICIPANT; 03496 03497 #if defined(OPENDDS_SECURITY) 03498 if (pdata->dataKind == Security::DPDK_SECURE) { 03499 type = Msg::MSG_DCPS_PARTICIPANT_SECURE; 03500 } 03501 #endif 03502 03503 putq(new Msg(type, id, pdata.release())); 03504 }
void OpenDDS::RTPS::Sedp::Task::shutdown | ( | void | ) |
Definition at line 1514 of file Sedp.cpp.
References OpenDDS::DCPS::GRACEFUL_DISCONNECT, OpenDDS::RTPS::Sedp::Msg::MSG_STOP, ACE_Task_Ex< ACE_MT_SYNCH, Msg >::putq(), shutting_down_, and ACE_Task_Base::wait().
Referenced by OpenDDS::RTPS::Sedp::shutdown(), and ~Task().
01515 { 01516 if (!shutting_down_) { 01517 shutting_down_ = true; 01518 putq(new Msg(Msg::MSG_STOP, DCPS::GRACEFUL_DISCONNECT, 0)); 01519 wait(); 01520 } 01521 }
int OpenDDS::RTPS::Sedp::Task::svc | ( | void | ) | [private, virtual] |
Reimplemented from ACE_Task_Base.
Definition at line 3581 of file Sedp.cpp.
References ACE_TEXT(), OpenDDS::RTPS::WaitForAcks::ack(), OpenDDS::DCPS::DCPS_debug_level, ACE_Task_Ex< ACE_MT_SYNCH, Msg >::getq(), LM_DEBUG, LM_INFO, OpenDDS::RTPS::Sedp::Msg::MSG_FINI_BIT, OpenDDS::RTPS::Sedp::Msg::MSG_PARTICIPANT, OpenDDS::RTPS::Sedp::Msg::MSG_PARTICIPANT_DATA, OpenDDS::RTPS::Sedp::Msg::MSG_READER, OpenDDS::RTPS::Sedp::Msg::MSG_REMOVE_FROM_PUB_BIT, OpenDDS::RTPS::Sedp::Msg::MSG_REMOVE_FROM_SUB_BIT, OpenDDS::RTPS::Sedp::Msg::MSG_STOP, OpenDDS::RTPS::Sedp::Msg::MSG_WRITER, spdp_, svc_i(), svc_secure_i(), and OpenDDS::RTPS::Spdp::wait_for_acks().
03582 { 03583 for (Msg* msg = 0; getq(msg) != -1; /*no increment*/) { 03584 if (DCPS::DCPS_debug_level > 5) { 03585 ACE_DEBUG((LM_DEBUG, "(%P|%t) Sedp::Task::svc " 03586 "got message from queue type %d\n", msg->type_)); 03587 } 03588 DCPS::unique_ptr<Msg> delete_the_msg(msg); 03589 switch (msg->type_) { 03590 case Msg::MSG_PARTICIPANT: 03591 svc_i(msg->dpdata_); 03592 break; 03593 03594 case Msg::MSG_WRITER: 03595 svc_i(msg->id_, msg->wdata_); 03596 break; 03597 03598 #if defined(OPENDDS_SECURITY) 03599 case Msg::MSG_WRITER_SECURE: 03600 svc_i(msg->id_, msg->wdata_secure_); 03601 break; 03602 #endif 03603 03604 case Msg::MSG_READER: 03605 svc_i(msg->id_, msg->rdata_); 03606 break; 03607 03608 #if defined(OPENDDS_SECURITY) 03609 case Msg::MSG_READER_SECURE: 03610 svc_i(msg->id_, msg->rdata_secure_); 03611 break; 03612 #endif 03613 03614 case Msg::MSG_PARTICIPANT_DATA: 03615 svc_i(msg->id_, msg->pmdata_); 03616 break; 03617 03618 #if defined(OPENDDS_SECURITY) 03619 case Msg::MSG_PARTICIPANT_DATA_SECURE: 03620 svc_participant_message_data_secure(msg->id_, msg->pmdata_); 03621 break; 03622 03623 case Msg::MSG_PARTICIPANT_STATELESS_DATA: 03624 svc_stateless_message(msg->id_, msg->pgmdata_); 03625 break; 03626 03627 case Msg::MSG_PARTICIPANT_VOLATILE_SECURE: 03628 svc_volatile_message_secure(msg->id_, msg->pgmdata_); 03629 break; 03630 03631 case Msg::MSG_DCPS_PARTICIPANT_SECURE: 03632 svc_secure_i(msg->id_, msg->dpdata_); 03633 break; 03634 #endif 03635 03636 case Msg::MSG_REMOVE_FROM_PUB_BIT: 03637 case Msg::MSG_REMOVE_FROM_SUB_BIT: 03638 svc_i(msg->type_, msg->ih_); 03639 break; 03640 03641 case Msg::MSG_FINI_BIT: 03642 // acknowledge that fini_bit has been called (this just ensures that 03643 // this task is not in the act of using one of BIT Subscriber's Data 03644 // Readers while it is being deleted 03645 spdp_->wait_for_acks().ack(); 03646 break; 03647 03648 case Msg::MSG_STOP: 03649 if (DCPS::DCPS_debug_level > 3) { 03650 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::Task::svc - ") 03651 ACE_TEXT("received MSG_STOP. Task exiting\n"))); 03652 } 03653 return 0; 03654 } 03655 03656 if (DCPS::DCPS_debug_level > 5) { 03657 ACE_DEBUG((LM_DEBUG, "(%P|%t) Sedp::Task::svc done with message\n")); 03658 } 03659 } 03660 if (DCPS::DCPS_debug_level > 3) { 03661 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::Task::svc - ") 03662 ACE_TEXT("Task exiting.\n"))); 03663 } 03664 return 0; 03665 }
void OpenDDS::RTPS::Sedp::Task::svc_i | ( | Msg::MsgType | which_bit, | |
const DDS::InstanceHandle_t | bit_ih | |||
) | [private] |
Definition at line 1264 of file Sedp.cpp.
References DDS::HANDLE_NIL, OpenDDS::RTPS::Sedp::Msg::MSG_REMOVE_FROM_PUB_BIT, OpenDDS::RTPS::Sedp::Msg::MSG_REMOVE_FROM_SUB_BIT, DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE, OpenDDS::RTPS::Sedp::pub_bit(), sedp_, OpenDDS::DCPS::DataReaderImpl_T< MessageType >::set_instance_state(), and OpenDDS::RTPS::Sedp::sub_bit().
01265 { 01266 #ifndef DDS_HAS_MINIMUM_BIT 01267 switch (which_bit) { 01268 case Msg::MSG_REMOVE_FROM_PUB_BIT: { 01269 DCPS::PublicationBuiltinTopicDataDataReaderImpl* bit = sedp_->pub_bit(); 01270 // bit may be null if the DomainParticipant is shutting down 01271 if (bit && bit_ih != DDS::HANDLE_NIL) { 01272 bit->set_instance_state(bit_ih, 01273 DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE); 01274 } 01275 break; 01276 } 01277 case Msg::MSG_REMOVE_FROM_SUB_BIT: { 01278 DCPS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sedp_->sub_bit(); 01279 // bit may be null if the DomainParticipant is shutting down 01280 if (bit && bit_ih != DDS::HANDLE_NIL) { 01281 bit->set_instance_state(bit_ih, 01282 DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE); 01283 } 01284 break; 01285 } 01286 default: 01287 break; 01288 } 01289 #else 01290 ACE_UNUSED_ARG(which_bit); 01291 ACE_UNUSED_ARG(bit_ih); 01292 #endif /* DDS_HAS_MINIMUM_BIT */ 01293 }
void OpenDDS::RTPS::Sedp::Task::svc_i | ( | DCPS::MessageId | id, | |
const ParticipantMessageData * | data | |||
) | [private] |
Definition at line 2151 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::data_received(), and sedp_.
02153 { 02154 DCPS::unique_ptr<const ParticipantMessageData> delete_the_data(data); 02155 sedp_->data_received(message_id, *data); 02156 }
void OpenDDS::RTPS::Sedp::Task::svc_i | ( | DCPS::MessageId | id, | |
const DCPS::DiscoveredReaderData * | rdata | |||
) | [private] |
Definition at line 2081 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::data_received(), and sedp_.
02083 { 02084 DCPS::unique_ptr<const DCPS::DiscoveredReaderData> delete_the_data(prdata); 02085 sedp_->data_received(message_id, *prdata); 02086 }
void OpenDDS::RTPS::Sedp::Task::svc_i | ( | DCPS::MessageId | id, | |
const DCPS::DiscoveredWriterData * | wdata | |||
) | [private] |
Definition at line 1524 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::data_received(), and sedp_.
01526 { 01527 DCPS::unique_ptr<const DCPS::DiscoveredWriterData> delete_the_data(pwdata); 01528 sedp_->data_received(message_id, *pwdata); 01529 }
void OpenDDS::RTPS::Sedp::Task::svc_i | ( | const Security::SPDPdiscoveredParticipantData * | pdata | ) | [private] |
Definition at line 991 of file Sedp.cpp.
References ACE_TEXT(), OpenDDS::RTPS::Sedp::Writer::assoc(), OpenDDS::RTPS::Sedp::associated_participants_, OpenDDS::RTPS::BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER, OpenDDS::RTPS::create_association_data_proto(), OpenDDS::RTPS::Sedp::data_received(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::RTPS::Sedp::defer_match_endpoints_, OpenDDS::RTPS::Sedp::deferred_publications_, OpenDDS::RTPS::Sedp::deferred_subscriptions_, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::discovered_publications_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::discovered_subscriptions_, OpenDDS::DCPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER, LM_DEBUG, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::match_endpoints(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::OPENDDS_MAP(), OPENDDS_STRING, OpenDDS::RTPS::Sedp::participant_message_writer_, OpenDDS::RTPS::Sedp::publications_writer_, OpenDDS::DCPS::AssociationData::remote_id_, sedp_, OpenDDS::RTPS::Spdp::shutting_down(), spdp_, OpenDDS::RTPS::Sedp::subscriptions_writer_, OpenDDS::RTPS::Sedp::write_durable_participant_message_data(), OpenDDS::RTPS::Sedp::write_durable_publication_data(), and OpenDDS::RTPS::Sedp::write_durable_subscription_data().
Referenced by svc().
00992 { 00993 DCPS::unique_ptr<const Security::SPDPdiscoveredParticipantData> pdata(ppdata); 00994 00995 // First create a 'prototypical' instance of AssociationData. It will 00996 // be copied and modified for each of the (up to) four SEDP Endpoints. 00997 DCPS::AssociationData proto; 00998 create_association_data_proto(proto, *pdata); 00999 01000 const BuiltinEndpointSet_t& avail = 01001 pdata->participantProxy.availableBuiltinEndpoints; 01002 01003 // See RTPS v2.1 section 8.5.5.1 01004 if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR) { 01005 DCPS::AssociationData peer = proto; 01006 peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER; 01007 sedp_->publications_writer_.assoc(peer); 01008 } 01009 if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR) { 01010 DCPS::AssociationData peer = proto; 01011 peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER; 01012 sedp_->subscriptions_writer_.assoc(peer); 01013 } 01014 if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER) { 01015 DCPS::AssociationData peer = proto; 01016 peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER; 01017 sedp_->participant_message_writer_.assoc(peer); 01018 } 01019 01020 #if defined(OPENDDS_SECURITY) 01021 if (sedp_->is_security_enabled()) { 01022 sedp_->associate_secure_readers_to_writers(*pdata); 01023 } 01024 #endif 01025 01026 //FUTURE: if/when topic propagation is supported, add it here 01027 01028 // Process deferred publications and subscriptions. 01029 for (DeferredSubscriptionMap::iterator pos = sedp_->deferred_subscriptions_.lower_bound(proto.remote_id_), 01030 limit = sedp_->deferred_subscriptions_.upper_bound(proto.remote_id_); 01031 pos != limit; 01032 /* Increment in body. */) { 01033 sedp_->data_received (pos->second.first, pos->second.second); 01034 sedp_->deferred_subscriptions_.erase (pos++); 01035 } 01036 for (DeferredPublicationMap::iterator pos = sedp_->deferred_publications_.lower_bound(proto.remote_id_), 01037 limit = sedp_->deferred_publications_.upper_bound(proto.remote_id_); 01038 pos != limit; 01039 /* Increment in body. */) { 01040 sedp_->data_received (pos->second.first, pos->second.second); 01041 sedp_->deferred_publications_.erase (pos++); 01042 } 01043 01044 ACE_GUARD(ACE_Thread_Mutex, g, sedp_->lock_); 01045 if (spdp_->shutting_down()) { return; } 01046 01047 proto.remote_id_.entityId = ENTITYID_PARTICIPANT; 01048 sedp_->associated_participants_.insert(proto.remote_id_); 01049 01050 #if defined(OPENDDS_SECURITY) 01051 if (sedp_->is_security_enabled()) { 01052 spdp_->send_participant_crypto_tokens(proto.remote_id_); 01053 sedp_->send_builtin_crypto_tokens(*pdata); 01054 } 01055 #endif 01056 01057 // Write durable data 01058 if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR) { 01059 proto.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER; 01060 sedp_->write_durable_publication_data(proto.remote_id_); 01061 } 01062 if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR) { 01063 proto.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER; 01064 sedp_->write_durable_subscription_data(proto.remote_id_); 01065 } 01066 if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER) { 01067 proto.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER; 01068 sedp_->write_durable_participant_message_data(proto.remote_id_); 01069 } 01070 01071 for (DCPS::RepoIdSet::iterator it = sedp_->defer_match_endpoints_.begin(); 01072 it != sedp_->defer_match_endpoints_.end(); /*incremented in body*/) { 01073 if (0 == std::memcmp(it->guidPrefix, proto.remote_id_.guidPrefix, 01074 sizeof(GuidPrefix_t))) { 01075 OPENDDS_STRING topic; 01076 if (it->entityId.entityKind & 4) { 01077 DiscoveredSubscriptionIter dsi = 01078 sedp_->discovered_subscriptions_.find(*it); 01079 if (dsi != sedp_->discovered_subscriptions_.end()) { 01080 topic = dsi->second.reader_data_.ddsSubscriptionData.topic_name; 01081 } 01082 } else { 01083 DiscoveredPublicationIter dpi = 01084 sedp_->discovered_publications_.find(*it); 01085 if (dpi != sedp_->discovered_publications_.end()) { 01086 topic = dpi->second.writer_data_.ddsPublicationData.topic_name; 01087 } 01088 } 01089 if (DCPS::DCPS_debug_level > 3) { 01090 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::AssociateTask::svc - ") 01091 ACE_TEXT("processing deferred endpoints for topic %C\n"), 01092 topic.c_str())); 01093 } 01094 if (!topic.empty()) { 01095 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator ti = 01096 sedp_->topics_.find(topic); 01097 if (ti != sedp_->topics_.end()) { 01098 if (DCPS::DCPS_debug_level > 3) { 01099 DCPS::GuidConverter conv(*it); 01100 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::AssociateTask::svc - ") 01101 ACE_TEXT("calling match_endpoints %C\n"), 01102 OPENDDS_STRING(conv).c_str())); 01103 } 01104 sedp_->match_endpoints(*it, ti->second); 01105 if (spdp_->shutting_down()) { return; } 01106 } 01107 } 01108 sedp_->defer_match_endpoints_.erase(it++); 01109 } else { 01110 ++it; 01111 } 01112 } 01113 }
void OpenDDS::RTPS::Sedp::Task::svc_secure_i | ( | DCPS::MessageId | id, | |
const Security::SPDPdiscoveredParticipantData * | pdata | |||
) | [private] |
Definition at line 1116 of file Sedp.cpp.
References OpenDDS::RTPS::Spdp::handle_participant_data(), and spdp_.
Referenced by svc().
01118 { 01119 DCPS::unique_ptr<const Security::SPDPdiscoveredParticipantData> pdata(ppdata); 01120 spdp_->handle_participant_data(id, *pdata); 01121 }
Sedp* OpenDDS::RTPS::Sedp::Task::sedp_ [private] |
bool OpenDDS::RTPS::Sedp::Task::shutting_down_ [private] |
Definition at line 468 of file Sedp.h.
Referenced by shutdown().
Spdp* OpenDDS::RTPS::Sedp::Task::spdp_ [private] |