16 #include "dds/DdsDcpsGuidTypeSupportImpl.h" 19 #if !defined (__ACE_INLINE__) 25 const Encoding encoding_unaligned_native(Encoding::KIND_UNALIGNED_CDR);
36 :
DataLink(transport_impl, priority, is_loopback, is_active)
37 , remote_address_(remote_address)
38 , graceful_disconnect_sent_(false)
39 , release_is_pending_(false)
63 connection->disconnect();
102 strategy->remove_all_msgs(local_id);
121 bool disconnected = rs->gracefully_disconnected();
130 connection->shutdown();
152 "(%P|%t) ERROR: TcpDataLink::connect failed to set " 153 "ACE_NONBLOCK %p\n",
ACE_TEXT(
"enable")), -1);
161 if (this->
start(send_strategy, receive_strategy,
false) != 0) {
197 if (old_connection) {
199 "trying to reuse existing connection\n"), 0);
200 old_connection->transfer(connection.
in());
220 int rs_result = rs->
reset(0, connection.
in());
224 int ss_result = ss->
reset(
true);
226 if (rs_result == 0 && ss_result == 0) {
246 if (!existing_connection) {
248 "(%P|%t) ERROR: TcpDataLink::reconnect old connection is nil.\n")
253 existing_connection->transfer(connection.
in());
255 bool released =
false;
289 int rs_result = trs->
reset(existing_connection.in(), connection.
in());
293 int ss_result = tss->
reset();
295 if (rs_result == 0 && ss_result == 0) {
306 DBG_ENTRY_LVL(
"TcpDataLink",
"send_graceful_disconnect_message",6);
367 *message << header_data;
373 this->
send_i(send_element,
false);
392 ACE_DEBUG((
LM_DEBUG,
ACE_TEXT(
"(%P|%t) TcpDataLink::handle_send_request_ack(%@) sequence number %q, publication_id=%C\n"),
412 if (sequence == -1) {
425 PendingRequestAcks::iterator it;
487 *message << header_data;
494 this->
send_i(send_element,
false);
502 typedef std::vector<std::pair<GUID_t, GUID_t> > PairVec;
503 PairVec to_call_and_send;
513 for (RepoToClientMap::const_iterator it2 = it->second.begin(); it2 != it->second.end(); ++it2) {
515 to_call_and_send.push_back(std::make_pair(it2->first, it->first));
523 for (PairVec::const_iterator it = to_call_and_send.begin(); it != to_call_and_send.end(); ++it) {
553 *message << header_data;
559 this->
send_i(send_element,
false);
566 PendingRequestAcks::iterator it;
568 (*it)->data_dropped(
true);
589 const GUID_t& local_publication_id,
604 const GUID_t& local_subscription_id,
DataSampleHeader header_
The demarshalled sample header.
RcHandle< T > rchandle_from(T *pointer)
virtual SequenceNumber sequence() const
bool handle_send_request_ack(TransportQueueElement *element)
TransportImpl_rch impl() const
WeakRcHandle< TcpConnection > connection_
static const ACE_Time_Value max_time
char message_id_
The enum MessageId.
void request_ack_received(const ReceivedDataSample &sample)
size_t length(void) const
ACE_Thread_Mutex stopped_clients_mutex_
bool is_active_
Is pub or sub ?
bool graceful_disconnect_sent_
bool data_dropped(bool dropped_by_transport=false)
void link_released(bool flag)
bool isReader() const
Returns true if the GUID represents a reader entity.
const size_t guid_cdr_size
int reset(bool reset_mode=false)
PendingRequestAcks pending_request_acks_
const char * c_str() const
virtual void send_i(TransportQueueElement *element, bool relink=true)
virtual void send_i(TransportQueueElement *element, bool relink=true)
void send_graceful_disconnect_message()
T::rv_reference move(T &p)
#define ACE_CDR_BYTE_ORDER
Conversion processing and value testing utilities for RTPS GUID_t types.
int start(const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
void ack_received(const ReceivedDataSample &sample)
TcpDataLink(const TcpTransport_rch &transport_impl, const ACE_INET_Addr &remote_address, Priority priority, bool is_loopback, bool is_active)
void send_association_msg(const GUID_t &local, const GUID_t &remote)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
TransportStrategy_rch receive_strategy_
The transport receive strategy object for this DataLink.
virtual void pre_stop_i()
virtual void send_stop_i(GUID_t repoId)
int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
void drop_pending_request_acks()
Class to serialize and deserialize data for DDS.
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
int connect(const TcpConnection_rch &connection, const RcHandle< TcpSendStrategy > &send_strategy, const RcHandle< TcpReceiveStrategy > &receive_strategy)
int reconnect(const TcpConnection_rch &connection)
TcpSendStrategy_rch send_strategy()
Holds a data sample received by the transport.
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
AtomicBool release_is_pending_
TcpReceiveStrategy_rch receive_strategy()
bool check_active_client(const GUID_t &local_id)
void send_stop_i(GUID_t repoId)
char * wr_ptr(void) const
ACE_UINT32 message_length_
void set_release_pending(bool flag)
Set release pending flag.
RepoIdSetType stopped_clients_
void terminate_send(bool graceful_disconnecting=false)
Remove all samples in the backpressure queue and packet queue.
ACE_SYNCH_MUTEX pending_request_acks_lock_
void client_stop(const GUID_t &local_id)
void do_association_actions()
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Sequence number abstraction. Only allows positive 64 bit values.
OnStartCallbackMap on_start_callbacks_
static const ACE_Time_Value zero
RcHandle< T > static_rchandle_cast(const RcHandle< U > &h)
virtual int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
virtual void pre_stop_i()
virtual GUID_t publication_id() const =0
Accessor for the publication id that sent the sample.
int connect_tcp_datalink(TcpDataLink &link, const TcpConnection_rch &connection)
Common code used by accept_datalink(), passive_connection(), and active completion.
#define ACE_ERROR_RETURN(X, Y)
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
bool is_release_pending() const
Get release pending flag.
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
int reuse_existing_connection(const TcpConnection_rch &connection)
int reset(TcpConnection *old_connection, TcpConnection *new_connection)
Base wrapper class around a data/control sample to be sent.
void invoke_on_start_callbacks(bool success)