Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 : #include "TransportInst.h"
10 : #include "TransportImpl.h"
11 : #include "TransportExceptions.h"
12 : #include "EntryExit.h"
13 : #include "DCPS/SafetyProfileStreams.h"
14 :
15 : #include "ace/Configuration.h"
16 :
17 : #include <cstring>
18 : #include <algorithm>
19 :
20 : #ifdef OPENDDS_TESTING_FEATURES
21 : #include <cstdlib>
22 : #ifdef _MSC_VER
23 : #define OPENDDS_DRAND48() (rand()*(1./RAND_MAX))
24 : #else
25 : #define OPENDDS_DRAND48 drand48
26 : #endif
27 : #endif
28 :
29 : #if !defined (__ACE_INLINE__)
30 : # include "TransportInst.inl"
31 : #endif /* !__ACE_INLINE__ */
32 :
33 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
34 :
35 0 : OpenDDS::DCPS::TransportInst::~TransportInst()
36 : {
37 : DBG_ENTRY_LVL("TransportInst","~TransportInst",6);
38 0 : }
39 :
40 : int
41 0 : OpenDDS::DCPS::TransportInst::load(ACE_Configuration_Heap& cf,
42 : ACE_Configuration_Section_Key& sect)
43 : {
44 0 : GET_CONFIG_VALUE(cf, sect, ACE_TEXT("queue_messages_per_pool"), queue_messages_per_pool_, size_t)
45 0 : GET_CONFIG_VALUE(cf, sect, ACE_TEXT("queue_initial_pools"), queue_initial_pools_, size_t)
46 0 : GET_CONFIG_VALUE(cf, sect, ACE_TEXT("max_packet_size"), max_packet_size_, ACE_UINT32)
47 0 : GET_CONFIG_VALUE(cf, sect, ACE_TEXT("max_samples_per_packet"), max_samples_per_packet_, size_t)
48 0 : GET_CONFIG_VALUE(cf, sect, ACE_TEXT("optimum_packet_size"), optimum_packet_size_, ACE_UINT32)
49 0 : GET_CONFIG_VALUE(cf, sect, ACE_TEXT("thread_per_connection"), thread_per_connection_, bool)
50 0 : GET_CONFIG_VALUE(cf, sect, ACE_TEXT("datalink_release_delay"), datalink_release_delay_, int)
51 0 : GET_CONFIG_TIME_VALUE(cf, sect, ACE_TEXT("fragment_reassembly_timeout"), fragment_reassembly_timeout_);
52 :
53 : // Undocumented - this option is not in the Developer's Guide
54 : // Controls the number of chunks in the allocators used by the datalink
55 : // for control messages.
56 0 : GET_CONFIG_VALUE(cf, sect, ACE_TEXT("datalink_control_chunks"), datalink_control_chunks_, size_t)
57 :
58 0 : GET_CONFIG_VALUE(cf, sect, ACE_TEXT("receive_preallocated_message_blocks"), receive_preallocated_message_blocks_, size_t)
59 0 : GET_CONFIG_VALUE(cf, sect, ACE_TEXT("receive_preallocated_data_blocks"), receive_preallocated_data_blocks_, size_t)
60 :
61 0 : ACE_TString stringvalue;
62 0 : if (cf.get_string_value (sect, ACE_TEXT("passive_connect_duration"), stringvalue) == 0) {
63 0 : ACE_DEBUG ((LM_WARNING,
64 : ACE_TEXT ("(%P|%t) WARNING: passive_connect_duration option ")
65 : ACE_TEXT ("is deprecated in the transport inst, must be ")
66 : ACE_TEXT ("defined in transport config.\n")));
67 : }
68 :
69 0 : adjust_config_value();
70 0 : return 0;
71 0 : }
72 :
73 : void
74 0 : OpenDDS::DCPS::TransportInst::dump() const
75 : {
76 0 : ACE_DEBUG((LM_DEBUG,
77 : ACE_TEXT("\n(%P|%t) TransportInst::dump() -\n%C"),
78 : dump_to_str().c_str()));
79 0 : }
80 :
namespace {
  // Column layout used by TransportInst::formatNameForDump():
  // number of leading spaces placed before each setting name...
  const int NAME_INDENT = 3;
  // ...and width of the name column that follows the indent.
  const int NAME_WIDTH = 30; // Includes ":"
}
85 :
86 : OPENDDS_STRING
87 0 : OpenDDS::DCPS::TransportInst::formatNameForDump(const char* name)
88 : {
89 0 : OPENDDS_STRING formatted_name;
90 0 : formatted_name.reserve(NAME_INDENT + NAME_WIDTH);
91 0 : formatted_name += OPENDDS_STRING(NAME_INDENT, ' ');
92 0 : formatted_name += name;
93 0 : formatted_name += ":";
94 0 : if ((NAME_WIDTH + NAME_INDENT) > formatted_name.length()) {
95 0 : formatted_name += OPENDDS_STRING((NAME_WIDTH + NAME_INDENT- formatted_name.length()), ' ');
96 : }
97 0 : return formatted_name;
98 0 : }
99 :
100 : OPENDDS_STRING
101 0 : OpenDDS::DCPS::TransportInst::dump_to_str() const
102 : {
103 0 : OPENDDS_STRING ret;
104 0 : ret += formatNameForDump("transport_type") + transport_type_ + '\n';
105 0 : ret += formatNameForDump("name") + name_ + '\n';
106 0 : ret += formatNameForDump("queue_messages_per_pool") + to_dds_string(unsigned(queue_messages_per_pool_)) + '\n';
107 0 : ret += formatNameForDump("queue_initial_pools") + to_dds_string(unsigned(queue_initial_pools_)) + '\n';
108 0 : ret += formatNameForDump("max_packet_size") + to_dds_string(unsigned(max_packet_size_)) + '\n';
109 0 : ret += formatNameForDump("max_samples_per_packet") + to_dds_string(unsigned(max_samples_per_packet_)) + '\n';
110 0 : ret += formatNameForDump("optimum_packet_size") + to_dds_string(unsigned(optimum_packet_size_)) + '\n';
111 0 : ret += formatNameForDump("thread_per_connection") + (thread_per_connection_ ? "true" : "false") + '\n';
112 0 : ret += formatNameForDump("datalink_release_delay") + to_dds_string(datalink_release_delay_) + '\n';
113 0 : ret += formatNameForDump("datalink_control_chunks") + to_dds_string(unsigned(datalink_control_chunks_)) + '\n';
114 0 : ret += formatNameForDump("fragment_reassembly_timeout") + fragment_reassembly_timeout_.str() + '\n';
115 0 : ret += formatNameForDump("receive_preallocated_message_blocks") + to_dds_string(unsigned(receive_preallocated_message_blocks_)) + '\n';
116 0 : ret += formatNameForDump("receive_preallocated_data_blocks") + to_dds_string(unsigned(receive_preallocated_data_blocks_)) + '\n';
117 0 : return ret;
118 0 : }
119 :
120 : void
121 0 : OpenDDS::DCPS::TransportInst::shutdown()
122 : {
123 0 : TransportImpl_rch impl;
124 : {
125 0 : ACE_GUARD(ACE_SYNCH_MUTEX, g, lock_);
126 0 : impl_.swap(impl);
127 0 : shutting_down_ = true;
128 0 : }
129 0 : if (impl) {
130 0 : impl->shutdown();
131 : }
132 0 : }
133 :
134 : OpenDDS::DCPS::TransportImpl_rch
135 0 : OpenDDS::DCPS::TransportInst::get_or_create_impl()
136 : {
137 0 : ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, g, lock_, TransportImpl_rch());
138 0 : if (!impl_ && !shutting_down_) {
139 : try {
140 0 : impl_ = new_impl();
141 0 : } catch (const OpenDDS::DCPS::Transport::UnableToCreate&) {
142 0 : return TransportImpl_rch();
143 0 : }
144 : }
145 0 : return impl_;
146 0 : }
147 :
148 : OpenDDS::DCPS::TransportImpl_rch
149 0 : OpenDDS::DCPS::TransportInst::get_impl()
150 : {
151 0 : ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, g, lock_, TransportImpl_rch());
152 0 : return impl_;
153 0 : }
154 :
155 : void
156 0 : OpenDDS::DCPS::TransportInst::set_port_in_addr_string(OPENDDS_STRING& addr_str, u_short port_number)
157 : {
158 : #ifdef BUFSIZE
159 : #undef BUFSIZE
160 : #endif
161 0 : const int BUFSIZE=1024;
162 : char result[BUFSIZE];
163 :
164 0 : if (std::count(addr_str.begin(), addr_str.end(), ':') < 2) {
165 0 : OPENDDS_STRING::size_type pos = addr_str.find_last_of(":");
166 0 : ACE_OS::snprintf(result, BUFSIZE, "%.*s:%hu", static_cast<int>(pos), addr_str.c_str(), port_number);
167 : }
168 : else {
169 : // this is the numeric form of ipv6 address because it has more than one ':'
170 0 : if (addr_str[0] != '[') {
171 0 : ACE_OS::snprintf(result, BUFSIZE, "[%s]:%hu", addr_str.c_str(), port_number);
172 : }
173 : else {
174 0 : OPENDDS_STRING::size_type pos = addr_str.find_last_of("]");
175 0 : ACE_OS::snprintf(result, BUFSIZE, "%.*s:%hu", static_cast<int>(pos+1), addr_str.c_str(), port_number);
176 : }
177 : }
178 0 : addr_str = result;
179 0 : }
180 :
181 : OpenDDS::DCPS::WeakRcHandle<OpenDDS::ICE::Endpoint>
182 0 : OpenDDS::DCPS::TransportInst::get_ice_endpoint()
183 : {
184 0 : const OpenDDS::DCPS::TransportImpl_rch temp = get_or_create_impl();
185 0 : return temp ? temp->get_ice_endpoint() : OpenDDS::DCPS::WeakRcHandle<OpenDDS::ICE::Endpoint>();
186 0 : }
187 :
188 : void
189 0 : OpenDDS::DCPS::TransportInst::rtps_relay_only_now(bool flag)
190 : {
191 0 : const OpenDDS::DCPS::TransportImpl_rch temp = get_or_create_impl();
192 0 : if (temp) {
193 0 : temp->rtps_relay_only_now(flag);
194 : }
195 0 : }
196 :
197 : void
198 0 : OpenDDS::DCPS::TransportInst::use_rtps_relay_now(bool flag)
199 : {
200 0 : const OpenDDS::DCPS::TransportImpl_rch temp = get_or_create_impl();
201 0 : if (temp) {
202 0 : temp->use_rtps_relay_now(flag);
203 : }
204 0 : }
205 :
206 : void
207 0 : OpenDDS::DCPS::TransportInst::use_ice_now(bool flag)
208 : {
209 0 : const OpenDDS::DCPS::TransportImpl_rch temp = get_or_create_impl();
210 0 : if (temp) {
211 0 : temp->use_ice_now(flag);
212 : }
213 0 : }
214 :
215 : OpenDDS::DCPS::ReactorTask_rch
216 0 : OpenDDS::DCPS::TransportInst::reactor_task()
217 : {
218 0 : const OpenDDS::DCPS::TransportImpl_rch temp = get_or_create_impl();
219 0 : return temp ? temp->reactor_task() : OpenDDS::DCPS::ReactorTask_rch();
220 0 : }
221 :
222 : OpenDDS::DCPS::EventDispatcher_rch
223 0 : OpenDDS::DCPS::TransportInst::event_dispatcher()
224 : {
225 0 : const TransportImpl_rch temp = get_or_create_impl();
226 0 : return temp ? temp->event_dispatcher() : EventDispatcher_rch();
227 0 : }
228 :
229 : bool
230 0 : OpenDDS::DCPS::TransportInst::should_drop(ssize_t length) const
231 : {
232 : #ifdef OPENDDS_TESTING_FEATURES
233 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, config_lock_, false);
234 : return drop_messages_ && (OPENDDS_DRAND48() < (length * drop_messages_m_ + drop_messages_b_));
235 : #else
236 : ACE_UNUSED_ARG(length);
237 0 : ACE_ERROR((LM_ERROR,
238 : "(%P|%t) ERROR: TransportInst::should_drop: "
239 : "caller not conditioned on OPENDDS_TESTING_FEATURES\n"));
240 0 : return false;
241 : #endif
242 : }
243 :
244 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|