21 #include "dds/DdsDcpsGuidTypeSupportImpl.h" 32 #if !defined (__ACE_INLINE__) 46 transport_priority_(priority),
47 scheduling_release_(false),
48 is_loopback_(is_loopback),
49 is_active_(is_active),
51 send_response_listener_(
"DataLink"),
52 interceptor_(impl->reactor(), impl->reactor_owner())
69 ACE_TEXT(
"(%P|%t) DataLink::DataLink: ")
70 ACE_TEXT(
"failed to open ThreadPerConnectionSendTask\n")));
74 ACE_TEXT(
"(%P|%t) DataLink::DataLink - ")
75 ACE_TEXT(
"started new thread to send data with.\n")));
94 ACE_TEXT(
"(%P|%t) WARNING: DataLink[%@]::~DataLink() - ")
95 ACE_TEXT(
"link still in use by %d entities when deleted!\n"),
123 RepoIdSet::iterator it2 = it->second.find(client_id);
124 if (it2 != it->second.end()) {
125 it->second.erase(it2);
126 if (it->second.empty()) {
152 RepoToClientMap::iterator oit2 = oit->second.find(local);
153 if (oit2 != oit->second.end()) {
154 oit->second.erase(oit2);
155 if (oit->second.empty()) {
162 RepoIdSet::iterator pit2 = pit->second.find(local);
163 if (pit2 != pit->second.end()) {
164 pit->second.erase(pit2);
165 if (pit->second.empty()) {
177 const GUID_t id = client_lock->get_guid();
182 RepoToClientMap::iterator it2 = it->second.find(
id);
183 if (it2 != it->second.end()) {
184 it->second.erase(it2);
185 if (it->second.empty()) {
210 RepoToClientMap::iterator it2 = it->second.begin();
211 if (it2 != it->second.end()) {
212 client = it2->second;
213 it->second.erase(it2);
214 if (it->second.empty()) {
224 client_lock->use_datalink(remote, link);
235 bool made_callback =
false;
242 RepoToClientMap::iterator it2 = it->second.find(local);
243 if (it2 != it->second.end()) {
244 client = it2->second;
245 it->second.erase(it2);
246 if (it->second.empty()) {
260 client_lock->use_datalink(remote, link);
261 made_callback =
true;
265 return made_callback;
278 ACE_TEXT(
"(%P|%t) DataLink::handle_exception() - not scheduling or stopping\n")));
286 ACE_TEXT(
"(%P|%t) DataLink::handle_exception() - cancelled future release timer\n")));
295 ACE_TEXT(
"(%P|%t) DataLink::handle_exception() - delay already elapsed so handle_timeout now\n")));
302 ACE_TEXT(
"(%P|%t) DataLink::handle_exception() - stopping now\n")));
309 ACE_TEXT(
"(%P|%t) DataLink::handle_exception() - (delay) scheduling timer for future release\n")));
334 ACE_TEXT(
"(%P|%t) DataLink::schedule_stop() - Already stopped or already scheduled for stop\n")));
374 if (!send_strategy.
is_nil()) {
375 send_strategy->
stop();
378 if (!recv_strategy.
is_nil()) {
379 recv_strategy->
stop();
399 const GUID_t& local_publication_id,
406 LogGuid local_log(local_publication_id), remote_log(remote_subscription_id);
408 ACE_TEXT(
"(%P|%t) DataLink::make_reservation() - ")
409 ACE_TEXT(
"creating association local publication %C ")
410 ACE_TEXT(
"<--> with remote subscription %C.\n"),
412 remote_log.
c_str()));
430 rls = make_rch<ReceiveListenerSet>();
433 send_listeners_.insert(std::make_pair(local_publication_id, send_listener));
440 const GUID_t& local_subscription_id,
447 LogGuid local(local_subscription_id), remote(remote_publication_id);
449 ACE_TEXT(
"(%P|%t) DataLink::make_reservation() - ")
450 ACE_TEXT(
"creating association local subscription %C ")
451 ACE_TEXT(
"<--> with remote publication %C.\n"),
452 local.c_str(), remote.
c_str()));
470 rls = make_rch<ReceiveListenerSet>();
471 rls->insert(local_subscription_id, receive_listener);
479 template <
typename Seq>
482 seq.length(static_cast<CORBA::ULong>(rids.size()));
484 for (RepoIdSet::const_iterator iter = rids.begin(); iter != rids.end(); ++iter) {
494 const AssocByLocal::const_iterator iter =
assoc_by_local_.find(local_id);
499 GUIDSeq_var result =
new GUIDSeq;
500 set_to_seq(iter->second.associated_, static_cast<GUIDSeq&>(result));
501 return result._retn();
513 DataLinkSetMap& released_locals)
521 ACE_TEXT(
"(%P|%t) DataLink::release_reservations() - ")
522 ACE_TEXT(
"releasing association local: %C ")
538 bool release_remote_required =
false;
545 if (rls->size() == 1) {
547 release_remote_required =
true;
549 rls->remove(local_id);
552 if (ris.size() == 1) {
555 links = make_rch<DataLinkSet>();
560 ris.erase(remote_id);
565 ACE_TEXT(
"(%P|%t) DataLink::release_reservations: ")
566 ACE_TEXT(
"release_datalink due to no remaining pubs or subs.\n")), 5);
575 if (release_remote_required) {
585 VDBG((
LM_DEBUG,
"(%P|%t) DataLink[%@]::schedule_delayed_release\n",
this));
606 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) DataLink::cancel_release - link[%@] already stopped_ cannot cancel release\n",
this));
612 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) DataLink::cancel_release - link[%@] currently scheduling release, notify reactor of cancel\n",
this));
641 static_cast<ACE_Message_Block*>(
656 if (!(*message << header)) {
658 ACE_TEXT(
"(%P|%t) DataLink::create_control: ")
659 ACE_TEXT(
"cannot put header in message\n")));
674 header,
move(message));
720 ACE_TEXT(
"(%P|%t) DataLink::data_received_i: ")
721 ACE_TEXT(
"from publication %C received sample: %C to readerId %C (%C).\n"),
731 ACE_TEXT(
"(%P|%t) DataLink::data_received_i: ")
732 ACE_TEXT(
"from publication %C received sample: %C.\n"),
743 listener_set = iter->second;
749 if (listener_set.
is_nil()) {
757 ACE_TEXT(
"(%P|%t) DataLink::data_received_i: ")
758 ACE_TEXT(
" discarding sample from publication %C due to no listeners.\n"),
766 listener_set->data_received(sample, readerId);
770 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 776 subset.data_received(sample, incl_excl, constrain);
787 RepoIdSet::const_iterator iter = incl_excl.begin();
788 while(iter != incl_excl.end()) {
793 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) DataLink::data_received_i - normal data received to each subscription in listener_set %C ids:%C\n",
796 listener_set->data_received(sample, incl_excl, constrain);
797 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 817 ACE_TEXT(
"ERROR: DataLink::get_next_datalink_id: ")
818 ACE_TEXT(
"has rolled over and is reusing ids!\n")));
853 ACE_TEXT(
"(%P|%t) DataLink::notify: this(%X) notify %C\n"),
870 ACE_TEXT(
"(%P|%t) DataLink::notify: ")
880 ACE_TEXT(
"(%P|%t) DataLink::notify: ")
881 ACE_TEXT(
"try to notify pub %C %C - no associations to notify.\n"),
887 const RepoIdSet& rids = local_it->second.associated_;
894 tsl->notify_publication_disconnected(subids);
898 tsl->notify_publication_reconnected(subids);
902 tsl->notify_publication_lost(subids);
907 ACE_TEXT(
"(%P|%t) ERROR: DataLink::notify: ")
908 ACE_TEXT(
"unknown notice to TransportSendListener\n")));
916 ACE_TEXT(
"(%P|%t) DataLink::notify: ")
935 ACE_TEXT(
"(%P|%t) DataLink::notify: ")
945 ACE_TEXT(
"(%P|%t) DataLink::notify: ")
946 ACE_TEXT(
"try to notify sub %C %C - no associations to notify.\n"),
952 const RepoIdSet& rids = local_it->second.associated_;
972 ACE_TEXT(
"(%P|%t) ERROR: DataLink::notify: ")
973 ACE_TEXT(
"unknown notice to datareader.\n")));
981 ACE_TEXT(
"(%P|%t) DataLink::notify: ")
982 ACE_TEXT(
"not notify sub %C subscription lost.\n"),
1028 n_subs = iter->second.associated_.size();
1032 if (iter->second.associated_.count(in[i])) {
1033 if (res.ptr() == 0) {
1051 ACE_TEXT(
"(%P|%t) DataLink::prepare_release: ")
1052 ACE_TEXT(
"already prepared for release.\n")));
1066 set_to_seq(iter->second.associated_, sub_ids);
1067 tsl->remove_associations(sub_ids,
false);
1073 set_to_seq(iter->second.associated_, pub_ids);
1105 if (h == ACE_INVALID_HANDLE && m ==
TIMER_MASK) {
1126 const char* which =
"IPV4 TOS";
1127 #if defined (ACE_HAS_IPV6) 1133 }
else if (local_address.
get_type() == AF_INET6)
1134 #
if !defined (IPV6_TCLASS)
1138 ACE_TEXT(
"(%P|%t) ERROR: DataLink::set_dscp_codepoint() - ")
1139 ACE_TEXT(
"IPV6 TCLASS not supported yet, not setting codepoint %d.\n"),
1148 which =
"IPV6 TCLASS";
1167 if ((result == -1) && (errno !=
ENOTSUP)
1169 && (errno != WSAEINVAL)
1174 ACE_TEXT(
"(%P|%t) DataLink::set_dscp_codepoint() - ")
1175 ACE_TEXT(
"failed to set the %C codepoint to %d: %m, ")
1176 ACE_TEXT(
"try running as superuser.\n"),
1182 ACE_TEXT(
"(%P|%t) DataLink::set_dscp_codepoint() - ")
1183 ACE_TEXT(
"set %C codepoint to %d.\n"),
1206 client_lock->use_datalink(remote_, link_);
1214 IdToSendListenerMap send_listeners;
1215 IdToRecvListenerMap recv_listeners;
1221 for (IdToSendListenerMap::const_iterator itr = send_listeners.begin();
1222 itr != send_listeners.end(); ++itr) {
1225 tsl->transport_discovery_change();
1229 for (IdToRecvListenerMap::const_iterator itr = recv_listeners.begin();
1230 itr != recv_listeners.end(); ++itr) {
1244 if (send_listener) {
1245 send_listener->replay_durable_data_for(remote_sub_id);
1249 #ifndef OPENDDS_SAFETY_PROFILE 1254 <<
" local entities currently using this link";
1257 str <<
" comprising following associations:";
1261 typedef DataLink::AssocByLocal::const_iterator assoc_iter_t;
1263 for (assoc_iter_t ait = abl.begin(); ait != abl.end(); ++ait) {
1264 const RepoIdSet&
set = ait->second.associated_;
1265 for (RepoIdSet::const_iterator rit =
set.begin(); rit !=
set.end(); ++rit) {
DataSampleHeader header_
The demarshalled sample header.
IdToSendListenerMap send_listeners_
RcHandle< T > rchandle_from(T *pointer)
TransportImpl_rch impl() const
const TimeDuration & datalink_release_delay() const
const LogLevel::Value value
static const ACE_Time_Value max_time
char message_id_
The enum MessageId.
void transport_shutdown()
void remove_on_start_callback(const TransportClient_wrch &client, const GUID_t &remote)
unsigned long ACE_Reactor_Mask
const GUID_t GUID_UNKNOWN
Nil value for GUID.
void link_released(bool flag)
static const size_t DEFAULT_DATALINK_CONTROL_CHUNKS
void data_received_include(ReceivedDataSample &sample, const RepoIdSet &incl)
LockType pub_sub_maps_lock_
WeakRcHandle< TransportImpl > impl_
A weak rchandle to the TransportImpl that created this DataLink.
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
int handle_timeout(const ACE_Time_Value &tv, const void *arg)
ACE_Message_Block * create_control(char submessage_id, DataSampleHeader &header, Message_Block_Ptr data)
void set_scheduling_release(bool scheduling_release)
virtual void transport_discovery_change()
const char * c_str() const
TransportSendListener_rch send_listener_for(const GUID_t &pub_id) const
TransportSendStrategy_rch get_send_strategy()
void terminate_send_if_suspended()
void schedule_stop(const MonotonicTimePoint &schedule_to_stop_at)
CommandPtr execute_or_enqueue(CommandPtr command)
static ACE_UINT64 get_next_datalink_id()
Used to provide unique Ids to all DataLink methods.
const ACE_Time_Value & value() const
T::rv_reference move(T &p)
AssocByLocal assoc_by_local_
RcHandle< TransportReceiveListener > TransportReceiveListener_rch
#define ACE_CDR_BYTE_ORDER
int set_option(int level, int option, void *optval, int optlen) const
void send_stop(GUID_t repoId)
void remove_startup_callbacks(const GUID_t &local, const GUID_t &remote)
Conversion processing and value testing utilities for RTPS GUID_t types.
void clear(SendMode new_mode, SendMode old_mode=MODE_NOT_SET)
bool is_target(const GUID_t &remote_id)
static TimePoint_T< MonotonicClock > now()
bool thread_per_connection_
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
sequence< GUID_t > ReaderIdSeq
AssocByLocal assoc_releasing_
virtual void remove_associations(const WriterIdSeq &pubids, bool notify)=0
static const TimePoint_T< MonotonicClock > zero_value
TransportStrategy_rch receive_strategy_
The transport receive strategy object for this DataLink.
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
friend class ThreadPerConnectionSendTask
int notify(ACE_Event_Handler *event_handler=0, ACE_Reactor_Mask masks=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *timeout=0)
static const long DEFAULT_DATALINK_RELEASE_DELAY
void send(TransportQueueElement *element)
virtual void notify_subscription_disconnected(const WriterIdSeq &pubids)=0
void notify(ConnectionNotice notice)
static TimeDuration from_msec(const ACE_UINT64 &ms)
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Holds a data sample received by the transport.
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
virtual void data_received(const ReceivedDataSample &sample)=0
TransportReceiveListener_wrch default_listener_
void set_dscp_codepoint(int cp, ACE_SOCK &socket)
PendingOnStartsMap pending_on_starts_
friend OpenDDS_Dcps_Export std::ostream & operator<<(std::ostream &str, const DataLink &value)
Convenience function for diagnostic information.
void clear_associations()
size_t datalink_control_chunks_
void replay_durable_data(const GUID_t &local_pub_id, const GUID_t &remote_sub_id) const
int get_local_addr(ACE_Addr &) const
GUIDSeq content_filter_entries_
virtual int cancel_timer(long timer_id, const void **arg=0, int dont_call_handle_close=1)=0
sequence< GUID_t > WriterIdSeq
size_t total_length(void) const
virtual void terminate_send_if_suspended()
void schedule_delayed_release()
DataLinkIdType id() const
Obtain a unique identifier for this DataLink object.
ACE_UINT32 message_length_
TimeDuration datalink_release_delay_
bool add_on_start_callback(const TransportClient_wrch &client, const GUID_t &remote)
int handle_close(ACE_HANDLE h, ACE_Reactor_Mask m)
GUIDSeq * peer_ids(const GUID_t &local_id) const
ACE_Reactor_Timer_Interface * timer() const
Interface to the transport's reactor for scheduling timers.
void set_to_seq(const RepoIdSet &rids, Seq &seq)
virtual ACE_Reactor * reactor(void) const
virtual void release_remote_i(const GUID_t &)
GUIDSeq * target_intersection(const GUID_t &pub_id, const GUIDSeq &in, size_t &n_subs)
bool reactor_is_shut_down() const
char submessage_id_
Implementation-specific sub-message Ids.
unsigned long long ACE_UINT64
long datalink_release_delay_
ReactorTask_rch reactor_task()
sequence< GUID_t > GUIDSeq
Cached_Allocator_With_Overflow< ACE_Message_Block, ACE_Thread_Mutex > MessageBlockAllocator
bool release_link_resources(DataLink *link)
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
unique_ptr< ThreadPerConnectionSendTask > thr_per_con_send_task_
const char * connection_notice_as_str(ConnectionNotice notice)
Helper function to output the enum as a string to help debugging.
SendResponseListener send_response_listener_
Listener for TransportSendControlElements created in send_control.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
void stop()
The stop method is used to stop the DataLink prior to shutdown.
virtual void unbind_link(DataLink *link)
Remove any pending_release mappings.
OnStartCallbackMap on_start_callbacks_
static const ACE_Time_Value zero
virtual int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
#define VDBG_LVL(DBG_ARGS, LEVEL)
void network_change() const
TransportReceiveListener_rch recv_listener_for(const GUID_t &sub_id) const
unique_ptr< MessageBlockAllocator > mb_allocator_
SendControlStatus send_control(const DataSampleHeader &header, Message_Block_Ptr data)
AssocByRemote assoc_by_remote_
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
virtual void pre_stop_i()
IdToRecvListenerMap recv_listeners_
virtual void release_reservations_i(const GUID_t &, const GUID_t &)
ACE_UINT64 id_
The id for this DataLink.
const char * to_string(MessageId value)
#define ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)=0
RcHandle< T > lock() const
DataLink(const TransportImpl_rch &impl, Priority priority, bool is_loopback, bool is_active)
Only called by our TransportImpl object.
MonotonicTimePoint scheduled_to_stop_at_
virtual void notify_subscription_reconnected(const WriterIdSeq &pubids)=0
virtual void notify_subscription_lost(const WriterIdSeq &pubids)=0
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
void remove_all(const GUIDSeq &to_remove)
#define TheServiceParticipant
int handle_exception(ACE_HANDLE)
Reactor invokes this after being notified in schedule_stop or cancel_release.
void release_reservations(GUID_t remote_id, GUID_t local_id, DataLinkSetMap &released_locals)
The Internal API and Implementation of OpenDDS.
void data_received_i(ReceivedDataSample &sample, const GUID_t &readerId, const RepoIdSet &incl_excl, ReceiveListenerSet::ConstrainReceiveSet constrain)
Cached_Allocator_With_Overflow< ACE_Data_Block, ACE_Thread_Mutex > DataBlockAllocator
Base wrapper class around a data/control sample to be sent.
unique_ptr< DataBlockAllocator > db_allocator_
SendControlStatus
Return code type for send_control() operations.
virtual void release_datalink(DataLink *link)=0
TransportInst_rch config() const
virtual bool handle_send_request_ack(TransportQueueElement *element)
void invoke_on_start_callbacks(bool success)