15 #include <dds/DdsDcpsGuidTypeSupportImpl.h> 21 #ifndef __ACE_INLINE__ 42 , reactor_task_(transport->reactor_task())
52 #ifdef OPENDDS_SHMEM_WINDOWS 59 if (fm == 0 || (view = MapViewOfFile(fm, FILE_MAP_READ, 0, 0, 0)) == 0) {
62 ACE_TEXT(
"(%P|%t) ERROR: ShmemDataLink::open: ")
63 ACE_TEXT(
"peer's shared memory area not found (%C)\n"),
64 peer_address.c_str()),
68 const size_t* pmax = (
const size_t*)(((
void**)view) + 2);
69 alloc_opts.max_size_ = *pmax;
70 UnmapViewOfFile(view);
75 #ifdef OPENDDS_SHMEM_WINDOWS 83 ACE_TEXT(
"(%P|%t) ERROR: ShmemDataLink::open: ")
84 ACE_TEXT(
"peer's shared memory area not found (%C)\n"),
85 peer_address.c_str()),
95 ACE_TEXT(
"(%P|%t) ERROR: ")
96 ACE_TEXT(
"ShmemDataLink::open: start failed!\n")),
100 VDBG_LVL((
LM_DEBUG,
"(%P|%t) ShmemDataLink::open: link %@ open to peer %C\n",
134 VDBG((
LM_DEBUG,
"(%P|%t) ShmemDataLink::send_association_msg from %C to %C\n",
158 *message << header_data;
163 this->
send_i(send_element,
false);
168 VDBG((
LM_DEBUG,
"(%P|%t) ShmemDataLink::resend_association_msgs\n"));
178 VDBG((
LM_DEBUG,
"(%P|%t) ShmemDataLink::stop_resend_association_msgs: " 179 "local %C remote %C\n",
LogGuid(local).c_str(),
LogGuid(remote).c_str()));
188 VDBG((
LM_DEBUG,
"(%P|%t) ShmemDataLink::request_ack_received: association msg\n"));
191 Serializer ser(payload.get(), encoding_unaligned_native);
195 const bool local_is_writer = local_gc.
isWriter();
196 VDBG((
LM_DEBUG,
"(%P|%t) ShmemDataLink::request_ack_received: " 197 "association msg from remote %C %C to local %C %C\n",
198 local_is_writer ?
"reader" :
"writer",
LogGuid(remote).c_str(),
199 local_is_writer ?
"writer" :
"reader", std::string(local_gc).c_str()));
200 if (local_is_writer) {
DataSampleHeader header_
The demarshalled sample header.
ACE_Message_Block * to_msgblock(const ReceivedDataSample &sample)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
int make_reservation(const GUID_t &remote_pub, const GUID_t &local_sub, const TransportReceiveListener_wrch &receive_listener, bool reliable)
const char * c_str(void) const
ShmemSendStrategy_rch send_strategy_
TransportImpl_rch impl() const
ACE_MEM_POOL_OPTIONS MEMORY_POOL_OPTIONS
ACE_Thread_Mutex assoc_resends_mutex_
static const ACE_Time_Value max_time
char message_id_
The enum MessageId.
DCPS::RcHandle< SmPeriodicTask > assoc_resends_task_
ShmemReceiveStrategy_rch recv_strategy_
void link_released(bool flag)
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
void send_association_msg(const GUID_t &local, const GUID_t &remote)
const size_t guid_cdr_size
int find(const char *name, void *&pointer)
virtual void send_i(TransportQueueElement *element, bool relink=true)
ACE_Malloc_T< ShmemPool, ACE_Process_Mutex, ACE_PI_Control_Block > ShmemAllocator
std::string local_address()
void resend_association_msgs(const MonotonicTimePoint &now)
ShmemAllocator * local_allocator()
ACE_Thread_Mutex peer_alloc_mutex_
reference_wrapper< T > ref(T &r)
std::string peer_address()
T::rv_reference move(T &p)
#define ACE_CDR_BYTE_ORDER
ShmemAllocator * peer_allocator()
#define ACE_DEFAULT_PAGEFILE_POOL_CHUNK
Conversion processing and value testing utilities for RTPS GUID_t types.
void stop_resend_association_msgs(const GUID_t &local, const GUID_t &remote)
int start(const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
#define ACE_TEXT_CreateFileMapping
Class to serialize and deserialize data for DDS.
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
Holds a data sample received by the transport.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void control_received(ReceivedDataSample &sample)
bool open(const std::string &peer_address)
std::string peer_address_
ShmemDataLink(const ShmemTransport_rch &transport)
ACE_UINT32 message_length_
#define ACE_TEXT_CHAR_TO_TCHAR(STRING)
ShmemAllocator * peer_alloc_
#define OPENDDS_TEST_AND_CALL(TYPE, TEST, CALL)
void request_ack_received(ReceivedDataSample &sample)
ReactorInterceptor_rch interceptor() const
#define OPENDDS_TEST_AND_CALL_ASSIGN(TYPE, TEST, CALL, VAL)
static const ACE_Time_Value zero
virtual int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
ShmemInst_rch config() const
#define ACE_ERROR_RETURN(X, Y)
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
ShmemTransport_rch transport() const
bool isWriter() const
Returns true if the GUID represents a writer entity.
AssocResends assoc_resends_
The Internal API and Implementation of OpenDDS.
ReactorTask_rch reactor_task_
void invoke_on_start_callbacks(bool success)