OpenDDS  Snapshot(2023/04/28-20:55)
ShmemDataLink.cpp
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #include "ShmemDataLink.h"
7 
8 #include "ShmemTransport.h"
9 #include "ShmemInst.h"
10 #include "ShmemSendStrategy.h"
11 #include "ShmemReceiveStrategy.h"
12 
14 #include <dds/DCPS/GuidConverter.h>
15 #include <dds/DdsDcpsGuidTypeSupportImpl.h>
16 
17 #include <ace/Log_Msg.h>
18 
19 #include <cstdlib>
20 
21 #ifndef __ACE_INLINE__
22 # include "ShmemDataLink.inl"
23 #endif /* __ACE_INLINE__ */
24 
26 
27 namespace OpenDDS {
28 namespace DCPS {
29 
// File-local Encoding constant, constructed with Encoding::KIND_UNALIGNED_CDR.
// It is the encoding passed to the Serializer when the GUID payload of an
// association message is written (send_association_msg) and read back
// (request_ack_received).
30 namespace {
31  const Encoding encoding_unaligned_native(Encoding::KIND_UNALIGNED_CDR);
32 }
33 
// Constructor initializer list and (empty) body.
// NOTE(review): the signature line (listing line 34) is missing from this
// listing; the symbol index at the end of the page gives it as
// ShmemDataLink(const ShmemTransport_rch &transport) -- confirm.
// Initializes the DataLink base with priority 0, not loopback, not active;
// creates the send and receive strategies bound to this link; leaves
// peer_alloc_ null (it is assigned later in open()); and takes the reactor
// task from the transport.
35  : DataLink(transport,
36  0, // priority
37  false, // is_loopback,
38  false) // is_active
39  , send_strategy_(make_rch<ShmemSendStrategy>(this))
40  , recv_strategy_(make_rch<ShmemReceiveStrategy>(this))
41  , peer_alloc_(0)
42  , reactor_task_(transport->reactor_task())
43 {
44 }
45 
// Opens the link to the peer identified by peer_address.
//
// Steps visible here: create a ShmemAllocator named after the peer address,
// check that it contains a "Semaphore" entry, start the send/receive
// strategies, then create and enable the periodic association-resend task
// using the configured association_resend_period().
//
// Returns true on success. The three logged error paths call stop_i() and
// end in "false);", and a missing config returns false directly.
//
// NOTE(review): several original lines are missing from this listing (the
// embedded numbering skips 49, 53, 61, 82, 94 and 104). The openers of the
// error-return calls (presumably ACE_ERROR_RETURN((LM_ERROR, ...) and the
// declaration of alloc_opts are therefore not shown -- confirm against the
// real file.
46 bool
47 ShmemDataLink::open(const std::string& peer_address)
48 {
50  const ACE_TString name = ACE_TEXT_CHAR_TO_TCHAR(peer_address.c_str());
51 
52 #ifdef OPENDDS_SHMEM_WINDOWS
54  const ACE_TString name_under = name + ACE_TEXT('_');
55  // Find max size of peer's pool so enough local address space is reserved.
56  HANDLE fm = ACE_TEXT_CreateFileMapping(INVALID_HANDLE_VALUE, 0, PAGE_READONLY,
57  0, ACE_DEFAULT_PAGEFILE_POOL_CHUNK, name_under.c_str());
58  void* view;
// NOTE(review): when fm is valid but MapViewOfFile fails, no CloseHandle(fm)
// is visible on this error path -- possible handle leak, confirm.
59  if (fm == 0 || (view = MapViewOfFile(fm, FILE_MAP_READ, 0, 0, 0)) == 0) {
60  stop_i();
62  ACE_TEXT("(%P|%t) ERROR: ShmemDataLink::open: ")
63  ACE_TEXT("peer's shared memory area not found (%C)\n"),
64  peer_address.c_str()),
65  false);
66  }
67  // location of max_size_ in ctrl block: a size_t after two void*s
68  const size_t* pmax = (const size_t*)(((void**)view) + 2);
69  alloc_opts.max_size_ = *pmax;
70  UnmapViewOfFile(view);
71  CloseHandle(fm);
72 #endif
73 
// Create the allocator named after the peer address; on Windows the options
// carrying the peer's max_size_ (read above) are passed as well.
74  peer_alloc_ = new ShmemAllocator(name.c_str(), 0 /*lock_name*/
75 #ifdef OPENDDS_SHMEM_WINDOWS
76  , &alloc_opts
77 #endif
78  );
79 
// A find() result of -1 for "Semaphore" is reported as the peer's shared
// memory area not being found.
80  if (-1 == peer_alloc_->find("Semaphore")) {
81  stop_i();
83  ACE_TEXT("(%P|%t) ERROR: ShmemDataLink::open: ")
84  ACE_TEXT("peer's shared memory area not found (%C)\n"),
85  peer_address.c_str()),
86  false);
87  }
88 
// Start the strategies; a non-zero result from start() is an error.
89  if (start(static_rchandle_cast<TransportSendStrategy>(send_strategy_),
90  static_rchandle_cast<TransportStrategy>(recv_strategy_),
91  false)
92  != 0) {
93  stop_i();
95  ACE_TEXT("(%P|%t) ERROR: ")
96  ACE_TEXT("ShmemDataLink::open: start failed!\n")),
97  false);
98  }
99 
100  VDBG_LVL((LM_DEBUG, "(%P|%t) ShmemDataLink::open: link %@ open to peer %C\n",
101  this, peer_address_.c_str()), 1);
102 
// NOTE(review): the remainder of this make_rch call (listing line 104) is
// missing from this listing.
103  assoc_resends_task_ = make_rch<SmPeriodicTask>(reactor_task_->interceptor(),
105  ShmemInst_rch cfg = config();
// NOTE(review): unlike the error paths above, this one returns without
// calling stop_i() even though start() has already succeeded -- confirm
// this is intended.
106  if (!cfg) {
107  return false;
108  }
109  assoc_resends_task_->enable(false, cfg->association_resend_period());
110 
111  return true;
112 }
113 
// Makes a reservation for a remote publication / local subscription pair.
//
// Delegates to DataLink::make_reservation first and returns its result
// unchanged when it is non-zero. Otherwise the (local_sub, remote_pub) pair
// is inserted into assoc_resends_, and if the pair was not already present an
// association message is sent immediately. Returns 0 in that case.
//
// NOTE(review): listing line 123 is missing here; presumably it is a guard on
// assoc_resends_mutex_ protecting the insert -- confirm.
114 int ShmemDataLink::make_reservation(const GUID_t& remote_pub, const GUID_t& local_sub,
115  const TransportReceiveListener_wrch& receive_listener, bool reliable)
116 {
117  const int result = DataLink::make_reservation(remote_pub, local_sub, receive_listener, reliable);
118  if (result != 0) {
119  return result;
120  }
121 
122  // Tell writer we are ready and resend that message until we get a response.
// insert(...).second is true only for a newly added pair, so a repeated
// reservation for the same pair does not send an extra message from here.
124  if (assoc_resends_.insert(GuidPair(local_sub, remote_pub)).second) {
125  send_association_msg(local_sub, remote_pub);
126  }
127 
128  return 0;
129 }
130 
// Builds and sends one association message from `local` to `remote`.
// NOTE(review): the signature line (listing line 132) is missing; the symbol
// index gives send_association_msg(const GUID_t &local, const GUID_t &remote).
//
// The message is a REQUEST_ACK header with sequence_ == -1 and
// message_length_ == guid_cdr_size, followed by the serialized `remote` GUID.
// request_ack_received() recognizes association messages by exactly that
// sequence/length combination.
// NOTE(review): listing lines 147, 152-154 and 161 are also missing (some
// ACE_Message_Block constructor arguments and one statement before the send).
131 void
133 {
134  VDBG((LM_DEBUG, "(%P|%t) ShmemDataLink::send_association_msg from %C to %C\n",
135  LogGuid(local).c_str(), LogGuid(remote).c_str()));
136 
137  DataSampleHeader header_data;
138  header_data.message_id_ = REQUEST_ACK;
139  header_data.byte_order_ = ACE_CDR_BYTE_ORDER;
140  header_data.message_length_ = guid_cdr_size;
141  header_data.sequence_ = -1;
// The sender's own GUID travels in publication_id_; the receiver reads it
// back from there as its "remote".
142  header_data.publication_id_ = local;
143  header_data.publisher_id_ = remote;
144 
// NOTE(review): the block size shown is the header's max serialized size
// only, yet the GUID payload is written into the same block below -- confirm
// the payload always fits.
145  Message_Block_Ptr message(
146  new ACE_Message_Block(header_data.get_max_serialized_size(),
148  0, //cont
149  0, //data
150  0, //allocator_strategy
151  0, //locking_strategy
155  0,
156  0));
157 
158  *message << header_data;
159  Serializer ser(message.get(), encoding_unaligned_native);
// The payload is the destination's GUID; the result of the write is not checked.
160  ser << remote;
162  TransportControlElement* send_element = new TransportControlElement(move(message));
// Second argument is the `relink` flag of send_i (false here).
163  this->send_i(send_element, false);
164 }
165 
// Re-sends an association message for every pair currently stored in
// assoc_resends_.
// NOTE(review): the signature line (listing line 166) is missing; the symbol
// index gives resend_association_msgs(const MonotonicTimePoint &now).
// Listing line 170 is missing as well, presumably a guard on
// assoc_resends_mutex_ -- confirm.
167 {
168  VDBG((LM_DEBUG, "(%P|%t) ShmemDataLink::resend_association_msgs\n"));
169 
171  for (AssocResends::iterator i = assoc_resends_.begin(); i != assoc_resends_.end(); ++i) {
172  send_association_msg(i->local, i->remote);
173  }
174 }
175 
// Removes the (local, remote) pair from assoc_resends_ so that no further
// association messages are re-sent for it.
// NOTE(review): the signature line (listing line 176) is missing; the symbol
// index gives
// stop_resend_association_msgs(const GUID_t &local, const GUID_t &remote).
// Listing line 180 is missing as well, presumably a guard on
// assoc_resends_mutex_ -- confirm.
177 {
178  VDBG((LM_DEBUG, "(%P|%t) ShmemDataLink::stop_resend_association_msgs: "
179  "local %C remote %C\n", LogGuid(local).c_str(), LogGuid(remote).c_str()));
181  assoc_resends_.erase(GuidPair(local, remote));
182 }
183 
// Handles a received REQUEST_ACK sample.
// NOTE(review): the signature line (listing line 185) is missing; the symbol
// index gives request_ack_received(ReceivedDataSample &sample).
//
// A sample with sequence_ == -1 and message_length_ == guid_cdr_size is
// treated as an association message (the same values send_association_msg
// writes). Its payload is deserialized into the local GUID, and the remote
// GUID is taken from the header's publication_id_.
//  - If the local entity is a writer, the on-start callbacks are invoked and,
//    only if that succeeds, an association message is sent back.
//  - Otherwise (local is a reader), resending for the pair is stopped.
// An association message always returns without reaching data_received(),
// even when the payload fails to deserialize. Every other sample is forwarded
// to data_received().
184 void
186 {
187  if (sample.header_.sequence_ == -1 && sample.header_.message_length_ == guid_cdr_size) {
188  VDBG((LM_DEBUG, "(%P|%t) ShmemDataLink::request_ack_received: association msg\n"));
189  GUID_t local;
190  Message_Block_Ptr payload(recv_strategy_->to_msgblock(sample));
191  Serializer ser(payload.get(), encoding_unaligned_native);
192  if (ser >> local) {
193  const GUID_t& remote = sample.header_.publication_id_;
194  const GuidConverter local_gc(local);
195  const bool local_is_writer = local_gc.isWriter();
196  VDBG((LM_DEBUG, "(%P|%t) ShmemDataLink::request_ack_received: "
197  "association msg from remote %C %C to local %C %C\n",
198  local_is_writer ? "reader" : "writer", LogGuid(remote).c_str(),
199  local_is_writer ? "writer" : "reader", std::string(local_gc).c_str()));
200  if (local_is_writer) {
201  // Reader has signaled it's ready to receive messages.
202  if (invoke_on_start_callbacks(local, remote, true)) {
203  // In case we're getting duplicates, only acknowledge if we can invoke
204  // the on start callback, which should only happen once.
205  send_association_msg(local, remote);
206  }
207  } else {
208  // Writer has responded to association ack, stop sending.
209  stop_resend_association_msgs(local, remote);
210  }
211  }
212  return;
213  }
214  data_received(sample);
215 }
216 
// Intentionally empty function body: nothing is done here.
// NOTE(review): the signature line (listing line 218) is missing from this
// listing; presumably this is control_received(ReceivedDataSample &sample),
// which appears in the symbol index -- confirm.
217 void
219 {
220 }
221 
// Tears down the link state in two scoped steps: first clears assoc_resends_
// and disables the resend task, then releases and deletes the peer allocator
// and resets peer_alloc_ to null.
// NOTE(review): the signature line (listing line 223) is missing; presumably
// this is stop_i(), which open() calls on its error paths -- confirm.
// Listing lines 226 and 232 are missing too, presumably guards on
// assoc_resends_mutex_ and peer_alloc_mutex_ opening each scope -- confirm.
222 void
224 {
225  {
227  assoc_resends_.clear();
// NOTE(review): assoc_resends_task_ is only assigned near the end of open().
// If this routine is the stop_i() called from open()'s earlier error paths,
// this call would run on a handle that was never set -- confirm whether a
// null check is needed.
228  assoc_resends_task_->disable();
229  }
230 
231  {
233  if (peer_alloc_) {
234  peer_alloc_->release(0 /*don't close*/);
235  }
// Deleting a null pointer is a no-op, so no check is needed for the delete.
236  delete peer_alloc_;
237  peer_alloc_ = 0;
238  }
239 }
240 
// NOTE(review): both the signature (listing lines 241-242) and the single
// body line (listing line 244) are missing from this listing, so nothing can
// be stated about this function from here -- consult the real file.
243 {
245 }
246 
// Returns the peer allocator pointer (peer_alloc_); it is null when no
// allocator is currently held.
// NOTE(review): the signature (listing lines 247-248) is missing; presumably
// this is ShmemAllocator* peer_allocator() from the symbol index. Listing
// line 250 is missing as well, presumably a guard on peer_alloc_mutex_ --
// confirm.
249 {
251  return peer_alloc_;
252 }
253 
// Returns a ShmemAllocator pointer that defaults to null.
// NOTE(review): the signature (listing lines 254-255) and the statement that
// may assign `result` (listing line 258) are missing from this listing.
// Presumably this is ShmemAllocator* local_allocator() from the symbol index
// -- confirm.
256 {
257  ShmemAllocator* result = 0;
259  return result;
260 }
261 
// Returns a std::string that defaults to empty.
// NOTE(review): the signature line (listing line 263) and the statement that
// may fill `result` (listing line 266) are missing from this listing, so the
// function's name and the source of the value cannot be determined here.
262 std::string
264 {
265  std::string result;
267  return result;
268 }
269 
// NOTE(review): the signature line (listing line 271) and the single body
// line (listing line 273) are missing from this listing, so only the void
// return type is known -- consult the real file.
270 void
272 {
274 }
275 
// Returns the integer that std::atoi parses from peer_address_, starting at
// the character just after the first '-'.
// NOTE(review): the signature line (listing line 277) is missing; presumably
// this is the peer process id accessor -- confirm.
// NOTE(review): assuming peer_address_ is a std::string, a value with no '-'
// makes find() return npos, and npos + 1 wraps to 0, so parsing would start
// at the first character -- confirm the address always contains a '-'.
276 pid_t
278 {
279  return std::atoi(peer_address_.c_str() + peer_address_.find('-') + 1);
280 }
281 
// Returns the transport's configuration downcast to ShmemInst via
// dynamic_rchandle_cast. open() checks the returned handle with `!cfg`
// before using it.
// NOTE(review): the signature (listing lines 282-283) is missing; presumably
// this is ShmemInst_rch config() const from the symbol index -- confirm.
284 {
285  return dynamic_rchandle_cast<ShmemInst>(transport()->config());
286 }
287 
288 } // namespace DCPS
289 } // namespace OpenDDS
290 
DataSampleHeader header_
The demarshalled sample header.
ACE_Message_Block * to_msgblock(const ReceivedDataSample &sample)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
int make_reservation(const GUID_t &remote_pub, const GUID_t &local_sub, const TransportReceiveListener_wrch &receive_listener, bool reliable)
const char * c_str(void) const
ShmemSendStrategy_rch send_strategy_
Definition: ShmemDataLink.h:81
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
ACE_MEM_POOL_OPTIONS MEMORY_POOL_OPTIONS
ACE_Thread_Mutex assoc_resends_mutex_
Definition: ShmemDataLink.h:95
static const ACE_Time_Value max_time
char message_id_
The enum MessageId.
DCPS::RcHandle< SmPeriodicTask > assoc_resends_task_
Definition: ShmemDataLink.h:99
ShmemReceiveStrategy_rch recv_strategy_
Definition: ShmemDataLink.h:82
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
Definition: DataLink.cpp:690
void send_association_msg(const GUID_t &local, const GUID_t &remote)
const size_t guid_cdr_size
Definition: GuidUtils.h:115
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
int find(const char *name, void *&pointer)
virtual void send_i(TransportQueueElement *element, bool relink=true)
Definition: DataLink.inl:119
ACE_Malloc_T< ShmemPool, ACE_Process_Mutex, ACE_PI_Control_Block > ShmemAllocator
void resend_association_msgs(const MonotonicTimePoint &now)
ShmemAllocator * local_allocator()
ACE_Thread_Mutex peer_alloc_mutex_
Definition: ShmemDataLink.h:92
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
#define ACE_CDR_BYTE_ORDER
ShmemAllocator * peer_allocator()
#define ACE_DEFAULT_PAGEFILE_POOL_CHUNK
Conversion processing and value testing utilities for RTPS GUID_t types.
Definition: GuidConverter.h:62
void stop_resend_association_msgs(const GUID_t &local, const GUID_t &remote)
int start(const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
Definition: DataLink.inl:212
LM_DEBUG
#define ACE_TEXT_CreateFileMapping
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
#define VDBG(DBG_ARGS)
Holds a data sample received by the transport.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void control_received(ReceivedDataSample &sample)
bool open(const std::string &peer_address)
ShmemDataLink(const ShmemTransport_rch &transport)
#define ACE_TEXT_CHAR_TO_TCHAR(STRING)
ShmemAllocator * peer_alloc_
Definition: ShmemDataLink.h:91
int release(int close=0)
const char *const name
Definition: debug.cpp:60
ACE_TEXT("TCP_Factory")
#define OPENDDS_TEST_AND_CALL(TYPE, TEST, CALL)
Definition: Definitions.h:75
void request_ack_received(ReceivedDataSample &sample)
ReactorInterceptor_rch interceptor() const
Definition: ReactorTask.inl:65
#define OPENDDS_TEST_AND_CALL_ASSIGN(TYPE, TEST, CALL, VAL)
Definition: Definitions.h:76
static const ACE_Time_Value zero
virtual int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
Definition: DataLink.cpp:398
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
ShmemInst_rch config() const
#define ACE_ERROR_RETURN(X, Y)
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
ShmemTransport_rch transport() const
bool isWriter() const
Returns true if the GUID represents a writer entity.
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
ReactorTask_rch reactor_task_
Definition: ShmemDataLink.h:93
void invoke_on_start_callbacks(bool success)
Definition: DataLink.cpp:194