Public Member Functions | |
Task (Sedp *sedp) | |
~Task () | |
void | enqueue (const SPDPdiscoveredParticipantData *pdata) |
void | enqueue (DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredWriterData *wdata) |
void | enqueue (DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredReaderData *rdata) |
void | enqueue (DCPS::MessageId id, const ParticipantMessageData *data) |
void | enqueue (Msg::MsgType which_bit, const DDS::InstanceHandle_t bit_ih) |
void | acknowledge () |
void | shutdown () |
Private Member Functions | |
int | svc () |
void | svc_i (const SPDPdiscoveredParticipantData *pdata) |
void | svc_i (DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredWriterData *wdata) |
void | svc_i (DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredReaderData *rdata) |
void | svc_i (DCPS::MessageId id, const ParticipantMessageData *data) |
void | svc_i (Msg::MsgType which_bit, const DDS::InstanceHandle_t bit_ih) |
Private Attributes | |
Spdp * | spdp_ |
Sedp * | sedp_ |
bool | shutting_down_ |
Definition at line 243 of file Sedp.h.
OpenDDS::RTPS::Sedp::Task::Task | ( | Sedp * | sedp | ) | [inline, explicit] |
Definition at line 244 of file Sedp.h.
00245 : spdp_(&sedp->spdp_) 00246 , sedp_(sedp) 00247 , shutting_down_(false) 00248 { 00249 activate(); 00250 }
OpenDDS::RTPS::Sedp::Task::~Task | ( | ) |
void OpenDDS::RTPS::Sedp::Task::acknowledge | ( | ) |
Definition at line 900 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::Msg::MSG_FINI_BIT, and OpenDDS::DCPS::REQUEST_ACK.
Referenced by OpenDDS::RTPS::Sedp::acknowledge().
00901 { 00902 // id is really a don't care, but just set to REQUEST_ACK 00903 putq(new Msg(Msg::MSG_FINI_BIT, DCPS::REQUEST_ACK, 0)); 00904 }
void OpenDDS::RTPS::Sedp::Task::enqueue | ( | Msg::MsgType | which_bit, | |
const DDS::InstanceHandle_t | bit_ih | |||
) |
Definition at line 1996 of file Sedp.cpp.
References OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.
01997 { 01998 #ifndef DDS_HAS_MINIMUM_BIT 01999 if (spdp_->shutting_down()) { return; } 02000 putq(new Msg(which_bit, DCPS::DISPOSE_INSTANCE, bit_ih)); 02001 #else 02002 ACE_UNUSED_ARG(which_bit); 02003 ACE_UNUSED_ARG(bit_ih); 02004 #endif /* DDS_HAS_MINIMUM_BIT */ 02005 }
void OpenDDS::RTPS::Sedp::Task::enqueue | ( | DCPS::MessageId | id, | |
const ParticipantMessageData * | data | |||
) |
Definition at line 1989 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::Msg::MSG_PARTICIPANT_DATA, OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.
01990 { 01991 if (spdp_->shutting_down()) { return; } 01992 putq(new Msg(Msg::MSG_PARTICIPANT_DATA, id, data)); 01993 }
void OpenDDS::RTPS::Sedp::Task::enqueue | ( | DCPS::MessageId | id, | |
const OpenDDS::DCPS::DiscoveredReaderData * | rdata | |||
) |
Definition at line 1982 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::Msg::MSG_READER, OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.
01983 { 01984 if (spdp_->shutting_down()) { return; } 01985 putq(new Msg(Msg::MSG_READER, id, rdata)); 01986 }
void OpenDDS::RTPS::Sedp::Task::enqueue | ( | DCPS::MessageId | id, | |
const OpenDDS::DCPS::DiscoveredWriterData * | wdata | |||
) |
Definition at line 1975 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::Msg::MSG_WRITER, OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.
01976 { 01977 if (spdp_->shutting_down()) { return; } 01978 putq(new Msg(Msg::MSG_WRITER, id, wdata)); 01979 }
void OpenDDS::RTPS::Sedp::Task::enqueue | ( | const SPDPdiscoveredParticipantData * | pdata | ) |
Definition at line 1968 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::Msg::MSG_PARTICIPANT, OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.
Referenced by OpenDDS::RTPS::Sedp::associate(), OpenDDS::RTPS::Sedp::Reader::data_received(), and OpenDDS::RTPS::Sedp::remove_from_bit_i().
01969 { 01970 if (spdp_->shutting_down()) { return; } 01971 putq(new Msg(Msg::MSG_PARTICIPANT, DCPS::SAMPLE_DATA, pdata)); 01972 }
void OpenDDS::RTPS::Sedp::Task::shutdown | ( | ) |
Definition at line 907 of file Sedp.cpp.
References OpenDDS::DCPS::GRACEFUL_DISCONNECT, OpenDDS::RTPS::Sedp::Msg::MSG_STOP, and shutting_down_.
Referenced by OpenDDS::RTPS::Sedp::shutdown(), and ~Task().
00908 { 00909 if (!shutting_down_) { 00910 shutting_down_ = true; 00911 putq(new Msg(Msg::MSG_STOP, DCPS::GRACEFUL_DISCONNECT, 0)); 00912 wait(); 00913 } 00914 }
int OpenDDS::RTPS::Sedp::Task::svc | ( | ) | [private] |
Definition at line 2008 of file Sedp.cpp.
References OpenDDS::RTPS::WaitForAcks::ack(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::RTPS::Sedp::Msg::MSG_FINI_BIT, OpenDDS::RTPS::Sedp::Msg::MSG_PARTICIPANT, OpenDDS::RTPS::Sedp::Msg::MSG_PARTICIPANT_DATA, OpenDDS::RTPS::Sedp::Msg::MSG_READER, OpenDDS::RTPS::Sedp::Msg::MSG_REMOVE_FROM_PUB_BIT, OpenDDS::RTPS::Sedp::Msg::MSG_REMOVE_FROM_SUB_BIT, OpenDDS::RTPS::Sedp::Msg::MSG_STOP, OpenDDS::RTPS::Sedp::Msg::MSG_WRITER, spdp_, svc_i(), and OpenDDS::RTPS::Spdp::wait_for_acks().
02009 { 02010 for (Msg* msg = 0; getq(msg) != -1; /*no increment*/) { 02011 if (DCPS::DCPS_debug_level > 5) { 02012 ACE_DEBUG((LM_DEBUG, "(%P|%t) Sedp::Task::svc " 02013 "got message from queue type %d\n", msg->type_)); 02014 } 02015 ACE_Auto_Basic_Ptr<Msg> delete_the_msg(msg); 02016 switch (msg->type_) { 02017 case Msg::MSG_PARTICIPANT: 02018 svc_i(msg->dpdata_); 02019 break; 02020 case Msg::MSG_WRITER: 02021 svc_i(msg->id_, msg->wdata_); 02022 break; 02023 case Msg::MSG_READER: 02024 svc_i(msg->id_, msg->rdata_); 02025 break; 02026 case Msg::MSG_PARTICIPANT_DATA: 02027 svc_i(msg->id_, msg->pmdata_); 02028 break; 02029 case Msg::MSG_REMOVE_FROM_PUB_BIT: 02030 case Msg::MSG_REMOVE_FROM_SUB_BIT: 02031 svc_i(msg->type_, msg->ih_); 02032 break; 02033 case Msg::MSG_FINI_BIT: 02034 // acknowledge that fini_bit has been called (this just ensures that 02035 // this task is not in the act of using one of BIT Subscriber's Data 02036 // Readers while it is being deleted) 02037 spdp_->wait_for_acks().ack(); 02038 break; 02039 case Msg::MSG_STOP: 02040 if (DCPS::DCPS_debug_level > 3) { 02041 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::Task::svc - ") 02042 ACE_TEXT("received MSG_STOP. Task exiting\n"))); 02043 } 02044 return 0; 02045 } 02046 if (DCPS::DCPS_debug_level > 5) { 02047 ACE_DEBUG((LM_DEBUG, "(%P|%t) Sedp::Task::svc done with message\n")); 02048 } 02049 } 02050 if (DCPS::DCPS_debug_level > 3) { 02051 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::Task::svc - ") 02052 ACE_TEXT("Task exiting.\n"))); 02053 } 02054 return 0; 02055 }
void OpenDDS::RTPS::Sedp::Task::svc_i | ( | Msg::MsgType | which_bit, | |
const DDS::InstanceHandle_t | bit_ih | |||
) | [private] |
Definition at line 657 of file Sedp.cpp.
References DDS::HANDLE_NIL, OpenDDS::RTPS::Sedp::Msg::MSG_REMOVE_FROM_PUB_BIT, OpenDDS::RTPS::Sedp::Msg::MSG_REMOVE_FROM_SUB_BIT, DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE, OpenDDS::RTPS::Sedp::pub_bit(), sedp_, and OpenDDS::RTPS::Sedp::sub_bit().
00658 { 00659 #ifndef DDS_HAS_MINIMUM_BIT 00660 switch (which_bit) { 00661 case Msg::MSG_REMOVE_FROM_PUB_BIT: { 00662 DDS::PublicationBuiltinTopicDataDataReaderImpl* bit = sedp_->pub_bit(); 00663 // bit may be null if the DomainParticipant is shutting down 00664 if (bit && bit_ih != DDS::HANDLE_NIL) { 00665 bit->set_instance_state(bit_ih, 00666 DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE); 00667 } 00668 break; 00669 } 00670 case Msg::MSG_REMOVE_FROM_SUB_BIT: { 00671 DDS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sedp_->sub_bit(); 00672 // bit may be null if the DomainParticipant is shutting down 00673 if (bit && bit_ih != DDS::HANDLE_NIL) { 00674 bit->set_instance_state(bit_ih, 00675 DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE); 00676 } 00677 break; 00678 } 00679 default: 00680 break; 00681 } 00682 #else 00683 ACE_UNUSED_ARG(which_bit); 00684 ACE_UNUSED_ARG(bit_ih); 00685 #endif /* DDS_HAS_MINIMUM_BIT */ 00686 }
void OpenDDS::RTPS::Sedp::Task::svc_i | ( | DCPS::MessageId | id, | |
const ParticipantMessageData * | data | |||
) | [private] |
Definition at line 1276 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::data_received(), and sedp_.
01278 { 01279 ACE_Auto_Basic_Ptr<const ParticipantMessageData> delete_the_data(data); 01280 sedp_->data_received(message_id, *data); 01281 }
void OpenDDS::RTPS::Sedp::Task::svc_i | ( | DCPS::MessageId | id, | |
const OpenDDS::DCPS::DiscoveredReaderData * | rdata | |||
) | [private] |
Definition at line 1077 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::data_received(), and sedp_.
01079 { 01080 ACE_Auto_Basic_Ptr<const OpenDDS::DCPS::DiscoveredReaderData> delete_the_data(prdata); 01081 sedp_->data_received(message_id, *prdata); 01082 }
void OpenDDS::RTPS::Sedp::Task::svc_i | ( | DCPS::MessageId | id, | |
const OpenDDS::DCPS::DiscoveredWriterData * | wdata | |||
) | [private] |
Definition at line 917 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::data_received(), and sedp_.
00919 { 00920 ACE_Auto_Basic_Ptr<const OpenDDS::DCPS::DiscoveredWriterData> delete_the_data(pwdata); 00921 sedp_->data_received(message_id, *pwdata); 00922 }
void OpenDDS::RTPS::Sedp::Task::svc_i | ( | const SPDPdiscoveredParticipantData * | pdata | ) | [private] |
Definition at line 438 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::Writer::assoc(), OpenDDS::RTPS::Sedp::associated_participants_, OpenDDS::RTPS::ParticipantProxy_t::availableBuiltinEndpoints, OpenDDS::RTPS::BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER, OpenDDS::RTPS::create_association_data_proto(), OpenDDS::RTPS::Sedp::data_received(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::RTPS::Sedp::defer_match_endpoints_, OpenDDS::RTPS::Sedp::deferred_publications_, OpenDDS::RTPS::Sedp::deferred_subscriptions_, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::discovered_publications_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::discovered_subscriptions_, OpenDDS::DCPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::match_endpoints(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP(), OPENDDS_STRING, OpenDDS::RTPS::Sedp::participant_message_writer_, OpenDDS::RTPS::SPDPdiscoveredParticipantData::participantProxy, OpenDDS::RTPS::Sedp::publications_writer_, OpenDDS::DCPS::AssociationData::remote_id_, sedp_, OpenDDS::RTPS::Spdp::shutting_down(), spdp_, OpenDDS::RTPS::Sedp::subscriptions_writer_, OpenDDS::RTPS::Sedp::write_durable_participant_message_data(), OpenDDS::RTPS::Sedp::write_durable_publication_data(), and OpenDDS::RTPS::Sedp::write_durable_subscription_data().
Referenced by svc().
00439 { 00440 ACE_Auto_Basic_Ptr<const SPDPdiscoveredParticipantData> delete_the_data(ppdata); 00441 const SPDPdiscoveredParticipantData& pdata = *ppdata; 00442 // First create a 'prototypical' instance of AssociationData. It will 00443 // be copied and modified for each of the (up to) four SEDP Endpoints. 00444 DCPS::AssociationData proto; 00445 create_association_data_proto(proto, pdata); 00446 00447 const BuiltinEndpointSet_t& avail = 00448 pdata.participantProxy.availableBuiltinEndpoints; 00449 00450 // See RTPS v2.1 section 8.5.5.1 00451 if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR) { 00452 DCPS::AssociationData peer = proto; 00453 peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER; 00454 sedp_->publications_writer_.assoc(peer); 00455 } 00456 if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR) { 00457 DCPS::AssociationData peer = proto; 00458 peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER; 00459 sedp_->subscriptions_writer_.assoc(peer); 00460 } 00461 if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER) { 00462 DCPS::AssociationData peer = proto; 00463 peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER; 00464 sedp_->participant_message_writer_.assoc(peer); 00465 } 00466 00467 //FUTURE: if/when topic propagation is supported, add it here 00468 00469 // Process deferred publications and subscriptions. 00470 for (DeferredSubscriptionMap::iterator pos = sedp_->deferred_subscriptions_.lower_bound (proto.remote_id_), 00471 limit = sedp_->deferred_subscriptions_.upper_bound (proto.remote_id_); 00472 pos != limit; 00473 /* Increment in body. */) { 00474 sedp_->data_received (pos->second.first, pos->second.second); 00475 sedp_->deferred_subscriptions_.erase (pos++); 00476 } 00477 for (DeferredPublicationMap::iterator pos = sedp_->deferred_publications_.lower_bound (proto.remote_id_), 00478 limit = sedp_->deferred_publications_.upper_bound (proto.remote_id_); 00479 pos != limit; 00480 /* Increment in body. */) { 00481 sedp_->data_received (pos->second.first, pos->second.second); 00482 sedp_->deferred_publications_.erase (pos++); 00483 } 00484 00485 ACE_GUARD(ACE_Thread_Mutex, g, sedp_->lock_); 00486 if (spdp_->shutting_down()) { return; } 00487 00488 proto.remote_id_.entityId = ENTITYID_PARTICIPANT; 00489 sedp_->associated_participants_.insert(proto.remote_id_); 00490 00491 // Write durable data 00492 if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR) { 00493 proto.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER; 00494 sedp_->write_durable_publication_data(proto.remote_id_); 00495 } 00496 if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR) { 00497 proto.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER; 00498 sedp_->write_durable_subscription_data(proto.remote_id_); 00499 } 00500 if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER) { 00501 proto.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER; 00502 sedp_->write_durable_participant_message_data(proto.remote_id_); 00503 } 00504 00505 for (DCPS::RepoIdSet::iterator it = sedp_->defer_match_endpoints_.begin(); 00506 it != sedp_->defer_match_endpoints_.end(); /*incremented in body*/) { 00507 if (0 == std::memcmp(it->guidPrefix, proto.remote_id_.guidPrefix, 00508 sizeof(GuidPrefix_t))) { 00509 OPENDDS_STRING topic; 00510 if (it->entityId.entityKind & 4) { 00511 DiscoveredSubscriptionIter dsi = 00512 sedp_->discovered_subscriptions_.find(*it); 00513 if (dsi != sedp_->discovered_subscriptions_.end()) { 00514 topic = dsi->second.reader_data_.ddsSubscriptionData.topic_name; 00515 } 00516 } else { 00517 DiscoveredPublicationIter dpi = 00518 sedp_->discovered_publications_.find(*it); 00519 if (dpi != sedp_->discovered_publications_.end()) { 00520 topic = dpi->second.writer_data_.ddsPublicationData.topic_name; 00521 } 00522 } 00523 if (DCPS::DCPS_debug_level > 3) { 00524 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::AssociateTask::svc - ") 00525 ACE_TEXT("processing deferred endpoints for topic %C\n"), 00526 topic.c_str())); 00527 } 00528 if (!topic.empty()) { 00529 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator ti = 00530 sedp_->topics_.find(topic); 00531 if (ti != sedp_->topics_.end()) { 00532 if (DCPS::DCPS_debug_level > 3) { 00533 DCPS::GuidConverter conv(*it); 00534 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::AssociateTask::svc - ") 00535 ACE_TEXT("calling match_endpoints %C\n"), 00536 OPENDDS_STRING(conv).c_str())); 00537 } 00538 sedp_->match_endpoints(*it, ti->second); 00539 if (spdp_->shutting_down()) { return; } 00540 } 00541 } 00542 sedp_->defer_match_endpoints_.erase(it++); 00543 } else { 00544 ++it; 00545 } 00546 } 00547 }
Sedp* OpenDDS::RTPS::Sedp::Task::sedp_ [private] |
bool OpenDDS::RTPS::Sedp::Task::shutting_down_ [private] |
Spdp* OpenDDS::RTPS::Sedp::Task::spdp_ [private] |