OpenDDS  Snapshot(2023/04/28-20:55)
TransportInst.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 #include "TransportInst.h"
10 #include "TransportImpl.h"
11 #include "TransportExceptions.h"
12 #include "EntryExit.h"
14 
15 #include "ace/Configuration.h"
16 
17 #include <cstring>
18 #include <algorithm>
19 
20 #ifdef OPENDDS_TESTING_FEATURES
21 #include <cstdlib>
22 #ifdef _MSC_VER
23 #define OPENDDS_DRAND48() (rand()*(1./RAND_MAX))
24 #else
25 #define OPENDDS_DRAND48 drand48
26 #endif
27 #endif
28 
29 #if !defined (__ACE_INLINE__)
30 # include "TransportInst.inl"
31 #endif /* !__ACE_INLINE__ */
32 
34 
36 {
37  DBG_ENTRY_LVL("TransportInst","~TransportInst",6);
38 }
39 
40 int
43 {
44  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("queue_messages_per_pool"), queue_messages_per_pool_, size_t)
45  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("queue_initial_pools"), queue_initial_pools_, size_t)
46  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("max_packet_size"), max_packet_size_, ACE_UINT32)
47  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("max_samples_per_packet"), max_samples_per_packet_, size_t)
48  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("optimum_packet_size"), optimum_packet_size_, ACE_UINT32)
49  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("thread_per_connection"), thread_per_connection_, bool)
50  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("datalink_release_delay"), datalink_release_delay_, int)
51  GET_CONFIG_TIME_VALUE(cf, sect, ACE_TEXT("fragment_reassembly_timeout"), fragment_reassembly_timeout_);
52 
53  // Undocumented - this option is not in the Developer's Guide
54  // Controls the number of chunks in the allocators used by the datalink
55  // for control messages.
56  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("datalink_control_chunks"), datalink_control_chunks_, size_t)
57 
58  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("receive_preallocated_message_blocks"), receive_preallocated_message_blocks_, size_t)
59  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("receive_preallocated_data_blocks"), receive_preallocated_data_blocks_, size_t)
60 
61  ACE_TString stringvalue;
62  if (cf.get_string_value (sect, ACE_TEXT("passive_connect_duration"), stringvalue) == 0) {
64  ACE_TEXT ("(%P|%t) WARNING: passive_connect_duration option ")
65  ACE_TEXT ("is deprecated in the transport inst, must be ")
66  ACE_TEXT ("defined in transport config.\n")));
67  }
68 
70  return 0;
71 }
72 
73 void
75 {
77  ACE_TEXT("\n(%P|%t) TransportInst::dump() -\n%C"),
78  dump_to_str().c_str()));
79 }
80 
81 namespace {
82  static const int NAME_INDENT(3);
83  static const int NAME_WIDTH(30); // Includes ":"
84 }
85 
88 {
89  OPENDDS_STRING formatted_name;
90  formatted_name.reserve(NAME_INDENT + NAME_WIDTH);
91  formatted_name += OPENDDS_STRING(NAME_INDENT, ' ');
92  formatted_name += name;
93  formatted_name += ":";
94  if ((NAME_WIDTH + NAME_INDENT) > formatted_name.length()) {
95  formatted_name += OPENDDS_STRING((NAME_WIDTH + NAME_INDENT- formatted_name.length()), ' ');
96  }
97  return formatted_name;
98 }
99 
102 {
103  OPENDDS_STRING ret;
104  ret += formatNameForDump("transport_type") + transport_type_ + '\n';
105  ret += formatNameForDump("name") + name_ + '\n';
106  ret += formatNameForDump("queue_messages_per_pool") + to_dds_string(unsigned(queue_messages_per_pool_)) + '\n';
107  ret += formatNameForDump("queue_initial_pools") + to_dds_string(unsigned(queue_initial_pools_)) + '\n';
108  ret += formatNameForDump("max_packet_size") + to_dds_string(unsigned(max_packet_size_)) + '\n';
109  ret += formatNameForDump("max_samples_per_packet") + to_dds_string(unsigned(max_samples_per_packet_)) + '\n';
110  ret += formatNameForDump("optimum_packet_size") + to_dds_string(unsigned(optimum_packet_size_)) + '\n';
111  ret += formatNameForDump("thread_per_connection") + (thread_per_connection_ ? "true" : "false") + '\n';
112  ret += formatNameForDump("datalink_release_delay") + to_dds_string(datalink_release_delay_) + '\n';
113  ret += formatNameForDump("datalink_control_chunks") + to_dds_string(unsigned(datalink_control_chunks_)) + '\n';
114  ret += formatNameForDump("fragment_reassembly_timeout") + fragment_reassembly_timeout_.str() + '\n';
115  ret += formatNameForDump("receive_preallocated_message_blocks") + to_dds_string(unsigned(receive_preallocated_message_blocks_)) + '\n';
116  ret += formatNameForDump("receive_preallocated_data_blocks") + to_dds_string(unsigned(receive_preallocated_data_blocks_)) + '\n';
117  return ret;
118 }
119 
120 void
122 {
123  TransportImpl_rch impl;
124  {
126  impl_.swap(impl);
127  shutting_down_ = true;
128  }
129  if (impl) {
130  impl->shutdown();
131  }
132 }
133 
136 {
138  if (!impl_ && !shutting_down_) {
139  try {
140  impl_ = new_impl();
142  return TransportImpl_rch();
143  }
144  }
145  return impl_;
146 }
147 
150 {
152  return impl_;
153 }
154 
155 void
157 {
158 #ifdef BUFSIZE
159 #undef BUFSIZE
160 #endif
161  const int BUFSIZE=1024;
162  char result[BUFSIZE];
163 
164  if (std::count(addr_str.begin(), addr_str.end(), ':') < 2) {
165  OPENDDS_STRING::size_type pos = addr_str.find_last_of(":");
166  ACE_OS::snprintf(result, BUFSIZE, "%.*s:%hu", static_cast<int>(pos), addr_str.c_str(), port_number);
167  }
168  else {
169  // this is the numeric form of ipv6 address because it has more than one ':'
170  if (addr_str[0] != '[') {
171  ACE_OS::snprintf(result, BUFSIZE, "[%s]:%hu", addr_str.c_str(), port_number);
172  }
173  else {
174  OPENDDS_STRING::size_type pos = addr_str.find_last_of("]");
175  ACE_OS::snprintf(result, BUFSIZE, "%.*s:%hu", static_cast<int>(pos+1), addr_str.c_str(), port_number);
176  }
177  }
178  addr_str = result;
179 }
180 
183 {
186 }
187 
188 void
190 {
192  if (temp) {
193  temp->rtps_relay_only_now(flag);
194  }
195 }
196 
197 void
199 {
201  if (temp) {
202  temp->use_rtps_relay_now(flag);
203  }
204 }
205 
206 void
208 {
210  if (temp) {
211  temp->use_ice_now(flag);
212  }
213 }
214 
217 {
219  return temp ? temp->reactor_task() : OpenDDS::DCPS::ReactorTask_rch();
220 }
221 
224 {
225  const TransportImpl_rch temp = get_or_create_impl();
226  return temp ? temp->event_dispatcher() : EventDispatcher_rch();
227 }
228 
229 bool
231 {
232 #ifdef OPENDDS_TESTING_FEATURES
234  return drop_messages_ && (OPENDDS_DRAND48() < (length * drop_messages_m_ + drop_messages_b_));
235 #else
236  ACE_UNUSED_ARG(length);
238  "(%P|%t) ERROR: TransportInst::should_drop: "
239  "caller not conditioned on OPENDDS_TESTING_FEATURES\n"));
240  return false;
241 #endif
242 }
243 
#define ACE_DEBUG(X)
TransportImpl_rch get_or_create_impl()
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
#define ACE_SYNCH_MUTEX
ACE_UINT32 optimum_packet_size_
Optimum size (in bytes) of a packet (packet header + sample(s)).
const OPENDDS_STRING name_
virtual int get_string_value(const ACE_Configuration_Section_Key &key, const ACE_TCHAR *name, ACE_TString &value)
void set_port_in_addr_string(OPENDDS_STRING &addr_str, u_short port_number)
virtual void rtps_relay_only_now(bool)
virtual void use_rtps_relay_now(bool)
RcHandle< EventDispatcher > EventDispatcher_rch
virtual void use_ice_now(bool)
int snprintf(char *buf, size_t maxlen, const char *format,...) ACE_GCC_FORMAT_ATTRIBUTE(printf
String to_dds_string(unsigned short to_convert)
EventDispatcher_rch event_dispatcher()
void dump() const
Diagnostic aid.
const OPENDDS_STRING & name() const
Definition: TransportInst.h:70
void rtps_relay_only_now(bool flag)
int ssize_t
ACE_Thread_Mutex config_lock_
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
bool should_drop(ssize_t length) const
size_t max_samples_per_packet_
Max number of samples that should ever be in a single packet.
Definition: TransportInst.h:99
#define OPENDDS_STRING
const OPENDDS_STRING transport_type_
Definition: TransportInst.h:85
LM_DEBUG
TimeDuration fragment_reassembly_timeout_
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
#define GET_CONFIG_VALUE(CF, SECT, KEY, VALUE, TYPE)
Definition: TransportDefs.h:45
virtual TransportImpl_rch new_impl()=0
EventDispatcher_rch event_dispatcher()
void swap(RcHandle &rhs)
Definition: RcHandle_T.h:102
LM_WARNING
ACE_UINT32 max_packet_size_
Max size (in bytes) of a packet (packet header + sample(s))
Definition: TransportInst.h:96
#define GET_CONFIG_TIME_VALUE(CF, SECT, KEY, VALUE)
ACE_TEXT("TCP_Factory")
ReactorTask_rch reactor_task()
virtual int load(ACE_Configuration_Heap &cf, ACE_Configuration_Section_Key &sect)
ReactorTask_rch reactor_task()
TransportImpl_rch get_impl()
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
String str(unsigned decimal_places=3, bool just_sec=false) const
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
static OPENDDS_STRING formatNameForDump(const char *name)
LM_ERROR
RcHandle< ReactorTask > ReactorTask_rch
The type definition for the smart-pointer to the underlying type.
void use_rtps_relay_now(bool flag)
virtual WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
virtual OPENDDS_STRING dump_to_str() const