00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_DATALINK_H
00009 #define OPENDDS_DCPS_DATALINK_H
00010
00011 #include "dds/DCPS/dcps_export.h"
00012 #include "dds/DCPS/Definitions.h"
00013 #include "dds/DCPS/RcObject_T.h"
00014 #include "dds/DCPS/PoolAllocator.h"
00015 #include "ReceiveListenerSetMap.h"
00016 #include "SendResponseListener.h"
00017 #include "TransportDefs.h"
00018 #include "TransportImpl_rch.h"
00019 #include "TransportSendStrategy.h"
00020 #include "TransportSendStrategy_rch.h"
00021 #include "TransportStrategy.h"
00022 #include "TransportStrategy_rch.h"
00023 #include "TransportSendControlElement.h"
00024 #include "TransportSendListener.h"
00025 #include "TransportReceiveListener.h"
00026 #include "dds/DCPS/transport/framework/QueueTaskBase_T.h"
00027
00028 #include "ace/Synch.h"
00029 #include "ace/Event_Handler.h"
00030
00031 #include <utility>
00032
00033 #include <iosfwd>
00034 // Forward declaration of ACE_SOCK inside ACE's versioned namespace; this header only names it by reference (see set_dscp_codepoint()).
00035 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00036 class ACE_SOCK;
00037 ACE_END_VERSIONED_NAMESPACE_DECL
00038
00039 namespace OpenDDS {
00040 namespace DCPS {
00041 // Forward declarations of types that the declarations below name only through pointers or references.
00042 class TransportReceiveListener;
00043 class TransportSendListener;
00044 class TransportQueueElement;
00045 class ReceivedDataSample;
00046 class DataSampleElement;
00047 class ThreadPerConnectionSendTask;
00048 class TransportClient;
00049 /// Map from RepoId to DataLinkSet_rch, ordered by GUID_tKeyLessThan (used as the out-parameter type of DataLink::release_reservations()).
00050 typedef OPENDDS_MAP_CMP(RepoId, DataLinkSet_rch, GUID_tKeyLessThan) DataLinkSetMap;
00051
00052 /// A transport data link.
00053 ///
00054 /// Derives from RcObject<ACE_SYNCH_MUTEX> and ACE_Event_Handler, is constructed
00055 /// from a TransportImpl*, and holds a send strategy (send_strategy_), a receive
00056 /// strategy (receive_strategy_), per-RepoId listener maps (send_listeners_,
00057 /// recv_listeners_) and association maps (assoc_by_remote_, assoc_by_local_).
00058 ///
00059 /// NOTE(review): this header shows declarations only. Anything below that a
00060 /// signature does not establish is marked "presumably" and must be confirmed
00061 /// against DataLink.cpp / DataLink.inl.
00062 class OpenDDS_Dcps_Export DataLink
00063 : public RcObject<ACE_SYNCH_MUTEX>,
00064 public ACE_Event_Handler {
00065 // DataLinkCleanupTask is granted access to the non-public members.
00066 friend class DataLinkCleanupTask;
00067
00068 public:
00069 /// Kinds of connection event accepted by notify().
00070 enum ConnectionNotice {
00071 DISCONNECTED,
00072 RECONNECTED,
00073 LOST
00074 };
00075
00076 /// Constructs a link.
00077 /// @param impl        Transport implementation this link is created for (cf. impl_).
00078 /// @param priority    Priority value for the link (cf. transport_priority()).
00079 /// @param is_loopback Loopback flag (cf. is_loopback()).
00080 /// @param is_active   Active flag (cf. is_active()).
00081 DataLink(TransportImpl* impl, Priority priority, bool is_loopback, bool is_active);
00082 virtual ~DataLink();
00083
00084 /// Same name and signature as the ACE_Event_Handler::handle_exception callback. NOTE(review): presumably triggered by a reactor notification - confirm the caller.
00085 int handle_exception(ACE_HANDLE );
00086
00087 /// Presumably requests that the link be stopped at the given time - confirm.
00088 /// NOTE(review): the argument is a non-const reference although it looks like
00089 /// an input-only value (cf. scheduled_to_stop_at_); confirm whether it is modified.
00090 void schedule_stop(ACE_Time_Value& schedule_to_stop_at);
00091 /// Stops the link. Non-virtual; pre_stop_i() and stop_i() are the virtual hooks declared in this class.
00092 void stop();
00093
00094 /// Resumes sending.
00095 /// NOTE(review): presumably forwarded to send_strategy_ - confirm.
00096 void resume_send();
00097
00098 /// Reserves this link for a local publication and a remote subscription.
00099 /// @param send_listener Listener supplied for local_publication_id
00100 ///        (cf. send_listeners_, which is keyed by RepoId).
00101 /// @return int status. NOTE(review): presumably 0 on success and -1 on failure - confirm.
00102 int make_reservation(const RepoId& remote_subscription_id,
00103 const RepoId& local_publication_id,
00104 TransportSendListener* send_listener);
00105
00106 /// Reserves this link for a local subscription and a remote publication.
00107 /// @param receive_listener Listener supplied for the local subscription
00108 ///        (cf. recv_listeners_, which is keyed by RepoId).
00109 /// @return int status. NOTE(review): presumably 0 on success and -1 on failure - confirm.
00110 int make_reservation(const RepoId& remote_publication_id,
00111 const RepoId& local_subcription_id, // NOTE(review): parameter name is misspelt ("subcription").
00112 TransportReceiveListener* receive_listener);
00113
00114 /// Releases the reservation between remote_id and local_id.
00115 /// @param released_locals Map (RepoId -> DataLinkSet_rch) passed by non-const
00116 ///        reference. NOTE(review): presumably filled with the local ids whose
00117 ///        reservations were released - confirm.
00118 /// Both ids are taken by value here, unlike in make_reservation().
00119 void release_reservations(RepoId remote_id,
00120 RepoId local_id,
00121 DataLinkSetMap& released_locals);
00122 /// NOTE(review): presumably arranges for the link to be released after datalink_release_delay() - confirm.
00123 void schedule_delayed_release();
00124 /// Release delay, returned by const reference (cf. datalink_release_delay_).
00125 const ACE_Time_Value& datalink_release_delay() const;
00126
00127 /// Removes the listener registered for local_id.
00128 /// NOTE(review): presumably erases the entry from send_listeners_ and/or
00129 /// recv_listeners_ - confirm.
00130 void remove_listener(const RepoId& local_id);
00131
00132 /// Public send entry points. Counterparts named send_start_i(), send_i() and
00133 /// send_stop_i() are declared in the protected section; send_i() is virtual.
00134 /// NOTE(review): presumably these either call the *_i counterpart directly or
00135 /// hand the work to thr_per_con_send_task_ when one exists - confirm.
00136 /// NOTE(review): who owns 'element' after send() returns is not visible in
00137 /// this header.
00138 void send_start();
00139 void send(TransportQueueElement* element);
00140 void send_stop(RepoId repoId);
00141
00142 /// Attempts to remove a sample that was handed to this link.
00143 /// @param sample Sample to look for (pointer to const).
00144 /// @return A RemoveResult value; the type is declared outside this header.
00145 /// NOTE(review): presumably delegated to send_strategy_ - confirm.
00146 RemoveResult remove_sample(const DataSampleElement* sample);
00147
00148 /// NOTE(review): presumably drops every queued message that belongs to pub_id - confirm.
00149 void remove_all_msgs(RepoId pub_id);
00150
00151 /// Entry point for a received sample.
00152 /// @param sample   Received sample, passed by non-const reference.
00153 /// @param readerId Reader id; defaults to GUID_UNKNOWN. NOTE(review): presumably
00154 ///        GUID_UNKNOWN means "no specific reader" - confirm.
00155 /// @return int status. NOTE(review): the meaning of the value is defined in
00156 ///         DataLink.cpp.
00157 int data_received(ReceivedDataSample& sample,
00158 const RepoId& readerId = GUID_UNKNOWN);
00159
00160 /// Variant of data_received() that takes a set of RepoIds and returns nothing.
00161 /// NOTE(review): 'incl' presumably limits delivery to the listed readers
00162 /// (cf. data_received_i() and its ConstrainReceiveSet argument) - confirm.
00163 void data_received_include(ReceivedDataSample& sample, const RepoIdSet& incl);
00164
00165 /// Identifier of this link, returned as DataLinkIdType (the id_ member is declared as ACE_UINT64).
00166 DataLinkIdType id() const;
00167
00168 /// NOTE(review): presumably called when the transport that created this link
00169 /// is shutting down - confirm the caller.
00170 void transport_shutdown();
00171
00172 /// Reports a connection event (DISCONNECTED, RECONNECTED or LOST).
00173 /// NOTE(review): presumably relayed to the registered listeners - confirm.
00174 /// See connection_notice_as_str() for the string form of the argument.
00175 void notify(ConnectionNotice notice);
00176 /// Connection-deleted notification, plus a virtual predicate that subclasses can override. NOTE(review): presumably the predicate decides whether the notification is issued - confirm.
00177 void notify_connection_deleted();
00178 virtual bool issues_on_deleted_callback() const;
00179
00180 /// Virtual hook.
00181 /// NOTE(review): presumably invoked before stop_i() while stopping - confirm.
00182 virtual void pre_stop_i();
00183
00184 /// Releases resources held by the link.
00185 /// @return bool. NOTE(review): presumably true on success - confirm.
00186 /// NOTE(review): the relationship to prepare_release() and
00187 /// schedule_delayed_release() is defined in DataLink.cpp.
00188 bool release_resources();
00189
00190 /// Terminates sending.
00191 /// NOTE(review): presumably forwarded to send_strategy_ - confirm.
00192 void terminate_send();
00193
00194 /// Tells whether remote_sub_id is a target of this link.
00195 /// Declared non-const although it reads like a query. NOTE(review): presumably a lookup in assoc_by_remote_ - confirm.
00196 bool is_target(const RepoId& remote_sub_id);
00197
00198 /// Clears the associations held by this link.
00199 /// NOTE(review): presumably applies to assoc_by_remote_ / assoc_by_local_ - confirm.
00200 void clear_associations();
00201 /// Same names and signatures as the ACE_Event_Handler timer-expiry and close callbacks.
00202 int handle_timeout(const ACE_Time_Value& tv, const void* arg);
00203 int handle_close(ACE_HANDLE h, ACE_Reactor_Mask m);
00204
00205 /// Applies the code point 'cp' to 'socket'.
00206 /// @param cp     Code point value (int).
00207 /// @param socket Socket to configure, passed by non-const reference.
00208 /// NOTE(review): presumably sets the DiffServ/TOS socket option - confirm.
00209 void set_dscp_codepoint(int cp, ACE_SOCK& socket);
00210
00211 /// Accessor pair: the non-const overload returns a mutable reference, the
00212 /// const overload returns a copy (cf. transport_priority_).
00213 Priority& transport_priority();
00214 Priority transport_priority() const;
00215 /// Accessor pair with the same reference/copy split (cf. is_loopback_).
00216 bool& is_loopback();
00217 bool is_loopback() const;
00218 /// Accessor pair with the same reference/copy split (cf. is_active_).
00219 bool& is_active();
00220 bool is_active() const;
00221 /// @return bool. NOTE(review): presumably whether a pending release was cancelled - confirm.
00222 bool cancel_release();
00223
00224 /// Builds a control message from a submessage id, a header and a data block.
00225 /// NOTE(review): ownership of 'data' and of the returned ACE_Message_Block* is not visible in this header - confirm before use.
00226 ACE_Message_Block* create_control(char submessage_id,
00227 DataSampleHeader& header,
00228 ACE_Message_Block* data);
00229
00230 /// Sends a control message.
00231 /// @param header Header of the control message (const reference).
00232 /// @param data   Message data. NOTE(review): ownership is not visible here.
00233 /// @return A SendControlStatus value; the type is declared outside this header.
00234 SendControlStatus send_control(const DataSampleHeader& header, ACE_Message_Block* data);
00235
00236 /// @param pub_id Local publication id.
00237 /// @param in     Input sequence of GUIDs.
00238 /// @param n_subs Non-const size_t reference, i.e. an output/in-out count.
00239 /// @return GUIDSeq*. NOTE(review): presumably the entries of 'in' that are targets of pub_id on this link; nullability and ownership are not visible here - confirm.
00240 GUIDSeq* target_intersection(const RepoId& pub_id, const GUIDSeq& in, size_t& n_subs);
00241 /// Transport implementation handle, returned by value as TransportImpl_rch (cf. impl_).
00242 TransportImpl_rch impl() const;
00243 /// Setter and getter for the default receive listener (cf. default_listener_).
00244 void default_listener(TransportReceiveListener* trl);
00245 TransportReceiveListener* default_listener() const;
00246 /// An on-start callback is a (client, remote id) pair; entries are kept in on_start_callbacks_.
00247 typedef std::pair<TransportClient*, RepoId> OnStartCallback;
00248 bool add_on_start_callback(TransportClient* client, const RepoId& remote);
00249 void remove_on_start_callback(TransportClient* client, const RepoId& remote);
00250 void invoke_on_start_callbacks(bool success);
00251 /// Setter (cf. scheduling_release_).
00252 void set_scheduling_release(bool scheduling_release);
00253 /// Virtual hook taking a reader id. NOTE(review): base-class behaviour is defined in DataLink.cpp.
00254 virtual void send_final_acks (const RepoId& readerid);
00255
00256 protected:
00257
00258 /// Starts the link with the given strategies.
00259 /// @param send_strategy    Send strategy handle (cf. send_strategy_).
00260 /// @param receive_strategy Receive strategy handle (cf. receive_strategy_).
00261 /// @return int status. NOTE(review): presumably 0 on success and -1 on
00262 ///         failure - confirm.
00263 ///
00264 /// Protected and non-virtual.
00265 /// NOTE(review): presumably meant to be called by a subclass once it has
00266 /// created its strategies, and presumably what sets started_ - confirm.
00267 /// NOTE(review): whether either handle may be nil is not visible in this
00268 /// header.
00269
00270 int start(const TransportSendStrategy_rch& send_strategy,
00271 const TransportStrategy_rch& receive_strategy);
00272
00273 /// Virtual hook for subclass-specific stop logic.
00274 /// NOTE(review): presumably invoked from stop() - confirm, including
00275 /// which locks (if any) are held at that point.
00276
00277
00278
00279
00280 virtual void stop_i();
00281
00282 /// Static helper returning an ACE_UINT64. NOTE(review): presumably hands out the value stored in id_ - confirm.
00283 static ACE_UINT64 get_next_datalink_id();
00284
00285 /// Receive strategy handle.
00286 TransportStrategy_rch receive_strategy_;
00287 // ThreadPerConnectionSendTask is granted access to the non-public members.
00288 friend class ThreadPerConnectionSendTask;
00289
00290 /// Counterparts of send_start(), send() and send_stop().
00291 /// NOTE(review): send_i() is virtual and carries a default argument (relink = true). C++ picks a default
00292 /// argument from the static type of the call, so overriders should repeat the same default.
00293 void send_start_i();
00294 virtual void send_i(TransportQueueElement* element, bool relink = true);
00295 void send_stop_i(RepoId repoId);
00296
00297 /// @return GUIDSeq*. NOTE(review): presumably the remote ids associated with
00298 ///         local_id (cf. assoc_by_local_); nullability and ownership of the
00299 ///         result are not visible here - confirm.
00300 GUIDSeq* peer_ids(const RepoId& local_id) const;
00301
00302 private:
00303
00304 /// Maps a ConnectionNotice to a C string. NOTE(review): presumably used for logging.
00305 const char* connection_notice_as_str(ConnectionNotice notice);
00306 /// Listener lookups by id (cf. send_listeners_ / recv_listeners_). NOTE(review): presumably return null when there is no entry - confirm.
00307 TransportSendListener* send_listener_for(const RepoId& pub_id) const;
00308 TransportReceiveListener* recv_listener_for(const RepoId& sub_id) const;
00309
00310 /// NOTE(review): presumably stages the current associations for release
00311 /// (cf. assoc_releasing_) - confirm.
00312 void prepare_release();
00313
00314 /// Private virtual customization point.
00315 /// This default implementation returns 'element' unchanged.
00316 virtual TransportQueueElement* customize_queue_element(
00317 TransportQueueElement* element)
00318 {
00319 return element;
00320 }
00321 /// Private virtual hooks; both default implementations are empty.
00322 virtual void release_remote_i(const RepoId& ) {}
00323 virtual void release_reservations_i(const RepoId& ,
00324 const RepoId& ) {}
00325 /// Helper taking a reader id, a RepoId set and a ConstrainReceiveSet selector. NOTE(review): presumably the shared implementation behind data_received() and data_received_include() - confirm.
00326 void data_received_i(ReceivedDataSample& sample,
00327 const RepoId& readerId,
00328 const RepoIdSet& incl_excl,
00329 ReceiveListenerSet::ConstrainReceiveSet constrain);
00330 /// Mutex type of the lock members declared in this class.
00331 typedef ACE_SYNCH_MUTEX LockType;
00332
00333 /// Stream formatter for DataLink; declared only when OPENDDS_SAFETY_PROFILE is not defined.
00334 #ifndef OPENDDS_SAFETY_PROFILE
00335 friend OpenDDS_Dcps_Export
00336 std::ostream& operator<<(std::ostream& str, const DataLink& value);
00337 #endif
00338
00339 /// Stop state. NOTE(review): presumably stopped_ is set once the link has been
00340 /// stopped, and scheduled_to_stop_at_ records the schedule_stop() argument - confirm.
00341 bool stopped_;
00342 ACE_Time_Value scheduled_to_stop_at_;
00343
00344 /// RepoId -> raw TransportSendListener*, ordered by GUID_tKeyLessThan.
00345 typedef OPENDDS_MAP_CMP(RepoId, TransportSendListener*, GUID_tKeyLessThan) IdToSendListenerMap;
00346 IdToSendListenerMap send_listeners_;
00347
00348 /// RepoId -> raw TransportReceiveListener*, ordered by GUID_tKeyLessThan.
00349 typedef OPENDDS_MAP_CMP(RepoId, TransportReceiveListener*, GUID_tKeyLessThan) IdToRecvListenerMap;
00350 IdToRecvListenerMap recv_listeners_;
00351
00352 /// Listener exposed through default_listener().
00353 /// NOTE(review): presumably the fallback receiver when recv_listeners_ has
00354 /// no matching entry - confirm.
00355 TransportReceiveListener* default_listener_;
00356 /// Declared mutable, so it can be locked from const member functions. NOTE(review): presumably guards the listener and association maps - confirm.
00357 mutable LockType pub_sub_maps_lock_;
00358 /// RepoId -> ReceiveListenerSet_rch.
00359 typedef OPENDDS_MAP_CMP(RepoId, ReceiveListenerSet_rch, GUID_tKeyLessThan) AssocByRemote;
00360 AssocByRemote assoc_by_remote_;
00361 /// RepoId -> RepoIdSet.
00362 typedef OPENDDS_MAP_CMP(RepoId, RepoIdSet, GUID_tKeyLessThan) AssocByLocal;
00363 AssocByLocal assoc_by_local_;
00364 /// A second AssocByLocal map with its own mutable lock. NOTE(review): presumably holds associations that have already been released - confirm.
00365 mutable LockType released_assoc_by_local_lock_;
00366 AssocByLocal released_assoc_by_local_;
00367
00368 /// Transport implementation handle (returned by impl()).
00369 TransportImpl_rch impl_;
00370
00371 /// Numeric id of this link (cf. id() and get_next_datalink_id()).
00372 ACE_UINT64 id_;
00373
00374 /// Raw pointer to a ThreadPerConnectionSendTask.
00375 /// NOTE(review): ownership and whether it may be null are not visible in
00376 /// this header - confirm in DataLink.cpp.
00377 ThreadPerConnectionSendTask* thr_per_con_send_task_;
00378
00379 /// NOTE(review): presumably the associations currently being released (cf. prepare_release()) - confirm.
00380 AssocByLocal assoc_releasing_;
00381
00382 /// Priority value (cf. transport_priority()).
00383 Priority transport_priority_;
00384 /// Flag written through set_scheduling_release().
00385 bool scheduling_release_;
00386
00387 protected:
00388 /// Scoped guard type for LockType.
00389 typedef ACE_Guard<LockType> GuardType;
00390
00391 /// Send strategy handle.
00392 TransportSendStrategy_rch send_strategy_;
00393 /// Non-mutable lock. NOTE(review): presumably guards the strategy handles and/or on_start_callbacks_ - confirm.
00394 LockType strategy_lock_;
00395 OPENDDS_VECTOR(OnStartCallback) on_start_callbacks_;
00396
00397 /// Release delay (cf. datalink_release_delay()).
00398 /// NOTE(review): where the value comes from is not visible in this header.
00399 ACE_Time_Value datalink_release_delay_;
00400
00401 /// Raw pointer to the allocator for TransportSendControlElement objects.
00402 /// NOTE(review): ownership is not visible in this header.
00403 TransportSendControlElementAllocator* send_control_allocator_;
00404
00405 /// Raw pointers to the message-block and data-block allocators.
00406 /// NOTE(review): ownership is not visible in this header.
00407 MessageBlockAllocator* mb_allocator_;
00408 DataBlockAllocator* db_allocator_;
00409
00410 /// Loopback flag (cf. is_loopback()).
00411 bool is_loopback_;
00412 /// Active flag (cf. is_active()) and started flag. NOTE(review): started_ is presumably set by start() - confirm.
00413 bool is_active_;
00414 bool started_;
00415
00416 /// SendResponseListener held by value.
00417 SendResponseListener send_response_listener_;
00418 };
00419
00420 }
00421 }
00422
00423 #if defined (__ACE_INLINE__)
00424 #include "DataLink.inl"
00425 #endif
00426
00427 #endif