DataLink.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "DataLink.h"
00010 
00011 #include "ReceivedDataSample.h"
00012 
00013 #include "TransportImpl.h"
00014 #include "TransportInst.h"
00015 #include "TransportClient.h"
00016 
00017 #include "dds/DCPS/DataWriterImpl.h"
00018 #include "dds/DCPS/DataReaderImpl.h"
00019 #include "dds/DCPS/Service_Participant.h"
00020 #include "dds/DCPS/GuidConverter.h"
00021 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00022 #include "dds/DCPS/Util.h"
00023 #include "dds/DCPS/Definitions.h"
00024 #include "dds/DCPS/SafetyProfileStreams.h"
00025 
00026 #include "EntryExit.h"
00027 #include "tao/debug.h"
00028 #include "ace/Reactor.h"
00029 #include "ace/SOCK.h"
00030 
00031 
00032 #if !defined (__ACE_INLINE__)
00033 #include "DataLink.inl"
00034 #endif /* __ACE_INLINE__ */
00035 
00036 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00037 
00038 namespace OpenDDS {
00039 namespace DCPS {
00040 
00041 /// Only called by our TransportImpl object.
00042 DataLink::DataLink(TransportImpl& impl, Priority priority, bool is_loopback,
00043                    bool is_active)
00044   : stopped_(false),
00045     scheduled_to_stop_at_(ACE_Time_Value::zero),
00046     impl_(impl),
00047     transport_priority_(priority),
00048     scheduling_release_(false),
00049     is_loopback_(is_loopback),
00050     is_active_(is_active),
00051     started_(false),
00052     send_response_listener_("DataLink")
00053 {
00054   DBG_ENTRY_LVL("DataLink", "DataLink", 6);
00055 
00056   datalink_release_delay_.sec(impl.config().datalink_release_delay_ / 1000);
00057   datalink_release_delay_.usec(impl.config().datalink_release_delay_ % 1000 * 1000);
00058 
00059   id_ = DataLink::get_next_datalink_id();
00060 
00061   if (impl.config().thread_per_connection_) {
00062     this->thr_per_con_send_task_.reset(new ThreadPerConnectionSendTask(this));
00063 
00064     if (this->thr_per_con_send_task_->open() == -1) {
00065       ACE_ERROR((LM_ERROR,
00066                  ACE_TEXT("(%P|%t) DataLink::DataLink: ")
00067                  ACE_TEXT("failed to open ThreadPerConnectionSendTask\n")));
00068 
00069     } else if (DCPS_debug_level > 4) {
00070       ACE_DEBUG((LM_DEBUG,
00071                  ACE_TEXT("(%P|%t) DataLink::DataLink - ")
00072                  ACE_TEXT("started new thread to send data with.\n")));
00073     }
00074   }
00075 
00076   // Initialize transport control sample allocators:
00077   size_t control_chunks = impl.config().datalink_control_chunks_;
00078 
00079   this->mb_allocator_.reset(new MessageBlockAllocator(control_chunks));
00080   this->db_allocator_.reset(new DataBlockAllocator(control_chunks));
00081 }
00082 
00083 DataLink::~DataLink()
00084 {
00085   DBG_ENTRY_LVL("DataLink", "~DataLink", 6);
00086 
00087   if (!assoc_by_local_.empty()) {
00088     ACE_DEBUG((LM_WARNING,
00089                ACE_TEXT("(%P|%t) WARNING: DataLink[%@]::~DataLink() - ")
00090                ACE_TEXT("link still in use by %d entities when deleted!\n"),
00091                this, assoc_by_local_.size()));
00092   }
00093 
00094   if (this->thr_per_con_send_task_ != 0) {
00095     this->thr_per_con_send_task_->close(1);
00096   }
00097 }
00098 
00099 TransportImpl&
00100 DataLink::impl() const
00101 {
00102   return impl_;
00103 }
00104 
00105 
00106 bool
00107 DataLink::add_on_start_callback(const TransportClient_wrch& client, const RepoId& remote)
00108 {
00109   GuardType guard(strategy_lock_);
00110 
00111   if (started_ && !send_strategy_.is_nil()) {
00112     return false; // link already started
00113   }
00114   on_start_callbacks_.push_back(std::make_pair(client, remote));
00115   return true;
00116 }
00117 
00118 void
00119 DataLink::remove_on_start_callback(const TransportClient_wrch& client, const RepoId& remote)
00120 {
00121   GuardType guard(strategy_lock_);
00122   on_start_callbacks_.erase(
00123     std::remove(on_start_callbacks_.begin(),
00124                 on_start_callbacks_.end(),
00125                 std::make_pair(client, remote)),
00126     on_start_callbacks_.end());
00127 }
00128 
00129 
00130 void
00131 DataLink::invoke_on_start_callbacks(bool success)
00132 {
00133   const DataLink_rch link(success ? this : 0, inc_count());
00134 
00135   while (true) {
00136     GuardType guard(strategy_lock_);
00137 
00138     if (on_start_callbacks_.empty()) {
00139       break;
00140     }
00141 
00142     OnStartCallback last_callback = on_start_callbacks_.back();
00143     on_start_callbacks_.pop_back();
00144 
00145     guard.release();
00146     TransportClient_rch client = last_callback.first.lock();
00147     if (client)
00148       client->use_datalink(last_callback.second, link);
00149   }
00150 }
00151 
00152 //Reactor invokes this after being notified in schedule_stop or cancel_release
00153 int
00154 DataLink::handle_exception(ACE_HANDLE /* fd */)
00155 {
00156   if(this->scheduled_to_stop_at_ == ACE_Time_Value::zero) {
00157     if (DCPS_debug_level > 0) {
00158       ACE_DEBUG((LM_DEBUG,
00159                  ACE_TEXT("(%P|%t) DataLink::handle_exception() - not scheduling or stopping\n")));
00160     }
00161     ACE_Reactor_Timer_Interface* reactor = impl_.timer();
00162     if (reactor->cancel_timer(this) > 0) {
00163       if (DCPS_debug_level > 0) {
00164         ACE_DEBUG((LM_DEBUG,
00165                    ACE_TEXT("(%P|%t) DataLink::handle_exception() - cancelled future release timer\n")));
00166       }
00167     }
00168     return 0;
00169   } else if (this->scheduled_to_stop_at_ <= ACE_OS::gettimeofday()) {
00170     if (this->scheduling_release_) {
00171       if (DCPS_debug_level > 0) {
00172         ACE_DEBUG((LM_DEBUG,
00173                    ACE_TEXT("(%P|%t) DataLink::handle_exception() - delay already elapsed so handle_timeout now\n")));
00174       }
00175       this->handle_timeout(ACE_Time_Value::zero, 0);
00176       return 0;
00177     }
00178     if (DCPS_debug_level > 0) {
00179       ACE_DEBUG((LM_DEBUG,
00180                  ACE_TEXT("(%P|%t) DataLink::handle_exception() - stopping now\n")));
00181     }
00182     this->stop();
00183     return 0;
00184   } else /* SCHEDULE TO STOP IN THE FUTURE*/ {
00185     if (DCPS_debug_level > 0) {
00186       ACE_DEBUG((LM_DEBUG,
00187                  ACE_TEXT("(%P|%t) DataLink::handle_exception() - (delay) scheduling timer for future release\n")));
00188     }
00189     ACE_Reactor_Timer_Interface* reactor = impl_.timer();
00190     ACE_Time_Value future_release_time = this->scheduled_to_stop_at_ - ACE_OS::gettimeofday();
00191     reactor->schedule_timer(this, 0, future_release_time);
00192   }
00193   return 0;
00194 }
00195 
00196 //Allows DataLink::stop to be done on the reactor thread so that
00197 //this thread avoids possibly deadlocking trying to access reactor
00198 //to stop strategies or schedule timers
00199 void
00200 DataLink::schedule_stop(const ACE_Time_Value& schedule_to_stop_at)
00201 {
00202   if (!this->stopped_ && this->scheduled_to_stop_at_ == ACE_Time_Value::zero) {
00203     this->scheduled_to_stop_at_ = schedule_to_stop_at;
00204     notify_reactor();
00205     // reactor will invoke our DataLink::handle_exception()
00206   } else {
00207     if (DCPS_debug_level > 0) {
00208       ACE_DEBUG((LM_DEBUG,
00209                  ACE_TEXT("(%P|%t) DataLink::schedule_stop() - Already stopped or already scheduled for stop\n")));
00210     }
00211   }
00212 }
00213 
00214 void
00215 DataLink::notify_reactor()
00216 {
00217   TransportReactorTask_rch reactor(impl_.reactor_task());
00218   reactor->get_reactor()->notify(this);
00219 }
00220 
00221 void
00222 DataLink::stop()
00223 {
00224   this->pre_stop_i();
00225 
00226   TransportSendStrategy_rch send_strategy;
00227   TransportStrategy_rch recv_strategy;
00228 
00229   {
00230     GuardType guard(this->strategy_lock_);
00231 
00232     if (this->stopped_) return;
00233 
00234     send_strategy = this->send_strategy_;
00235     this->send_strategy_.reset();
00236 
00237     recv_strategy = this->receive_strategy_;
00238     this->receive_strategy_.reset();
00239   }
00240 
00241   if (!send_strategy.is_nil()) {
00242     send_strategy->stop();
00243   }
00244 
00245   if (!recv_strategy.is_nil()) {
00246     recv_strategy->stop();
00247   }
00248 
00249   this->stop_i();
00250   this->stopped_ = true;
00251   this->scheduled_to_stop_at_ = ACE_Time_Value::zero;
00252 }
00253 
00254 void
00255 DataLink::resume_send()
00256 {
00257   if (!this->send_strategy_->isDirectMode())
00258     this->send_strategy_->resume_send();
00259 }
00260 
00261 int
00262 DataLink::make_reservation(const RepoId& remote_subscription_id,
00263                            const RepoId& local_publication_id,
00264                            const TransportSendListener_wrch& send_listener)
00265 {
00266   DBG_ENTRY_LVL("DataLink", "make_reservation", 6);
00267 
00268   if (DCPS_debug_level > 9) {
00269     GuidConverter local(local_publication_id), remote(remote_subscription_id);
00270     ACE_DEBUG((LM_DEBUG,
00271                ACE_TEXT("(%P|%t) DataLink::make_reservation() - ")
00272                ACE_TEXT("creating association local publication  %C ")
00273                ACE_TEXT("<--> with remote subscription %C.\n"),
00274                OPENDDS_STRING(local).c_str(),
00275                OPENDDS_STRING(remote).c_str()));
00276   }
00277 
00278   {
00279     GuardType guard(strategy_lock_);
00280 
00281     if (!send_strategy_.is_nil()) {
00282       send_strategy_->link_released(false);
00283     }
00284   }
00285   {
00286     GuardType guard(pub_sub_maps_lock_);
00287 
00288     assoc_by_local_[local_publication_id].insert(remote_subscription_id);
00289     ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_subscription_id];
00290 
00291     if (rls.is_nil())
00292       rls = make_rch<ReceiveListenerSet>();
00293     rls->insert(local_publication_id, TransportReceiveListener_rch());
00294 
00295     send_listeners_.insert(std::make_pair(local_publication_id,send_listener));
00296   }
00297   return 0;
00298 }
00299 
00300 int
00301 DataLink::make_reservation(const RepoId& remote_publication_id,
00302                            const RepoId& local_subscription_id,
00303                            const TransportReceiveListener_wrch& receive_listener)
00304 {
00305   DBG_ENTRY_LVL("DataLink", "make_reservation", 6);
00306 
00307   if (DCPS_debug_level > 9) {
00308     GuidConverter local(local_subscription_id), remote(remote_publication_id);
00309     ACE_DEBUG((LM_DEBUG,
00310                ACE_TEXT("(%P|%t) DataLink::make_reservation() - ")
00311                ACE_TEXT("creating association local subscription %C ")
00312                ACE_TEXT("<--> with remote publication  %C.\n"),
00313                OPENDDS_STRING(local).c_str(), OPENDDS_STRING(remote).c_str()));
00314   }
00315 
00316   {
00317     GuardType guard(strategy_lock_);
00318 
00319     if (!send_strategy_.is_nil()) {
00320       send_strategy_->link_released(false);
00321     }
00322   }
00323   {
00324     GuardType guard(pub_sub_maps_lock_);
00325 
00326     assoc_by_local_[local_subscription_id].insert(remote_publication_id);
00327     ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_publication_id];
00328 
00329     if (rls.is_nil())
00330       rls = make_rch<ReceiveListenerSet>();
00331     rls->insert(local_subscription_id, receive_listener);
00332 
00333     recv_listeners_.insert(std::make_pair(local_subscription_id,
00334                                           receive_listener));
00335   }
00336   return 0;
00337 }
00338 
00339 template <typename Seq>
00340 void set_to_seq(const RepoIdSet& rids, Seq& seq)
00341 {
00342   seq.length(static_cast<CORBA::ULong>(rids.size()));
00343   CORBA::ULong i = 0;
00344   for (RepoIdSet::const_iterator iter = rids.begin(); iter != rids.end(); ++iter) {
00345     seq[i++] = *iter;
00346   }
00347 }
00348 
00349 GUIDSeq*
00350 DataLink::peer_ids(const RepoId& local_id) const
00351 {
00352   GuardType guard(pub_sub_maps_lock_);
00353 
00354   const AssocByLocal::const_iterator iter = assoc_by_local_.find(local_id);
00355 
00356   if (iter == assoc_by_local_.end())
00357     return 0;
00358 
00359   GUIDSeq_var result = new GUIDSeq;
00360   set_to_seq(iter->second, static_cast<GUIDSeq&>(result));
00361   return result._retn();
00362 }
00363 
00364 /// This gets invoked when a TransportClient::remove_associations()
00365 /// call has been made.  Because this DataLink can be shared amongst
00366 /// different TransportClient objects, and different threads could
00367 /// be "managing" the different TransportClient objects, we need
00368 /// to make sure that this release_reservations() works in conjunction
00369 /// with a simultaneous call (in another thread) to one of this
00370 /// DataLink's make_reservation() methods.
00371 void
00372 DataLink::release_reservations(RepoId remote_id, RepoId local_id,
00373                                DataLinkSetMap& released_locals)
00374 {
00375   DBG_ENTRY_LVL("DataLink", "release_reservations", 6);
00376 
00377   if (DCPS_debug_level > 9) {
00378     GuidConverter local(local_id);
00379     GuidConverter remote(remote_id);
00380     ACE_DEBUG((LM_DEBUG,
00381                ACE_TEXT("(%P|%t) DataLink::release_reservations() - ")
00382                ACE_TEXT("releasing association local: %C ")
00383                ACE_TEXT("<--> with remote %C.\n"),
00384                OPENDDS_STRING(local).c_str(),
00385                OPENDDS_STRING(remote).c_str()));
00386   }
00387 
00388   //let the specific class release its reservations
00389   //done this way to prevent deadlock of holding pub_sub_maps_lock_
00390   //then obtaining a specific class lock in release_reservations_i
00391   //which reverses lock ordering of the active send logic of needing
00392   //the specific class lock before obtaining the over arching DataLink
00393   //pub_sub_maps_lock_
00394   this->release_reservations_i(remote_id, local_id);
00395 
00396   bool release_remote_required = false;
00397   {
00398     GuardType guard(this->pub_sub_maps_lock_);
00399 
00400     ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_id];
00401     if (rls->size() == 1) {
00402       assoc_by_remote_.erase(remote_id);
00403       release_remote_required = true;
00404     } else {
00405       rls->remove(local_id);
00406     }
00407     RepoIdSet& ris = assoc_by_local_[local_id];
00408     if (ris.size() == 1) {
00409       DataLinkSet_rch& links = released_locals[local_id];
00410       if (links.is_nil())
00411         links = make_rch<DataLinkSet>();
00412       links->insert_link(rchandle_from(this));
00413       assoc_by_local_.erase(local_id);
00414     } else {
00415       ris.erase(remote_id);
00416     }
00417 
00418     if (assoc_by_local_.empty()) {
00419       VDBG_LVL((LM_DEBUG,
00420                 ACE_TEXT("(%P|%t) DataLink::release_reservations: ")
00421                 ACE_TEXT("release_datalink due to no remaining pubs or subs.\n")), 5);
00422 
00423       impl_.release_datalink(this);
00424     }
00425   }
00426   if (release_remote_required)
00427     release_remote_i(remote_id);
00428 }
00429 
00430 void
00431 DataLink::schedule_delayed_release()
00432 {
00433   DBG_ENTRY_LVL("DataLink", "schedule_delayed_release", 6);
00434 
00435   VDBG((LM_DEBUG, "(%P|%t) DataLink[%@]::schedule_delayed_release\n", this));
00436 
00437   // The samples have to be removed at this point, otherwise the samples
00438   // can not be delivered when new association is added and still use
00439   // this connection/datalink.
00440   if (!this->send_strategy_.is_nil()) {
00441     this->send_strategy_->clear();
00442   }
00443 
00444   ACE_Time_Value future_release_time = ACE_OS::gettimeofday() + this->datalink_release_delay_;
00445   this->schedule_stop(future_release_time);
00446 }
00447 
00448 bool
00449 DataLink::cancel_release()
00450 {
00451   DBG_ENTRY_LVL("DataLink", "cancel_release", 6);
00452   if (stopped_) {
00453     if (DCPS_debug_level > 0) {
00454       ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] already stopped_ cannot cancel release\n", this));
00455     }
00456     return false;
00457   }
00458   if (scheduling_release_) {
00459     if (DCPS_debug_level > 0) {
00460       ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] currently scheduling release, notify reactor of cancel\n", this));
00461     }
00462     this->set_scheduling_release(false);
00463     this->scheduled_to_stop_at_ = ACE_Time_Value::zero;
00464     notify_reactor();
00465   }
00466   return true;
00467 }
00468 
00469 void
00470 DataLink::stop_i()
00471 {
00472   DBG_ENTRY_LVL("DataLink", "stop_i", 6);
00473 }
00474 
00475 ACE_Message_Block*
00476 DataLink::create_control(char submessage_id,
00477                          DataSampleHeader& header,
00478                          Message_Block_Ptr data)
00479 {
00480   DBG_ENTRY_LVL("DataLink", "create_control", 6);
00481 
00482   header.byte_order_ = ACE_CDR_BYTE_ORDER;
00483   header.message_id_ = TRANSPORT_CONTROL;
00484   header.submessage_id_ = submessage_id;
00485   header.message_length_ = static_cast<ACE_UINT32>(data->total_length());
00486 
00487   ACE_Message_Block* message = 0;
00488   ACE_NEW_MALLOC_RETURN(message,
00489                         static_cast<ACE_Message_Block*>(
00490                           this->mb_allocator_->malloc(sizeof(ACE_Message_Block))),
00491                         ACE_Message_Block(header.max_marshaled_size(),
00492                                           ACE_Message_Block::MB_DATA,
00493                                           data.release(),
00494                                           0,  // data
00495                                           0,  // allocator_strategy
00496                                           0,  // locking_strategy
00497                                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00498                                           ACE_Time_Value::zero,
00499                                           ACE_Time_Value::max_time,
00500                                           this->db_allocator_.get(),
00501                                           this->mb_allocator_.get()),
00502                         0);
00503 
00504   if (!(*message << header)) {
00505     ACE_ERROR((LM_ERROR,
00506                ACE_TEXT("(%P|%t) DataLink::create_control: ")
00507                ACE_TEXT("cannot put header in message\n")));
00508     ACE_DES_FREE(message, this->mb_allocator_->free, ACE_Message_Block);
00509     message = 0;
00510   }
00511 
00512   return message;
00513 }
00514 
00515 SendControlStatus
00516 DataLink::send_control(const DataSampleHeader& header, Message_Block_Ptr message)
00517 {
00518   DBG_ENTRY_LVL("DataLink", "send_control", 6);
00519 
00520   TransportSendControlElement* const elem = new TransportSendControlElement(1, // initial_count
00521                                        GUID_UNKNOWN, &send_response_listener_,
00522                                        header, move(message));
00523 
00524   send_response_listener_.track_message();
00525 
00526   RepoId senderId(header.publication_id_);
00527   send_start();
00528   send(elem);
00529   send_stop(senderId);
00530 
00531   return SEND_CONTROL_OK;
00532 }
00533 
00534 /// This method will "deliver" the sample to all TransportReceiveListeners
00535 /// within this DataLink that are interested in the (remote) publisher id
00536 /// that sent the sample.
00537 int
00538 DataLink::data_received(ReceivedDataSample& sample,
00539                         const RepoId& readerId /* = GUID_UNKNOWN */)
00540 {
00541   data_received_i(sample, readerId, RepoIdSet(), ReceiveListenerSet::SET_EXCLUDED);
00542   return 0;
00543 }
00544 
00545 void
00546 DataLink::data_received_include(ReceivedDataSample& sample, const RepoIdSet& incl)
00547 {
00548   data_received_i(sample, GUID_UNKNOWN, incl, ReceiveListenerSet::SET_INCLUDED);
00549 }
00550 
00551 void
00552 DataLink::data_received_i(ReceivedDataSample& sample,
00553                           const RepoId& readerId,
00554                           const RepoIdSet& incl_excl,
00555                           ReceiveListenerSet::ConstrainReceiveSet constrain)
00556 {
00557   DBG_ENTRY_LVL("DataLink", "data_received_i", 6);
00558   // Which remote publication sent this message?
00559   const RepoId& publication_id = sample.header_.publication_id_;
00560 
00561   // Locate the set of TransportReceiveListeners associated with this
00562   // DataLink that are interested in hearing about any samples received
00563   // from the remote publisher_id.
00564   if (DCPS_debug_level > 9) {
00565     const GuidConverter converter(publication_id);
00566     const GuidConverter reader(readerId);
00567     ACE_DEBUG((LM_DEBUG,
00568                ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
00569                ACE_TEXT("from publication %C received sample: %C to readerId %C (%C).\n"),
00570                OPENDDS_STRING(converter).c_str(),
00571                to_string(sample.header_).c_str(),
00572                OPENDDS_STRING(reader).c_str(),
00573                constrain == ReceiveListenerSet::SET_EXCLUDED ? "SET_EXCLUDED" : "SET_INCLUDED"));
00574   }
00575 
00576   if (Transport_debug_level > 9) {
00577     const GuidConverter converter(publication_id);
00578     ACE_DEBUG((LM_DEBUG,
00579                ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
00580                ACE_TEXT("from publication %C received sample: %C.\n"),
00581                OPENDDS_STRING(converter).c_str(),
00582                to_string(sample.header_).c_str()));
00583   }
00584 
00585   ReceiveListenerSet_rch listener_set;
00586   {
00587     GuardType guard(this->pub_sub_maps_lock_);
00588     AssocByRemote::iterator iter = assoc_by_remote_.find(publication_id);
00589     if (iter != assoc_by_remote_.end())
00590       listener_set = iter->second;
00591 
00592     if (listener_set.is_nil()) {
00593       TransportReceiveListener_rch listener = this->default_listener_.lock();
00594       if (listener)
00595         listener->data_received(sample);
00596       return;
00597     }
00598   }
00599 
00600   if (listener_set.is_nil()) {
00601     // Nobody has any interest in this message.  Drop it on the floor.
00602     if (Transport_debug_level > 4) {
00603       const GuidConverter converter(publication_id);
00604       ACE_DEBUG((LM_DEBUG,
00605                  ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
00606                  ACE_TEXT(" discarding sample from publication %C due to no listeners.\n"),
00607                  OPENDDS_STRING(converter).c_str()));
00608     }
00609 
00610     return;
00611   }
00612 
00613   if (readerId != GUID_UNKNOWN) {
00614     listener_set->data_received(sample, readerId);
00615     return;
00616   }
00617 
00618 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00619 
00620   if (sample.header_.content_filter_
00621       && sample.header_.content_filter_entries_.length()) {
00622     ReceiveListenerSet subset(*listener_set.in());
00623     subset.remove_all(sample.header_.content_filter_entries_);
00624     subset.data_received(sample, incl_excl, constrain);
00625 
00626   } else {
00627 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00628 
00629     if (DCPS_debug_level > 9) {
00630       // Just get the set to do our dirty work by having it iterate over its
00631       // collection of TransportReceiveListeners, and invoke the data_received()
00632       // method on each one.
00633       OPENDDS_STRING included_ids;
00634       bool first = true;
00635       RepoIdSet::const_iterator iter = incl_excl.begin();
00636       while(iter != incl_excl.end()) {
00637         included_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter));
00638         first = false;
00639         ++iter;
00640       }
00641       ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::data_received_i - normal data received to each subscription in listener_set %C ids:%C\n",
00642                  constrain == ReceiveListenerSet::SET_EXCLUDED ? "exclude" : "include", included_ids.c_str()));
00643     }
00644     listener_set->data_received(sample, incl_excl, constrain);
00645 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00646   }
00647 
00648 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00649 }
00650 
00651 // static
00652 ACE_UINT64
00653 DataLink::get_next_datalink_id()
00654 {
00655   static ACE_UINT64 next_id = 0;
00656   static LockType lock;
00657 
00658   ACE_UINT64 id;
00659   {
00660     GuardType guard(lock);
00661     id = next_id++;
00662 
00663     if (0 == next_id) {
00664       ACE_ERROR((LM_ERROR,
00665                  ACE_TEXT("ERROR: DataLink::get_next_datalink_id: ")
00666                  ACE_TEXT("has rolled over and is reusing ids!\n")));
00667     }
00668   }
00669 
00670   return id;
00671 }
00672 
00673 void
00674 DataLink::transport_shutdown()
00675 {
00676   DBG_ENTRY_LVL("DataLink", "transport_shutdown", 6);
00677 
00678   //this->cancel_release();
00679   this->set_scheduling_release(false);
00680   this->scheduled_to_stop_at_ = ACE_Time_Value::zero;
00681 
00682   ACE_Reactor_Timer_Interface* reactor = impl_.timer();
00683   reactor->cancel_timer(this);
00684 
00685   this->stop();
00686   // this->send_listeners_.clear();
00687   // this->recv_listeners_.clear();
00688   // Drop our reference to the TransportImpl object
00689 }
00690 
00691 void
00692 DataLink::notify(ConnectionNotice notice)
00693 {
00694   DBG_ENTRY_LVL("DataLink", "notify", 6);
00695 
00696   VDBG((LM_DEBUG,
00697         ACE_TEXT("(%P|%t) DataLink::notify: this(%X) notify %C\n"),
00698         this,
00699         connection_notice_as_str(notice)));
00700 
00701   GuardType guard(this->pub_sub_maps_lock_);
00702 
00703   // Notify the datawriters
00704   // the lost publications due to a connection problem.
00705   for (IdToSendListenerMap::iterator itr = send_listeners_.begin();
00706        itr != send_listeners_.end(); ++itr) {
00707 
00708     TransportSendListener_rch tsl = itr->second.lock();
00709 
00710     if (tsl) {
00711       if (Transport_debug_level > 0) {
00712         GuidConverter converter(itr->first);
00713         ACE_DEBUG((LM_DEBUG,
00714                    ACE_TEXT("(%P|%t) DataLink::notify: ")
00715                    ACE_TEXT("notify pub %C %C.\n"),
00716                    OPENDDS_STRING(converter).c_str(),
00717                    connection_notice_as_str(notice)));
00718       }
00719       AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first);
00720       if (local_it == assoc_by_local_.end()) {
00721         if (Transport_debug_level) {
00722           GuidConverter converter(itr->first);
00723           ACE_DEBUG((LM_DEBUG,
00724                      ACE_TEXT("(%P|%t) DataLink::notify: ")
00725                      ACE_TEXT("try to notify pub %C %C - no associations to notify.\n"),
00726                      OPENDDS_STRING(converter).c_str(),
00727                      connection_notice_as_str(notice)));
00728         }
00729         break;
00730       }
00731       const RepoIdSet& rids = local_it->second;
00732 
00733       ReaderIdSeq subids;
00734       set_to_seq(rids, subids);
00735 
00736       switch (notice) {
00737       case DISCONNECTED:
00738         tsl->notify_publication_disconnected(subids);
00739         break;
00740 
00741       case RECONNECTED:
00742         tsl->notify_publication_reconnected(subids);
00743         break;
00744 
00745       case LOST:
00746         tsl->notify_publication_lost(subids);
00747         break;
00748 
00749       default:
00750         ACE_ERROR((LM_ERROR,
00751                    ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ")
00752                    ACE_TEXT("unknown notice to TransportSendListener\n")));
00753         break;
00754       }
00755 
00756     } else {
00757       if (Transport_debug_level > 0) {
00758         GuidConverter converter(itr->first);
00759         ACE_DEBUG((LM_DEBUG,
00760                    ACE_TEXT("(%P|%t) DataLink::notify: ")
00761                    ACE_TEXT("not notify pub %C %C \n"),
00762                    OPENDDS_STRING(converter).c_str(),
00763                    connection_notice_as_str(notice)));
00764       }
00765     }
00766 
00767   }
00768 
00769   // Notify the datareaders registered with TransportImpl
00770   // the lost subscriptions due to a connection problem.
00771   for (IdToRecvListenerMap::iterator itr = recv_listeners_.begin();
00772        itr != recv_listeners_.end(); ++itr) {
00773 
00774     TransportReceiveListener_rch trl = itr->second.lock();
00775 
00776     if (trl) {
00777       if (Transport_debug_level > 0) {
00778         GuidConverter converter(itr->first);
00779         ACE_DEBUG((LM_DEBUG,
00780                    ACE_TEXT("(%P|%t) DataLink::notify: ")
00781                    ACE_TEXT("notify sub %C %C.\n"),
00782                    OPENDDS_STRING(converter).c_str(),
00783                    connection_notice_as_str(notice)));
00784       }
00785       AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first);
00786       if (local_it == assoc_by_local_.end()) {
00787         if (Transport_debug_level) {
00788           GuidConverter converter(itr->first);
00789           ACE_DEBUG((LM_DEBUG,
00790                      ACE_TEXT("(%P|%t) DataLink::notify: ")
00791                      ACE_TEXT("try to notify sub %C %C - no associations to notify.\n"),
00792                      OPENDDS_STRING(converter).c_str(),
00793                      connection_notice_as_str(notice)));
00794         }
00795         break;
00796       }
00797       const RepoIdSet& rids = local_it->second;
00798 
00799       WriterIdSeq pubids;
00800       set_to_seq(rids, pubids);
00801 
00802       switch (notice) {
00803       case DISCONNECTED:
00804         trl->notify_subscription_disconnected(pubids);
00805         break;
00806 
00807       case RECONNECTED:
00808         trl->notify_subscription_reconnected(pubids);
00809         break;
00810 
00811       case LOST:
00812         trl->notify_subscription_lost(pubids);
00813         break;
00814 
00815       default:
00816         ACE_ERROR((LM_ERROR,
00817                    ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ")
00818                    ACE_TEXT("unknown notice to datareader.\n")));
00819         break;
00820       }
00821 
00822     } else {
00823       if (Transport_debug_level > 0) {
00824         GuidConverter converter(itr->first);
00825         ACE_DEBUG((LM_DEBUG,
00826                    ACE_TEXT("(%P|%t) DataLink::notify: ")
00827                    ACE_TEXT("not notify sub %C subscription lost.\n"),
00828                    OPENDDS_STRING(converter).c_str()));
00829       }
00830 
00831     }
00832   }
00833 }
00834 
00835 
00836 
00837 void
00838 DataLink::pre_stop_i()
00839 {
00840   if (this->thr_per_con_send_task_ != 0) {
00841     this->thr_per_con_send_task_->close(1);
00842   }
00843 }
00844 
00845 bool
00846 DataLink::release_resources()
00847 {
00848   DBG_ENTRY_LVL("DataLink", "release_resources", 6);
00849 
00850   this->prepare_release();
00851   return impl_.release_link_resources(this);
00852 }
00853 
00854 bool
00855 DataLink::is_target(const RepoId& remote_sub_id)
00856 {
00857   GuardType guard(this->pub_sub_maps_lock_);
00858   return assoc_by_remote_.count(remote_sub_id);
00859 }
00860 
00861 GUIDSeq*
00862 DataLink::target_intersection(const RepoId& pub_id, const GUIDSeq& in,
00863                               size_t& n_subs)
00864 {
00865   GUIDSeq_var res;
00866   GuardType guard(this->pub_sub_maps_lock_);
00867   AssocByLocal::const_iterator iter = assoc_by_local_.find(pub_id);
00868 
00869   if (iter != assoc_by_local_.end()) {
00870     n_subs = iter->second.size();
00871     const CORBA::ULong len = in.length();
00872 
00873     for (CORBA::ULong i(0); i < len; ++i) {
00874       if (iter->second.count(in[i])) {
00875         if (res.ptr() == 0) {
00876           res = new GUIDSeq;
00877         }
00878 
00879         push_back(res.inout(), in[i]);
00880       }
00881     }
00882   }
00883 
00884   return res._retn();
00885 }
00886 
00887 void DataLink::prepare_release()
00888 {
00889   GuardType guard(this->pub_sub_maps_lock_);
00890 
00891   if (!assoc_releasing_.empty()) {
00892     ACE_ERROR((LM_ERROR,
00893                ACE_TEXT("(%P|%t) DataLink::prepare_release: ")
00894                ACE_TEXT("already prepared for release.\n")));
00895     return;
00896   }
00897 
00898   assoc_releasing_ = assoc_by_local_;
00899 }
00900 
00901 void DataLink::clear_associations()
00902 {
00903   for (AssocByLocal::iterator iter = assoc_releasing_.begin();
00904        iter != assoc_releasing_.end(); ++iter) {
00905     TransportSendListener_rch tsl = send_listener_for(iter->first);
00906     if (tsl) {
00907       ReaderIdSeq sub_ids;
00908       set_to_seq(iter->second, sub_ids);
00909       tsl->remove_associations(sub_ids, false);
00910       continue;
00911     }
00912     TransportReceiveListener_rch trl = recv_listener_for(iter->first);
00913     if (trl) {
00914       WriterIdSeq pub_ids;
00915       set_to_seq(iter->second, pub_ids);
00916       trl->remove_associations(pub_ids, false);
00917     }
00918   }
00919   assoc_releasing_.clear();
00920 }
00921 
00922 int
00923 DataLink::handle_timeout(const ACE_Time_Value& /*tv*/, const void* /*arg*/)
00924 {
00925   if (this->scheduled_to_stop_at_ != ACE_Time_Value::zero) {
00926     VDBG_LVL((LM_DEBUG, "(%P|%t) DataLink::handle_timeout called\n"), 4);
00927     impl_.unbind_link(this);
00928 
00929     if (assoc_by_remote_.empty() && assoc_by_local_.empty()) {
00930       this->stop();
00931     }
00932   }
00933   return 0;
00934 }
00935 
00936 int
00937 DataLink::handle_close(ACE_HANDLE h, ACE_Reactor_Mask m)
00938 {
00939   if (h == ACE_INVALID_HANDLE && m == TIMER_MASK) {
00940     // Reactor is shutting down with this timer still pending.
00941     // Take the same cleanup actions as if the timeout had expired.
00942     handle_timeout(ACE_Time_Value::zero, 0);
00943   }
00944 
00945   return 0;
00946 }
00947 
00948 void
00949 DataLink::set_dscp_codepoint(int cp, ACE_SOCK& socket)
00950 {
00951   /**
00952    * The following IPV6 code was lifted in spirit from the RTCORBA
00953    * implementation of setting the DiffServ codepoint.
00954    */
00955   int result = 0;
00956 
00957   // Shift the code point up to bits, so that we only use the DS field
00958   int tos = cp << 2;
00959 
00960   const char* which = "IPV4 TOS";
00961 #if defined (ACE_HAS_IPV6)
00962   ACE_INET_Addr local_address;
00963 
00964   if (socket.get_local_addr(local_address) == -1) {
00965     return;
00966 
00967   } else if (local_address.get_type() == AF_INET6)
00968 #if !defined (IPV6_TCLASS)
00969   {
00970     if (DCPS_debug_level > 0) {
00971       ACE_ERROR((LM_ERROR,
00972                  ACE_TEXT("(%P|%t) ERROR: DataLink::set_dscp_codepoint() - ")
00973                  ACE_TEXT("IPV6 TCLASS not supported yet, not setting codepoint %d.\n"),
00974                  cp));
00975     }
00976 
00977     return;
00978   }
00979 
00980 #else /* IPV6_TCLASS */
00981   {
00982     which = "IPV6 TCLASS";
00983     result = socket.set_option(
00984                IPPROTO_IPV6,
00985                IPV6_TCLASS,
00986                &tos,
00987                sizeof(tos));
00988 
00989   } else // This is a bit tricky and might be hard to follow...
00990 
00991 #endif /* IPV6_TCLASS */
00992 #endif /* ACE_HAS_IPV6 */
00993 
00994 #ifdef IP_TOS
00995   result = socket.set_option(
00996              IPPROTO_IP,
00997              IP_TOS,
00998              &tos,
00999              sizeof(tos));
01000 
01001   if ((result == -1) && (errno != ENOTSUP)
01002 #ifdef WSAEINVAL
01003       && (errno != WSAEINVAL)
01004 #endif
01005      ) {
01006 #endif // IP_TOS
01007     ACE_DEBUG((LM_DEBUG,
01008                ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ")
01009                ACE_TEXT("failed to set the %C codepoint to %d: %m, ")
01010                ACE_TEXT("try running as superuser.\n"),
01011                which,
01012                cp));
01013 #ifdef IP_TOS
01014   } else if (DCPS_debug_level > 4) {
01015     ACE_DEBUG((LM_DEBUG,
01016                ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ")
01017                ACE_TEXT("set %C codepoint to %d.\n"),
01018                which,
01019                cp));
01020   }
01021 #endif
01022 }
01023 
01024 bool
01025 DataLink::handle_send_request_ack(TransportQueueElement* element)
01026 {
01027   element->data_delivered();
01028   return true;
01029 }
01030 
01031 #ifndef OPENDDS_SAFETY_PROFILE
01032 std::ostream&
01033 operator<<(std::ostream& str, const DataLink& value)
01034 {
01035   str << "   There are " << value.assoc_by_local_.size()
01036       << " local entities currently using this link comprising following associations:"
01037       << std::endl;
01038 
01039   for (DataLink::AssocByLocal::const_iterator
01040        localId = value.assoc_by_local_.begin();
01041        localId != value.assoc_by_local_.end();
01042        ++localId) {
01043     for (RepoIdSet::const_iterator
01044          remoteId = localId->second.begin();
01045          remoteId != localId->second.end();
01046          ++remoteId) {
01047       str << GuidConverter(localId->first) << " --> "
01048           << GuidConverter(*remoteId) << "   " << std::endl;
01049     }
01050   }
01051   return str;
01052 }
01053 #endif
01054 }
01055 }
01056 
01057 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1