DataLink.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "DataLink.h"
00010 
00011 #include "ReceivedDataSample.h"
00012 
00013 #include "TransportImpl.h"
00014 #include "TransportInst.h"
00015 #include "TransportClient.h"
00016 
00017 #include "dds/DCPS/DataWriterImpl.h"
00018 #include "dds/DCPS/DataReaderImpl.h"
00019 #include "dds/DCPS/Service_Participant.h"
00020 #include "dds/DCPS/GuidConverter.h"
00021 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00022 #include "dds/DCPS/Util.h"
00023 #include "dds/DCPS/Definitions.h"
00024 #include "dds/DCPS/SafetyProfileStreams.h"
00025 
00026 #include "EntryExit.h"
00027 #include "tao/ORB_Core.h"
00028 #include "tao/debug.h"
00029 #include "ace/Reactor.h"
00030 #include "ace/SOCK.h"
00031 
00032 
00033 #if !defined (__ACE_INLINE__)
00034 #include "DataLink.inl"
00035 #endif /* __ACE_INLINE__ */
00036 
00037 namespace OpenDDS {
00038 namespace DCPS {
00039 
00040 /// Only called by our TransportImpl object.
00041 DataLink::DataLink(TransportImpl* impl, Priority priority, bool is_loopback,
00042                    bool is_active)
00043   : stopped_(false),
00044     scheduled_to_stop_at_(ACE_Time_Value::zero),
00045     default_listener_(0),
00046     thr_per_con_send_task_(0),
00047     transport_priority_(priority),
00048     scheduling_release_(false),
00049     send_control_allocator_(0),
00050     mb_allocator_(0),
00051     db_allocator_(0),
00052     is_loopback_(is_loopback),
00053     is_active_(is_active),
00054     started_(false),
00055     send_response_listener_("DataLink")
00056 {
00057   DBG_ENTRY_LVL("DataLink", "DataLink", 6);
00058 
00059   impl->_add_ref();
00060   this->impl_ = impl;
00061 
00062   datalink_release_delay_.sec(this->impl_->config_->datalink_release_delay_ / 1000);
00063   datalink_release_delay_.usec(this->impl_->config_->datalink_release_delay_ % 1000 * 1000);
00064 
00065   id_ = DataLink::get_next_datalink_id();
00066 
00067   if (this->impl_->config_->thread_per_connection_) {
00068     this->thr_per_con_send_task_ = new ThreadPerConnectionSendTask(this);
00069 
00070     if (this->thr_per_con_send_task_->open() == -1) {
00071       ACE_ERROR((LM_ERROR,
00072                  ACE_TEXT("(%P|%t) DataLink::DataLink: ")
00073                  ACE_TEXT("failed to open ThreadPerConnectionSendTask\n")));
00074 
00075     } else if (DCPS_debug_level > 4) {
00076       ACE_DEBUG((LM_DEBUG,
00077                  ACE_TEXT("(%P|%t) DataLink::DataLink - ")
00078                  ACE_TEXT("started new thread to send data with.\n")));
00079     }
00080   }
00081 
00082   // Initialize transport control sample allocators:
00083   size_t control_chunks = this->impl_->config_->datalink_control_chunks_;
00084 
00085   this->send_control_allocator_ =
00086     new TransportSendControlElementAllocator(control_chunks);
00087 
00088   this->mb_allocator_ = new MessageBlockAllocator(control_chunks);
00089   this->db_allocator_ = new DataBlockAllocator(control_chunks);
00090 }
00091 
00092 DataLink::~DataLink()
00093 {
00094   DBG_ENTRY_LVL("DataLink", "~DataLink", 6);
00095 
00096   if (!assoc_by_local_.empty()) {
00097     ACE_DEBUG((LM_WARNING,
00098                ACE_TEXT("(%P|%t) WARNING: DataLink[%@]::~DataLink() - ")
00099                ACE_TEXT("link still in use by %d entities when deleted!\n"),
00100                this, assoc_by_local_.size()));
00101   }
00102 
00103   delete this->db_allocator_;
00104   delete this->mb_allocator_;
00105 
00106   delete this->send_control_allocator_;
00107 
00108   if (this->thr_per_con_send_task_ != 0) {
00109     this->thr_per_con_send_task_->close(1);
00110     delete this->thr_per_con_send_task_;
00111   }
00112 }
00113 
00114 TransportImpl_rch
00115 DataLink::impl() const
00116 {
00117   return impl_;
00118 }
00119 
00120 void
00121 DataLink::invoke_on_start_callbacks(bool success)
00122 {
00123   const DataLink_rch link(success ? this : 0, false);
00124 
00125   while (true) {
00126     GuardType guard(strategy_lock_);
00127 
00128     if (on_start_callbacks_.empty()) {
00129       break;
00130     }
00131 
00132     OnStartCallback last_callback = on_start_callbacks_.back();
00133     on_start_callbacks_.pop_back();
00134 
00135     guard.release();
00136     last_callback.first->use_datalink(last_callback.second, link);
00137   }
00138 }
00139 
00140 //Reactor invokes this after being notified in schedule_stop or cancel_release
00141 int
00142 DataLink::handle_exception(ACE_HANDLE /* fd */)
00143 {
00144   if(this->scheduled_to_stop_at_ == ACE_Time_Value::zero) {
00145     if (DCPS_debug_level > 0) {
00146       ACE_DEBUG((LM_DEBUG,
00147                  ACE_TEXT("(%P|%t) DataLink::handle_exception() - not scheduling or stopping\n")));
00148     }
00149     if (this->impl_ != 0) {
00150       ACE_Reactor_Timer_Interface* reactor = this->impl_->timer();
00151       if (reactor->cancel_timer(this) > 0) {
00152         if (DCPS_debug_level > 0) {
00153           ACE_DEBUG((LM_DEBUG,
00154                      ACE_TEXT("(%P|%t) DataLink::handle_exception() - cancelled future release timer\n")));
00155         }
00156       }
00157     } else {
00158       if (DCPS_debug_level > 0) {
00159         ACE_DEBUG((LM_DEBUG,
00160                    ACE_TEXT("(%P|%t) DataLink::handle_exception() - impl_ == 0\n")));
00161       }
00162     }
00163     this->_remove_ref();
00164     return 0;
00165   } else if (this->scheduled_to_stop_at_ <= ACE_OS::gettimeofday()) {
00166     if (this->scheduling_release_) {
00167       if (DCPS_debug_level > 0) {
00168         ACE_DEBUG((LM_DEBUG,
00169                    ACE_TEXT("(%P|%t) DataLink::handle_exception() - delay already elapsed so handle_timeout now\n")));
00170       }
00171       this->handle_timeout(ACE_Time_Value::zero, 0);
00172       return 0;
00173     }
00174     if (DCPS_debug_level > 0) {
00175       ACE_DEBUG((LM_DEBUG,
00176                  ACE_TEXT("(%P|%t) DataLink::handle_exception() - stopping now\n")));
00177     }
00178     this->stop();
00179     this->_remove_ref();
00180     return 0;
00181   } else /* SCHEDULE TO STOP IN THE FUTURE*/ {
00182     if (DCPS_debug_level > 0) {
00183       ACE_DEBUG((LM_DEBUG,
00184                  ACE_TEXT("(%P|%t) DataLink::handle_exception() - (delay) scheduling timer for future release\n")));
00185     }
00186     this->_add_ref();
00187     ACE_Reactor_Timer_Interface* reactor = this->impl_->timer();
00188     ACE_Time_Value future_release_time = this->scheduled_to_stop_at_ - ACE_OS::gettimeofday();
00189     reactor->schedule_timer(this, 0, future_release_time);
00190   }
00191   return 0;
00192 }
00193 
00194 //Allows DataLink::stop to be done on the reactor thread so that
00195 //this thread avoids possibly deadlocking trying to access reactor
00196 //to stop strategies or schedule timers
00197 void
00198 DataLink::schedule_stop(ACE_Time_Value& schedule_to_stop_at)
00199 {
00200   if (!this->stopped_ && this->scheduled_to_stop_at_ == ACE_Time_Value::zero) {
00201     //Add ref before handing to the reactor
00202     //ref removed in handle_exception or in handle_timeout based on stopping now or delayed
00203     this->_add_ref();
00204     this->scheduled_to_stop_at_ = schedule_to_stop_at;
00205     TransportReactorTask_rch reactor(this->impl_->reactor_task());
00206     reactor->get_reactor()->notify(this);
00207     // reactor will invoke our DataLink::handle_exception()
00208   } else {
00209     if (DCPS_debug_level > 0) {
00210       ACE_DEBUG((LM_DEBUG,
00211                  ACE_TEXT("(%P|%t) DataLink::schedule_stop() - Already stopped or already scheduled for stop\n")));
00212     }
00213   }
00214 }
00215 
00216 void
00217 DataLink::stop()
00218 {
00219   this->pre_stop_i();
00220 
00221   TransportSendStrategy_rch send_strategy;
00222   TransportStrategy_rch recv_strategy;
00223 
00224   {
00225     GuardType guard(this->strategy_lock_);
00226 
00227     if (this->stopped_) return;
00228 
00229     send_strategy = this->send_strategy_;
00230     this->send_strategy_ = 0;
00231 
00232     recv_strategy = this->receive_strategy_;
00233     this->receive_strategy_ = 0;
00234   }
00235 
00236   if (!send_strategy.is_nil()) {
00237     send_strategy->stop();
00238   }
00239 
00240   if (!recv_strategy.is_nil()) {
00241     recv_strategy->stop();
00242   }
00243 
00244   this->stop_i();
00245   this->stopped_ = true;
00246   this->scheduled_to_stop_at_ = ACE_Time_Value::zero;
00247 }
00248 
00249 void
00250 DataLink::resume_send()
00251 {
00252   if (!this->send_strategy_->isDirectMode())
00253     this->send_strategy_->resume_send();
00254 }
00255 
00256 int
00257 DataLink::make_reservation(const RepoId& remote_subscription_id,
00258                            const RepoId& local_publication_id,
00259                            TransportSendListener* send_listener)
00260 {
00261   DBG_ENTRY_LVL("DataLink", "make_reservation", 6);
00262 
00263   if (DCPS_debug_level > 9) {
00264     GuidConverter local(local_publication_id), remote(remote_subscription_id);
00265     ACE_DEBUG((LM_DEBUG,
00266                ACE_TEXT("(%P|%t) DataLink::make_reservation() - ")
00267                ACE_TEXT("creating association local publication  %C ")
00268                ACE_TEXT("<--> with remote subscription %C.\n"),
00269                OPENDDS_STRING(local).c_str(),
00270                OPENDDS_STRING(remote).c_str()));
00271   }
00272 
00273   {
00274     GuardType guard(strategy_lock_);
00275 
00276     if (!send_strategy_.is_nil()) {
00277       send_strategy_->link_released(false);
00278     }
00279   }
00280   {
00281     GuardType guard(pub_sub_maps_lock_);
00282 
00283     assoc_by_local_[local_publication_id].insert(remote_subscription_id);
00284     ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_subscription_id];
00285 
00286     if (rls.is_nil())
00287       rls = new ReceiveListenerSet;
00288     rls->insert(local_publication_id, 0);
00289 
00290     send_listeners_[local_publication_id] = send_listener;
00291   }
00292   return 0;
00293 }
00294 
00295 int
00296 DataLink::make_reservation(const RepoId& remote_publication_id,
00297                            const RepoId& local_subscription_id,
00298                            TransportReceiveListener* receive_listener)
00299 {
00300   DBG_ENTRY_LVL("DataLink", "make_reservation", 6);
00301 
00302   if (DCPS_debug_level > 9) {
00303     GuidConverter local(local_subscription_id), remote(remote_publication_id);
00304     ACE_DEBUG((LM_DEBUG,
00305                ACE_TEXT("(%P|%t) DataLink::make_reservation() - ")
00306                ACE_TEXT("creating association local subscription %C ")
00307                ACE_TEXT("<--> with remote publication  %C.\n"),
00308                OPENDDS_STRING(local).c_str(), OPENDDS_STRING(remote).c_str()));
00309   }
00310 
00311   {
00312     GuardType guard(strategy_lock_);
00313 
00314     if (!send_strategy_.is_nil()) {
00315       send_strategy_->link_released(false);
00316     }
00317   }
00318   {
00319     GuardType guard(pub_sub_maps_lock_);
00320 
00321     assoc_by_local_[local_subscription_id].insert(remote_publication_id);
00322     ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_publication_id];
00323 
00324     if (rls.is_nil())
00325       rls = new ReceiveListenerSet;
00326     rls->insert(local_subscription_id, receive_listener);
00327 
00328     recv_listeners_[local_subscription_id] = receive_listener;
00329   }
00330   return 0;
00331 }
00332 
00333 template <typename Seq>
00334 void set_to_seq(const RepoIdSet& rids, Seq& seq)
00335 {
00336   seq.length(static_cast<CORBA::ULong>(rids.size()));
00337   CORBA::ULong i = 0;
00338   for (RepoIdSet::const_iterator iter = rids.begin(); iter != rids.end(); ++iter) {
00339     seq[i++] = *iter;
00340   }
00341 }
00342 
00343 GUIDSeq*
00344 DataLink::peer_ids(const RepoId& local_id) const
00345 {
00346   GuardType guard(pub_sub_maps_lock_);
00347 
00348   const AssocByLocal::const_iterator iter = assoc_by_local_.find(local_id);
00349 
00350   if (iter == assoc_by_local_.end())
00351     return 0;
00352 
00353   GUIDSeq_var result = new GUIDSeq;
00354   set_to_seq(iter->second, static_cast<GUIDSeq&>(result));
00355   return result._retn();
00356 }
00357 
00358 /// This gets invoked when a TransportClient::remove_associations()
00359 /// call has been made.  Because this DataLink can be shared amongst
00360 /// different TransportClient objects, and different threads could
00361 /// be "managing" the different TransportClient objects, we need
00362 /// to make sure that this release_reservations() works in conjunction
00363 /// with a simultaneous call (in another thread) to one of this
00364 /// DataLink's make_reservation() methods.
00365 void
00366 DataLink::release_reservations(RepoId remote_id, RepoId local_id,
00367                                DataLinkSetMap& released_locals)
00368 {
00369   DBG_ENTRY_LVL("DataLink", "release_reservations", 6);
00370 
00371   if (DCPS_debug_level > 9) {
00372     GuidConverter local(local_id);
00373     GuidConverter remote(remote_id);
00374     ACE_DEBUG((LM_DEBUG,
00375                ACE_TEXT("(%P|%t) DataLink::release_reservations() - ")
00376                ACE_TEXT("releasing association local: %C ")
00377                ACE_TEXT("<--> with remote %C.\n"),
00378                OPENDDS_STRING(local).c_str(),
00379                OPENDDS_STRING(remote).c_str()));
00380   }
00381 
00382   //let the specific class release its reservations
00383   //done this way to prevent deadlock of holding pub_sub_maps_lock_
00384   //then obtaining a specific class lock in release_reservations_i
00385   //which reverses lock ordering of the active send logic of needing
00386   //the specific class lock before obtaining the over arching DataLink
00387   //pub_sub_maps_lock_
00388   this->release_reservations_i(remote_id, local_id);
00389 
00390   GuardType guard(this->pub_sub_maps_lock_);
00391 
00392   ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_id];
00393   if (rls->size() == 1) {
00394     assoc_by_remote_.erase(remote_id);
00395     release_remote_i(remote_id);
00396   } else {
00397     rls->remove(local_id);
00398   }
00399   RepoIdSet& ris = assoc_by_local_[local_id];
00400   if (ris.size() == 1) {
00401     DataLinkSet_rch& links = released_locals[local_id];
00402     if (links.is_nil())
00403       links = new DataLinkSet;
00404     links->insert_link(this);
00405     {
00406       GuardType guard(this->released_assoc_by_local_lock_);
00407       released_assoc_by_local_[local_id].insert(remote_id);
00408     }
00409     assoc_by_local_.erase(local_id);
00410   } else {
00411     ris.erase(remote_id);
00412   }
00413 
00414   if (assoc_by_local_.empty()) {
00415     VDBG_LVL((LM_DEBUG,
00416               ACE_TEXT("(%P|%t) DataLink::release_reservations: ")
00417               ACE_TEXT("release_datalink due to no remaining pubs or subs.\n")), 5);
00418     this->impl_->release_datalink(this);
00419   }
00420 }
00421 
00422 void
00423 DataLink::schedule_delayed_release()
00424 {
00425   DBG_ENTRY_LVL("DataLink", "schedule_delayed_release", 6);
00426 
00427   VDBG((LM_DEBUG, "(%P|%t) DataLink[%@]::schedule_delayed_release\n", this));
00428 
00429   // The samples have to be removed at this point, otherwise the samples
00430   // can not be delivered when new association is added and still use
00431   // this connection/datalink.
00432   if (!this->send_strategy_.is_nil()) {
00433     this->send_strategy_->clear();
00434   }
00435 
00436   ACE_Time_Value future_release_time = ACE_OS::gettimeofday() + this->datalink_release_delay_;
00437   this->schedule_stop(future_release_time);
00438 }
00439 
00440 bool
00441 DataLink::cancel_release()
00442 {
00443   DBG_ENTRY_LVL("DataLink", "cancel_release", 6);
00444   if (stopped_) {
00445     if (DCPS_debug_level > 0) {
00446       ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] already stopped_ cannot cancel release\n", this));
00447     }
00448     return false;
00449   }
00450   if (scheduling_release_) {
00451     if (DCPS_debug_level > 0) {
00452       ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] currently scheduling release, notify reactor of cancel\n", this));
00453     }
00454     this->set_scheduling_release(false);
00455     this->scheduled_to_stop_at_ = ACE_Time_Value::zero;
00456     TransportReactorTask_rch reactor(this->impl_->reactor_task());
00457     reactor->get_reactor()->notify(this);
00458   }
00459   return true;
00460 }
00461 
00462 void
00463 DataLink::stop_i()
00464 {
00465   DBG_ENTRY_LVL("DataLink", "stop_i", 6);
00466 }
00467 
00468 ACE_Message_Block*
00469 DataLink::create_control(char submessage_id,
00470                          DataSampleHeader& header,
00471                          ACE_Message_Block* data)
00472 {
00473   DBG_ENTRY_LVL("DataLink", "create_control", 6);
00474 
00475   header.byte_order_ = ACE_CDR_BYTE_ORDER;
00476   header.message_id_ = TRANSPORT_CONTROL;
00477   header.submessage_id_ = submessage_id;
00478   header.message_length_ = static_cast<ACE_UINT32>(data->total_length());
00479 
00480   ACE_Message_Block* message;
00481   ACE_NEW_MALLOC_RETURN(message,
00482                         static_cast<ACE_Message_Block*>(
00483                           this->mb_allocator_->malloc(sizeof(ACE_Message_Block))),
00484                         ACE_Message_Block(header.max_marshaled_size(),
00485                                           ACE_Message_Block::MB_DATA,
00486                                           data,
00487                                           0,  // data
00488                                           0,  // allocator_strategy
00489                                           0,  // locking_strategy
00490                                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00491                                           ACE_Time_Value::zero,
00492                                           ACE_Time_Value::max_time,
00493                                           this->db_allocator_,
00494                                           this->mb_allocator_),
00495                         0);
00496 
00497   *message << header;
00498 
00499   return message;
00500 }
00501 
00502 SendControlStatus
00503 DataLink::send_control(const DataSampleHeader& header, ACE_Message_Block* message)
00504 {
00505   DBG_ENTRY_LVL("DataLink", "send_control", 6);
00506 
00507   TransportSendControlElement* const elem =
00508     TransportSendControlElement::alloc(1, // initial_count
00509                                        GUID_UNKNOWN, &send_response_listener_,
00510                                        header, message, send_control_allocator_);
00511   if (!elem) return SEND_CONTROL_ERROR;
00512 
00513   send_response_listener_.track_message();
00514 
00515   RepoId senderId(header.publication_id_);
00516   send_start();
00517   send(elem);
00518   send_stop(senderId);
00519 
00520   return SEND_CONTROL_OK;
00521 }
00522 
00523 /// This method will "deliver" the sample to all TransportReceiveListeners
00524 /// within this DataLink that are interested in the (remote) publisher id
00525 /// that sent the sample.
00526 int
00527 DataLink::data_received(ReceivedDataSample& sample,
00528                         const RepoId& readerId /* = GUID_UNKNOWN */)
00529 {
00530   data_received_i(sample, readerId, RepoIdSet(), ReceiveListenerSet::SET_EXCLUDED);
00531   return 0;
00532 }
00533 
00534 void
00535 DataLink::data_received_include(ReceivedDataSample& sample, const RepoIdSet& incl)
00536 {
00537   data_received_i(sample, GUID_UNKNOWN, incl, ReceiveListenerSet::SET_INCLUDED);
00538 }
00539 
00540 void
00541 DataLink::data_received_i(ReceivedDataSample& sample,
00542                           const RepoId& readerId,
00543                           const RepoIdSet& incl_excl,
00544                           ReceiveListenerSet::ConstrainReceiveSet constrain)
00545 {
00546   DBG_ENTRY_LVL("DataLink", "data_received_i", 6);
00547   // Which remote publication sent this message?
00548   const RepoId& publication_id = sample.header_.publication_id_;
00549 
00550   // Locate the set of TransportReceiveListeners associated with this
00551   // DataLink that are interested in hearing about any samples received
00552   // from the remote publisher_id.
00553   if (DCPS_debug_level > 9) {
00554     const GuidConverter converter(publication_id);
00555     const GuidConverter reader(readerId);
00556     ACE_DEBUG((LM_DEBUG,
00557                ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
00558                ACE_TEXT("from publication %C received sample: %C to readerId %C (%s).\n"),
00559                OPENDDS_STRING(converter).c_str(),
00560                to_string(sample.header_).c_str(),
00561                OPENDDS_STRING(reader).c_str(),
00562                constrain == ReceiveListenerSet::SET_EXCLUDED ? "SET_EXCLUDED" : "SET_INCLUDED"));
00563   }
00564 
00565   if (Transport_debug_level > 9) {
00566     const GuidConverter converter(publication_id);
00567     ACE_DEBUG((LM_DEBUG,
00568                ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
00569                ACE_TEXT("from publication %C received sample: %C.\n"),
00570                OPENDDS_STRING(converter).c_str(),
00571                to_string(sample.header_).c_str()));
00572   }
00573 
00574   ReceiveListenerSet_rch listener_set;
00575   {
00576     GuardType guard(this->pub_sub_maps_lock_);
00577     AssocByRemote::iterator iter = assoc_by_remote_.find(publication_id);
00578     if (iter != assoc_by_remote_.end())
00579       listener_set = iter->second;
00580 
00581     if (listener_set.is_nil() && this->default_listener_) {
00582       this->default_listener_->data_received(sample);
00583       return;
00584     }
00585   }
00586 
00587   if (listener_set.is_nil()) {
00588     // Nobody has any interest in this message.  Drop it on the floor.
00589     if (Transport_debug_level > 4) {
00590       const GuidConverter converter(publication_id);
00591       ACE_DEBUG((LM_DEBUG,
00592                  ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
00593                  ACE_TEXT(" discarding sample from publication %C due to no listeners.\n"),
00594                  OPENDDS_STRING(converter).c_str()));
00595     }
00596 
00597     return;
00598   }
00599 
00600   if (readerId != GUID_UNKNOWN) {
00601     listener_set->data_received(sample, readerId);
00602     return;
00603   }
00604 
00605 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00606 
00607   if (sample.header_.content_filter_
00608       && sample.header_.content_filter_entries_.length()) {
00609     ReceiveListenerSet subset(*listener_set.in());
00610     subset.remove_all(sample.header_.content_filter_entries_);
00611     subset.data_received(sample, incl_excl, constrain);
00612 
00613   } else {
00614 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00615 
00616     if (DCPS_debug_level > 9) {
00617       // Just get the set to do our dirty work by having it iterate over its
00618       // collection of TransportReceiveListeners, and invoke the data_received()
00619       // method on each one.
00620       OPENDDS_STRING included_ids;
00621       bool first = true;
00622       RepoIdSet::const_iterator iter = incl_excl.begin();
00623       while(iter != incl_excl.end()) {
00624         included_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter));
00625         first = false;
00626         ++iter;
00627       }
00628       ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::data_received_i - normal data received to each subscription in listener_set %s ids:%C\n",
00629                  constrain == ReceiveListenerSet::SET_EXCLUDED ? "exclude" : "include", included_ids.c_str()));
00630     }
00631     listener_set->data_received(sample, incl_excl, constrain);
00632 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00633   }
00634 
00635 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00636 }
00637 
00638 // static
00639 ACE_UINT64
00640 DataLink::get_next_datalink_id()
00641 {
00642   static ACE_UINT64 next_id = 0;
00643   static LockType lock;
00644 
00645   ACE_UINT64 id;
00646   {
00647     GuardType guard(lock);
00648     id = next_id++;
00649 
00650     if (0 == next_id) {
00651       ACE_ERROR((LM_ERROR,
00652                  ACE_TEXT("ERROR: DataLink::get_next_datalink_id: ")
00653                  ACE_TEXT("has rolled over and is reusing ids!\n")));
00654     }
00655   }
00656 
00657   return id;
00658 }
00659 
00660 void
00661 DataLink::transport_shutdown()
00662 {
00663   DBG_ENTRY_LVL("DataLink", "transport_shutdown", 6);
00664 
00665   if (!this->send_strategy_.is_nil()) {
00666     this->send_strategy_->transport_shutdown();
00667   }
00668 
00669   //this->cancel_release();
00670   this->set_scheduling_release(false);
00671   this->scheduled_to_stop_at_ = ACE_Time_Value::zero;
00672   ACE_Reactor_Timer_Interface* reactor = this->impl_->timer();
00673   reactor->cancel_timer(this);
00674 
00675   this->stop();
00676 
00677   // Drop our reference to the TransportImpl object
00678   this->impl_ = 0;
00679 }
00680 
00681 void
00682 DataLink::notify(ConnectionNotice notice)
00683 {
00684   DBG_ENTRY_LVL("DataLink", "notify", 6);
00685 
00686   VDBG((LM_DEBUG,
00687         ACE_TEXT("(%P|%t) DataLink::notify: this(%X) notify %C\n"),
00688         this,
00689         connection_notice_as_str(notice)));
00690 
00691   GuardType guard(this->pub_sub_maps_lock_);
00692 
00693   // Notify the datawriters
00694   // the lost publications due to a connection problem.
00695   for (IdToSendListenerMap::iterator itr = send_listeners_.begin();
00696        itr != send_listeners_.end(); ++itr) {
00697 
00698     TransportSendListener* tsl = itr->second;
00699 
00700     if (tsl != 0) {
00701       if (Transport_debug_level > 0) {
00702         GuidConverter converter(itr->first);
00703         ACE_DEBUG((LM_DEBUG,
00704                    ACE_TEXT("(%P|%t) DataLink::notify: ")
00705                    ACE_TEXT("notify pub %C %C.\n"),
00706                    OPENDDS_STRING(converter).c_str(),
00707                    connection_notice_as_str(notice)));
00708       }
00709       AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first);
00710       if (local_it == assoc_by_local_.end()) {
00711         if (Transport_debug_level) {
00712           GuidConverter converter(itr->first);
00713           ACE_DEBUG((LM_DEBUG,
00714                      ACE_TEXT("(%P|%t) DataLink::notify: ")
00715                      ACE_TEXT("try to notify pub %C %C - no associations to notify.\n"),
00716                      OPENDDS_STRING(converter).c_str(),
00717                      connection_notice_as_str(notice)));
00718         }
00719         break;
00720       }
00721       const RepoIdSet& rids = local_it->second;
00722 
00723       ReaderIdSeq subids;
00724       set_to_seq(rids, subids);
00725 
00726       switch (notice) {
00727       case DISCONNECTED:
00728         tsl->notify_publication_disconnected(subids);
00729         break;
00730 
00731       case RECONNECTED:
00732         tsl->notify_publication_reconnected(subids);
00733         break;
00734 
00735       case LOST:
00736         tsl->notify_publication_lost(subids);
00737         break;
00738 
00739       default:
00740         ACE_ERROR((LM_ERROR,
00741                    ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ")
00742                    ACE_TEXT("unknown notice to TransportSendListener\n")));
00743         break;
00744       }
00745 
00746     } else {
00747       if (Transport_debug_level > 0) {
00748         GuidConverter converter(itr->first);
00749         ACE_DEBUG((LM_DEBUG,
00750                    ACE_TEXT("(%P|%t) DataLink::notify: ")
00751                    ACE_TEXT("not notify pub %C %C \n"),
00752                    OPENDDS_STRING(converter).c_str(),
00753                    connection_notice_as_str(notice)));
00754       }
00755     }
00756 
00757   }
00758 
00759   // Notify the datareaders registered with TransportImpl
00760   // the lost subscriptions due to a connection problem.
00761   for (IdToRecvListenerMap::iterator itr = recv_listeners_.begin();
00762        itr != recv_listeners_.end(); ++itr) {
00763 
00764     TransportReceiveListener* trl = itr->second;
00765 
00766     if (trl != 0) {
00767       if (Transport_debug_level > 0) {
00768         GuidConverter converter(itr->first);
00769         ACE_DEBUG((LM_DEBUG,
00770                    ACE_TEXT("(%P|%t) DataLink::notify: ")
00771                    ACE_TEXT("notify sub %C %C.\n"),
00772                    OPENDDS_STRING(converter).c_str(),
00773                    connection_notice_as_str(notice)));
00774       }
00775       AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first);
00776       if (local_it == assoc_by_local_.end()) {
00777         if (Transport_debug_level) {
00778           GuidConverter converter(itr->first);
00779           ACE_DEBUG((LM_DEBUG,
00780                      ACE_TEXT("(%P|%t) DataLink::notify: ")
00781                      ACE_TEXT("try to notify sub %C %C - no associations to notify.\n"),
00782                      OPENDDS_STRING(converter).c_str(),
00783                      connection_notice_as_str(notice)));
00784         }
00785         break;
00786       }
00787       const RepoIdSet& rids = local_it->second;
00788 
00789       WriterIdSeq pubids;
00790       set_to_seq(rids, pubids);
00791 
00792       switch (notice) {
00793       case DISCONNECTED:
00794         trl->notify_subscription_disconnected(pubids);
00795         break;
00796 
00797       case RECONNECTED:
00798         trl->notify_subscription_reconnected(pubids);
00799         break;
00800 
00801       case LOST:
00802         trl->notify_subscription_lost(pubids);
00803         break;
00804 
00805       default:
00806         ACE_ERROR((LM_ERROR,
00807                    ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ")
00808                    ACE_TEXT("unknown notice to datareader.\n")));
00809         break;
00810       }
00811 
00812     } else {
00813       if (Transport_debug_level > 0) {
00814         GuidConverter converter(itr->first);
00815         ACE_DEBUG((LM_DEBUG,
00816                    ACE_TEXT("(%P|%t) DataLink::notify: ")
00817                    ACE_TEXT("not notify sub %C subscription lost.\n"),
00818                    OPENDDS_STRING(converter).c_str()));
00819       }
00820 
00821     }
00822   }
00823 }
00824 
00825 void DataLink::notify_connection_deleted()
00826 {
00827   GuardType guard(this->released_assoc_by_local_lock_);
00828   for (AssocByLocal::iterator iter = released_assoc_by_local_.begin();
00829        iter != released_assoc_by_local_.end(); ++iter) {
00830       TransportSendListener* const tsl = send_listener_for(iter->first);
00831       if (tsl) {
00832         for (RepoIdSet::iterator ris_it = iter->second.begin();
00833              ris_it != iter->second.end(); ++ris_it) {
00834           tsl->notify_connection_deleted(*ris_it);
00835         }
00836         iter->second.clear();
00837         continue;
00838       }
00839       TransportReceiveListener* const trl = recv_listener_for(iter->first);
00840       if (trl) {
00841         for (RepoIdSet::iterator ris_it = iter->second.begin();
00842              ris_it != iter->second.end(); ++ris_it) {
00843           trl->notify_connection_deleted(*ris_it);
00844         }
00845         iter->second.clear();
00846       }
00847   }
00848 }
00849 
00850 void
00851 DataLink::pre_stop_i()
00852 {
00853   if (this->thr_per_con_send_task_ != 0) {
00854     this->thr_per_con_send_task_->close(1);
00855   }
00856 }
00857 
00858 bool
00859 DataLink::release_resources()
00860 {
00861   DBG_ENTRY_LVL("DataLink", "release_resources", 6);
00862 
00863   this->prepare_release();
00864 
00865   return impl_->release_link_resources(this);
00866 }
00867 
00868 bool
00869 DataLink::is_target(const RepoId& remote_sub_id)
00870 {
00871   GuardType guard(this->pub_sub_maps_lock_);
00872   return assoc_by_remote_.count(remote_sub_id);
00873 }
00874 
00875 GUIDSeq*
00876 DataLink::target_intersection(const RepoId& pub_id, const GUIDSeq& in,
00877                               size_t& n_subs)
00878 {
00879   GUIDSeq_var res;
00880   GuardType guard(this->pub_sub_maps_lock_);
00881   AssocByLocal::const_iterator iter = assoc_by_local_.find(pub_id);
00882 
00883   if (iter != assoc_by_local_.end()) {
00884     n_subs = iter->second.size();
00885     const CORBA::ULong len = in.length();
00886 
00887     for (CORBA::ULong i(0); i < len; ++i) {
00888       if (iter->second.count(in[i])) {
00889         if (res.ptr() == 0) {
00890           res = new GUIDSeq;
00891         }
00892 
00893         push_back(res.inout(), in[i]);
00894       }
00895     }
00896   }
00897 
00898   return res._retn();
00899 }
00900 
00901 void DataLink::prepare_release()
00902 {
00903   GuardType guard(this->pub_sub_maps_lock_);
00904 
00905   if (!assoc_releasing_.empty()) {
00906     ACE_ERROR((LM_ERROR,
00907                ACE_TEXT("(%P|%t) DataLink::prepare_release: ")
00908                ACE_TEXT("already prepared for release.\n")));
00909     return;
00910   }
00911 
00912   assoc_releasing_ = assoc_by_local_;
00913 }
00914 
00915 void DataLink::clear_associations()
00916 {
00917   for (AssocByLocal::iterator iter = assoc_releasing_.begin();
00918        iter != assoc_releasing_.end(); ++iter) {
00919     TransportSendListener* const tsl = send_listener_for(iter->first);
00920     if (tsl) {
00921       ReaderIdSeq sub_ids;
00922       set_to_seq(iter->second, sub_ids);
00923       tsl->remove_associations(sub_ids, false);
00924       continue;
00925     }
00926     TransportReceiveListener* const trl = recv_listener_for(iter->first);
00927     if (trl) {
00928       WriterIdSeq pub_ids;
00929       set_to_seq(iter->second, pub_ids);
00930       trl->remove_associations(pub_ids, false);
00931     }
00932   }
00933   assoc_releasing_.clear();
00934 }
00935 
00936 int
00937 DataLink::handle_timeout(const ACE_Time_Value& /*tv*/, const void* /*arg*/)
00938 {
00939   if (this->scheduled_to_stop_at_ != ACE_Time_Value::zero) {
00940     VDBG_LVL((LM_DEBUG, "(%P|%t) DataLink::handle_timeout called\n"), 4);
00941     this->impl_->unbind_link(this);
00942 
00943     if (assoc_by_remote_.empty() && assoc_by_local_.empty()) {
00944       this->stop();
00945     }
00946   }
00947   this->_remove_ref();
00948   return 0;
00949 }
00950 
00951 int
00952 DataLink::handle_close(ACE_HANDLE h, ACE_Reactor_Mask m)
00953 {
00954   if (h == ACE_INVALID_HANDLE && m == TIMER_MASK) {
00955     // Reactor is shutting down with this timer still pending.
00956     // Take the same cleanup actions as if the timeout had expired.
00957     handle_timeout(ACE_Time_Value::zero, 0);
00958   }
00959 
00960   return 0;
00961 }
00962 
/// Applies the DiffServ codepoint @a cp to @a socket.
///
/// The value written is `cp << 2`.  Which socket option is used depends on
/// the build configuration and on the socket's local address family:
///  - ACE_HAS_IPV6 and the local address is AF_INET6:
///      * IPV6_TCLASS available   -> set IPPROTO_IPV6 / IPV6_TCLASS;
///      * IPV6_TCLASS unavailable -> log (if DCPS_debug_level > 0) and return.
///  - otherwise, when IP_TOS is available -> set IPPROTO_IP / IP_TOS.
/// With ACE_HAS_IPV6, a failure to obtain the local address returns silently.
///
/// Nothing is reported to the caller; failures and successes are only logged.
///
/// @param cp      codepoint to apply (shifted left by two bits before use).
/// @param socket  socket whose option is set.
00963 void
00964 DataLink::set_dscp_codepoint(int cp, ACE_SOCK& socket)
00965 {
00966   /**
00967    * The following IPV6 code was lifted in spirit from the RTCORBA
00968    * implementation of setting the DiffServ codepoint.
00969    */
00970   int result = 0;
00971 
00972   // Shift the code point up two bits, so that we only use the DS field
00973   int tos = cp << 2;
00974 
  // Name of the option being set; used only in the log messages below.
00975   const char* which = "IPV4 TOS";
00976 #if defined (ACE_HAS_IPV6)
00977   ACE_INET_Addr local_address;
00978 
00979   if (socket.get_local_addr(local_address) == -1) {
    // Cannot tell the address family: give up without logging.
00980     return;
00981 
00982   } else if (local_address.get_type() == AF_INET6)
00983 #if !defined (IPV6_TCLASS)
00984   {
    // IPv6 socket, but this platform offers no IPV6_TCLASS option.
00985     if (DCPS_debug_level > 0) {
00986       ACE_ERROR((LM_ERROR,
00987                  ACE_TEXT("(%P|%t) ERROR: DataLink::set_dscp_codepoint() - ")
00988                  ACE_TEXT("IPV6 TCLASS not supported yet, not setting codepoint %d.\n"),
00989                  cp));
00990     }
00991 
00992     return;
00993   }
00994 
00995 #else /* IPV6_TCLASS */
00996   {
00997     which = "IPV6 TCLASS";
00998     result = socket.set_option(
00999                IPPROTO_IPV6,
01000                IPV6_TCLASS,
01001                &tos,
01002                sizeof(tos));
01003 
01004   } else // This is a bit tricky and might be hard to follow...
  // The dangling `else` above takes the next statement that survives
  // preprocessing as its body: the IP_TOS set_option() call when IP_TOS is
  // defined, otherwise the ACE_DEBUG statement further down.
01005 
01006 #endif /* IPV6_TCLASS */
01007 #endif /* ACE_HAS_IPV6 */
01008 
01009 #ifdef IP_TOS
01010   result = socket.set_option(
01011              IPPROTO_IP,
01012              IP_TOS,
01013              &tos,
01014              sizeof(tos));
01015 
  // Checks `result` from whichever set_option() call ran above (IPv6 or
  // IPv4).  ENOTSUP (and WSAEINVAL where defined) are not reported.
01016   if ((result == -1) && (errno != ENOTSUP)
01017 #ifdef WSAEINVAL
01018       && (errno != WSAEINVAL)
01019 #endif
01020      ) {
01021 #endif // IP_TOS
    // NOTE(review): when IP_TOS is not defined the enclosing `if` is compiled
    // out, so this message is emitted without consulting `result`.
01022     ACE_DEBUG((LM_DEBUG,
01023                ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ")
01024                ACE_TEXT("failed to set the %C codepoint to %d: %m, ")
01025                ACE_TEXT("try running as superuser.\n"),
01026                which,
01027                cp));
01028 #ifdef IP_TOS
01029   } else if (DCPS_debug_level > 4) {
    // Reached when no failure was reported above; logged at verbose
    // debug levels only.
01030     ACE_DEBUG((LM_DEBUG,
01031                ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ")
01032                ACE_TEXT("set %C codepoint to %d.\n"),
01033                which,
01034                cp));
01035   }
01036 #endif
01037 }
01038 
01039 #ifndef OPENDDS_SAFETY_PROFILE
01040 std::ostream&
01041 operator<<(std::ostream& str, const DataLink& value)
01042 {
01043   str << "   There are " << value.assoc_by_local_.size()
01044       << " local entities currently using this link comprising following associations:"
01045       << std::endl;
01046 
01047   for (DataLink::AssocByLocal::const_iterator
01048        localId = value.assoc_by_local_.begin();
01049        localId != value.assoc_by_local_.end();
01050        ++localId) {
01051     for (RepoIdSet::const_iterator
01052          remoteId = localId->second.begin();
01053          remoteId != localId->second.end();
01054          ++remoteId) {
01055       str << GuidConverter(localId->first) << " --> "
01056           << GuidConverter(*remoteId) << "   " << std::endl;
01057     }
01058   }
01059   return str;
01060 }
01061 #endif
01062 
01063 }
01064 }

Generated on Fri Feb 12 20:05:19 2016 for OpenDDS by  doxygen 1.4.7