OpenDDS::RTPS::Sedp::Task Struct Reference

Collaboration diagram for OpenDDS::RTPS::Sedp::Task:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 Task (Sedp *sedp)
 ~Task ()
void enqueue (const SPDPdiscoveredParticipantData *pdata)
void enqueue (DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredWriterData *wdata)
void enqueue (DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredReaderData *rdata)
void enqueue (DCPS::MessageId id, const ParticipantMessageData *data)
void enqueue (Msg::MsgType which_bit, const DDS::InstanceHandle_t bit_ih)
void acknowledge ()
void shutdown ()

Private Member Functions

int svc ()
void svc_i (const SPDPdiscoveredParticipantData *pdata)
void svc_i (DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredWriterData *wdata)
void svc_i (DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredReaderData *rdata)
void svc_i (DCPS::MessageId id, const ParticipantMessageData *data)
void svc_i (Msg::MsgType which_bit, const DDS::InstanceHandle_t bit_ih)

Private Attributes

Spdp * spdp_
Sedp * sedp_
bool shutting_down_

Detailed Description

Definition at line 243 of file Sedp.h.


Constructor & Destructor Documentation

OpenDDS::RTPS::Sedp::Task::Task ( Sedp * sedp  )  [inline, explicit]

Definition at line 244 of file Sedp.h.

00245       : spdp_(&sedp->spdp_)
00246       , sedp_(sedp)
00247       , shutting_down_(false)
00248     {
00249       activate();
00250     }

OpenDDS::RTPS::Sedp::Task::~Task (  ) 

Definition at line 2057 of file Sedp.cpp.

References shutdown().

02058 {
02059   shutdown();
02060 }


Member Function Documentation

void OpenDDS::RTPS::Sedp::Task::acknowledge (  ) 

Definition at line 900 of file Sedp.cpp.

References OpenDDS::RTPS::Sedp::Msg::MSG_FINI_BIT, and OpenDDS::DCPS::REQUEST_ACK.

Referenced by OpenDDS::RTPS::Sedp::acknowledge().

00901 {
00902   // id is really a don't care, but just set to REQUEST_ACK
00903   putq(new Msg(Msg::MSG_FINI_BIT, DCPS::REQUEST_ACK, 0));
00904 }

void OpenDDS::RTPS::Sedp::Task::enqueue ( Msg::MsgType  which_bit,
const DDS::InstanceHandle_t  bit_ih 
)

Definition at line 1996 of file Sedp.cpp.

References OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.

01997 {
01998 #ifndef DDS_HAS_MINIMUM_BIT
01999   if (spdp_->shutting_down()) { return; }
02000   putq(new Msg(which_bit, DCPS::DISPOSE_INSTANCE, bit_ih));
02001 #else
02002   ACE_UNUSED_ARG(which_bit);
02003   ACE_UNUSED_ARG(bit_ih);
02004 #endif /* DDS_HAS_MINIMUM_BIT */
02005 }

void OpenDDS::RTPS::Sedp::Task::enqueue ( DCPS::MessageId  id,
const ParticipantMessageData * data 
)

Definition at line 1989 of file Sedp.cpp.

References OpenDDS::RTPS::Sedp::Msg::MSG_PARTICIPANT_DATA, OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.

01990 {
01991   if (spdp_->shutting_down()) { return; }
01992   putq(new Msg(Msg::MSG_PARTICIPANT_DATA, id, data));
01993 }

void OpenDDS::RTPS::Sedp::Task::enqueue ( DCPS::MessageId  id,
const OpenDDS::DCPS::DiscoveredReaderData * rdata 
)

Definition at line 1982 of file Sedp.cpp.

References OpenDDS::RTPS::Sedp::Msg::MSG_READER, OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.

01983 {
01984   if (spdp_->shutting_down()) { return; }
01985   putq(new Msg(Msg::MSG_READER, id, rdata));
01986 }

void OpenDDS::RTPS::Sedp::Task::enqueue ( DCPS::MessageId  id,
const OpenDDS::DCPS::DiscoveredWriterData * wdata 
)

Definition at line 1975 of file Sedp.cpp.

References OpenDDS::RTPS::Sedp::Msg::MSG_WRITER, OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.

01976 {
01977   if (spdp_->shutting_down()) { return; }
01978   putq(new Msg(Msg::MSG_WRITER, id, wdata));
01979 }

void OpenDDS::RTPS::Sedp::Task::enqueue ( const SPDPdiscoveredParticipantData * pdata  ) 

Definition at line 1968 of file Sedp.cpp.

References OpenDDS::RTPS::Sedp::Msg::MSG_PARTICIPANT, OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.

Referenced by OpenDDS::RTPS::Sedp::associate(), OpenDDS::RTPS::Sedp::Reader::data_received(), and OpenDDS::RTPS::Sedp::remove_from_bit_i().

01969 {
01970   if (spdp_->shutting_down()) { return; }
01971   putq(new Msg(Msg::MSG_PARTICIPANT, DCPS::SAMPLE_DATA, pdata));
01972 }

void OpenDDS::RTPS::Sedp::Task::shutdown (  ) 

Definition at line 907 of file Sedp.cpp.

References OpenDDS::DCPS::GRACEFUL_DISCONNECT, OpenDDS::RTPS::Sedp::Msg::MSG_STOP, and shutting_down_.

Referenced by OpenDDS::RTPS::Sedp::shutdown(), and ~Task().

00908 {
00909   if (!shutting_down_) {
00910     shutting_down_ = true;
00911     putq(new Msg(Msg::MSG_STOP, DCPS::GRACEFUL_DISCONNECT, 0));
00912     wait();
00913   }
00914 }

int OpenDDS::RTPS::Sedp::Task::svc (  )  [private]

Definition at line 2008 of file Sedp.cpp.

References OpenDDS::RTPS::WaitForAcks::ack(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::RTPS::Sedp::Msg::MSG_FINI_BIT, OpenDDS::RTPS::Sedp::Msg::MSG_PARTICIPANT, OpenDDS::RTPS::Sedp::Msg::MSG_PARTICIPANT_DATA, OpenDDS::RTPS::Sedp::Msg::MSG_READER, OpenDDS::RTPS::Sedp::Msg::MSG_REMOVE_FROM_PUB_BIT, OpenDDS::RTPS::Sedp::Msg::MSG_REMOVE_FROM_SUB_BIT, OpenDDS::RTPS::Sedp::Msg::MSG_STOP, OpenDDS::RTPS::Sedp::Msg::MSG_WRITER, spdp_, svc_i(), and OpenDDS::RTPS::Spdp::wait_for_acks().

02009 {
02010   for (Msg* msg = 0; getq(msg) != -1; /*no increment*/) {
02011     if (DCPS::DCPS_debug_level > 5) {
02012       ACE_DEBUG((LM_DEBUG, "(%P|%t) Sedp::Task::svc "
02013         "got message from queue type %d\n", msg->type_));
02014     }
02015     ACE_Auto_Basic_Ptr<Msg> delete_the_msg(msg);
02016     switch (msg->type_) {
02017     case Msg::MSG_PARTICIPANT:
02018       svc_i(msg->dpdata_);
02019       break;
02020     case Msg::MSG_WRITER:
02021       svc_i(msg->id_, msg->wdata_);
02022       break;
02023     case Msg::MSG_READER:
02024       svc_i(msg->id_, msg->rdata_);
02025       break;
02026     case Msg::MSG_PARTICIPANT_DATA:
02027       svc_i(msg->id_, msg->pmdata_);
02028       break;
02029     case Msg::MSG_REMOVE_FROM_PUB_BIT:
02030     case Msg::MSG_REMOVE_FROM_SUB_BIT:
02031       svc_i(msg->type_, msg->ih_);
02032       break;
02033     case Msg::MSG_FINI_BIT:
02034       // acknowledge that fini_bit has been called (this just ensures that
02035       // this task is not in the act of using one of BIT Subscriber's Data
02036       // Readers while it is being deleted
02037       spdp_->wait_for_acks().ack();
02038       break;
02039     case Msg::MSG_STOP:
02040       if (DCPS::DCPS_debug_level > 3) {
02041         ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::Task::svc - ")
02042                             ACE_TEXT("received MSG_STOP. Task exiting\n")));
02043       }
02044       return 0;
02045     }
02046     if (DCPS::DCPS_debug_level > 5) {
02047       ACE_DEBUG((LM_DEBUG, "(%P|%t) Sedp::Task::svc done with message\n"));
02048     }
02049   }
02050   if (DCPS::DCPS_debug_level > 3) {
02051     ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::Task::svc - ")
02052                         ACE_TEXT("Task exiting.\n")));
02053   }
02054   return 0;
02055 }

void OpenDDS::RTPS::Sedp::Task::svc_i ( Msg::MsgType  which_bit,
const DDS::InstanceHandle_t  bit_ih 
) [private]

Definition at line 657 of file Sedp.cpp.

References DDS::HANDLE_NIL, OpenDDS::RTPS::Sedp::Msg::MSG_REMOVE_FROM_PUB_BIT, OpenDDS::RTPS::Sedp::Msg::MSG_REMOVE_FROM_SUB_BIT, DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE, OpenDDS::RTPS::Sedp::pub_bit(), sedp_, and OpenDDS::RTPS::Sedp::sub_bit().

00658 {
00659 #ifndef DDS_HAS_MINIMUM_BIT
00660   switch (which_bit) {
00661   case Msg::MSG_REMOVE_FROM_PUB_BIT: {
00662     DDS::PublicationBuiltinTopicDataDataReaderImpl* bit = sedp_->pub_bit();
00663     // bit may be null if the DomainParticipant is shutting down
00664     if (bit && bit_ih != DDS::HANDLE_NIL) {
00665       bit->set_instance_state(bit_ih,
00666                               DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
00667     }
00668     break;
00669   }
00670   case Msg::MSG_REMOVE_FROM_SUB_BIT: {
00671     DDS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sedp_->sub_bit();
00672     // bit may be null if the DomainParticipant is shutting down
00673     if (bit && bit_ih != DDS::HANDLE_NIL) {
00674       bit->set_instance_state(bit_ih,
00675                               DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
00676     }
00677     break;
00678   }
00679   default:
00680     break;
00681   }
00682 #else
00683   ACE_UNUSED_ARG(which_bit);
00684   ACE_UNUSED_ARG(bit_ih);
00685 #endif /* DDS_HAS_MINIMUM_BIT */
00686 }

void OpenDDS::RTPS::Sedp::Task::svc_i ( DCPS::MessageId  id,
const ParticipantMessageData * data 
) [private]

Definition at line 1276 of file Sedp.cpp.

References OpenDDS::RTPS::Sedp::data_received(), and sedp_.

01278 {
01279   ACE_Auto_Basic_Ptr<const ParticipantMessageData> delete_the_data(data);
01280   sedp_->data_received(message_id, *data);
01281 }

void OpenDDS::RTPS::Sedp::Task::svc_i ( DCPS::MessageId  id,
const OpenDDS::DCPS::DiscoveredReaderData * rdata 
) [private]

Definition at line 1077 of file Sedp.cpp.

References OpenDDS::RTPS::Sedp::data_received(), and sedp_.

01079 {
01080   ACE_Auto_Basic_Ptr<const OpenDDS::DCPS::DiscoveredReaderData> delete_the_data(prdata);
01081   sedp_->data_received(message_id, *prdata);
01082 }

void OpenDDS::RTPS::Sedp::Task::svc_i ( DCPS::MessageId  id,
const OpenDDS::DCPS::DiscoveredWriterData * wdata 
) [private]

Definition at line 917 of file Sedp.cpp.

References OpenDDS::RTPS::Sedp::data_received(), and sedp_.

00919 {
00920   ACE_Auto_Basic_Ptr<const OpenDDS::DCPS::DiscoveredWriterData> delete_the_data(pwdata);
00921   sedp_->data_received(message_id, *pwdata);
00922 }

void OpenDDS::RTPS::Sedp::Task::svc_i ( const SPDPdiscoveredParticipantData * pdata  )  [private]

Definition at line 438 of file Sedp.cpp.

References OpenDDS::RTPS::Sedp::Writer::assoc(), OpenDDS::RTPS::Sedp::associated_participants_, OpenDDS::RTPS::ParticipantProxy_t::availableBuiltinEndpoints, OpenDDS::RTPS::BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER, OpenDDS::RTPS::create_association_data_proto(), OpenDDS::RTPS::Sedp::data_received(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::RTPS::Sedp::defer_match_endpoints_, OpenDDS::RTPS::Sedp::deferred_publications_, OpenDDS::RTPS::Sedp::deferred_subscriptions_, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::discovered_publications_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::discovered_subscriptions_, OpenDDS::DCPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::match_endpoints(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP(), OPENDDS_STRING, OpenDDS::RTPS::Sedp::participant_message_writer_, OpenDDS::RTPS::SPDPdiscoveredParticipantData::participantProxy, OpenDDS::RTPS::Sedp::publications_writer_, OpenDDS::DCPS::AssociationData::remote_id_, sedp_, OpenDDS::RTPS::Spdp::shutting_down(), spdp_, OpenDDS::RTPS::Sedp::subscriptions_writer_, OpenDDS::RTPS::Sedp::write_durable_participant_message_data(), OpenDDS::RTPS::Sedp::write_durable_publication_data(), and OpenDDS::RTPS::Sedp::write_durable_subscription_data().

Referenced by svc().

00439 {
00440   ACE_Auto_Basic_Ptr<const SPDPdiscoveredParticipantData> delete_the_data(ppdata);
00441   const SPDPdiscoveredParticipantData& pdata = *ppdata;
00442   // First create a 'prototypical' instance of AssociationData.  It will
00443   // be copied and modified for each of the (up to) four SEDP Endpoints.
00444   DCPS::AssociationData proto;
00445   create_association_data_proto(proto, pdata);
00446 
00447   const BuiltinEndpointSet_t& avail =
00448     pdata.participantProxy.availableBuiltinEndpoints;
00449 
00450   // See RTPS v2.1 section 8.5.5.1
00451   if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR) {
00452     DCPS::AssociationData peer = proto;
00453     peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER;
00454     sedp_->publications_writer_.assoc(peer);
00455   }
00456   if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR) {
00457     DCPS::AssociationData peer = proto;
00458     peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER;
00459     sedp_->subscriptions_writer_.assoc(peer);
00460   }
00461   if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER) {
00462     DCPS::AssociationData peer = proto;
00463     peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER;
00464     sedp_->participant_message_writer_.assoc(peer);
00465   }
00466 
00467   //FUTURE: if/when topic propagation is supported, add it here
00468 
00469   // Process deferred publications and subscriptions.
00470   for (DeferredSubscriptionMap::iterator pos = sedp_->deferred_subscriptions_.lower_bound (proto.remote_id_),
00471          limit = sedp_->deferred_subscriptions_.upper_bound (proto.remote_id_);
00472        pos != limit;
00473        /* Increment in body. */) {
00474     sedp_->data_received (pos->second.first, pos->second.second);
00475     sedp_->deferred_subscriptions_.erase (pos++);
00476   }
00477   for (DeferredPublicationMap::iterator pos = sedp_->deferred_publications_.lower_bound (proto.remote_id_),
00478          limit = sedp_->deferred_publications_.upper_bound (proto.remote_id_);
00479        pos != limit;
00480        /* Increment in body. */) {
00481     sedp_->data_received (pos->second.first, pos->second.second);
00482     sedp_->deferred_publications_.erase (pos++);
00483   }
00484 
00485   ACE_GUARD(ACE_Thread_Mutex, g, sedp_->lock_);
00486   if (spdp_->shutting_down()) { return; }
00487 
00488   proto.remote_id_.entityId = ENTITYID_PARTICIPANT;
00489   sedp_->associated_participants_.insert(proto.remote_id_);
00490 
00491   // Write durable data
00492   if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR) {
00493     proto.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER;
00494     sedp_->write_durable_publication_data(proto.remote_id_);
00495   }
00496   if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR) {
00497     proto.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER;
00498     sedp_->write_durable_subscription_data(proto.remote_id_);
00499   }
00500   if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER) {
00501     proto.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER;
00502     sedp_->write_durable_participant_message_data(proto.remote_id_);
00503   }
00504 
00505   for (DCPS::RepoIdSet::iterator it = sedp_->defer_match_endpoints_.begin();
00506        it != sedp_->defer_match_endpoints_.end(); /*incremented in body*/) {
00507     if (0 == std::memcmp(it->guidPrefix, proto.remote_id_.guidPrefix,
00508                          sizeof(GuidPrefix_t))) {
00509       OPENDDS_STRING topic;
00510       if (it->entityId.entityKind & 4) {
00511         DiscoveredSubscriptionIter dsi =
00512           sedp_->discovered_subscriptions_.find(*it);
00513         if (dsi != sedp_->discovered_subscriptions_.end()) {
00514           topic = dsi->second.reader_data_.ddsSubscriptionData.topic_name;
00515         }
00516       } else {
00517         DiscoveredPublicationIter dpi =
00518           sedp_->discovered_publications_.find(*it);
00519         if (dpi != sedp_->discovered_publications_.end()) {
00520           topic = dpi->second.writer_data_.ddsPublicationData.topic_name;
00521         }
00522       }
00523       if (DCPS::DCPS_debug_level > 3) {
00524         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::AssociateTask::svc - ")
00525           ACE_TEXT("processing deferred endpoints for topic %C\n"),
00526           topic.c_str()));
00527       }
00528       if (!topic.empty()) {
00529         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator ti =
00530           sedp_->topics_.find(topic);
00531         if (ti != sedp_->topics_.end()) {
00532           if (DCPS::DCPS_debug_level > 3) {
00533             DCPS::GuidConverter conv(*it);
00534             ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::AssociateTask::svc - ")
00535               ACE_TEXT("calling match_endpoints %C\n"),
00536               OPENDDS_STRING(conv).c_str()));
00537           }
00538           sedp_->match_endpoints(*it, ti->second);
00539           if (spdp_->shutting_down()) { return; }
00540         }
00541       }
00542       sedp_->defer_match_endpoints_.erase(it++);
00543     } else {
00544       ++it;
00545     }
00546   }
00547 }


Member Data Documentation

Sedp* OpenDDS::RTPS::Sedp::Task::sedp_ [private]

Definition at line 273 of file Sedp.h.

Referenced by svc_i().

bool OpenDDS::RTPS::Sedp::Task::shutting_down_ [private]

Definition at line 274 of file Sedp.h.

Referenced by shutdown().

Spdp* OpenDDS::RTPS::Sedp::Task::spdp_ [private]

Definition at line 272 of file Sedp.h.

Referenced by enqueue(), svc(), and svc_i().


The documentation for this struct was generated from the following files:
Sedp.h
Sedp.cpp
Generated on Fri Feb 12 20:06:50 2016 for OpenDDS by  doxygen 1.4.7