DataLink.h
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_DATALINK_H
00009 #define OPENDDS_DCPS_DATALINK_H
00010
00011 #include "dds/DCPS/dcps_export.h"
00012 #include "dds/DCPS/Definitions.h"
00013 #include "dds/DCPS/RcObject.h"
00014 #include "dds/DCPS/PoolAllocator.h"
00015 #include "dds/DCPS/RcEventHandler.h"
00016 #include "ReceiveListenerSetMap.h"
00017 #include "SendResponseListener.h"
00018 #include "TransportDefs.h"
00019 #include "TransportSendStrategy.h"
00020 #include "TransportSendStrategy_rch.h"
00021 #include "TransportStrategy.h"
00022 #include "TransportStrategy_rch.h"
00023 #include "TransportSendControlElement.h"
00024 #include "TransportSendListener.h"
00025 #include "TransportReceiveListener.h"
00026 #include "dds/DCPS/transport/framework/QueueTaskBase_T.h"
00027
00028 #include "ace/Event_Handler.h"
00029 #include "ace/Synch_Traits.h"
00030
00031 #include <utility>
00032
00033 #include <iosfwd>
00034
00035 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00036 class ACE_SOCK;
00037 ACE_END_VERSIONED_NAMESPACE_DECL
00038
00039 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00040
00041 namespace OpenDDS {
00042 namespace DCPS {
00043
00044
00045 class TransportQueueElement;
00046 class ReceivedDataSample;
00047 class DataSampleElement;
00048 class ThreadPerConnectionSendTask;
00049 class TransportClient;
00050 class TransportImpl;
00051
00052 typedef OPENDDS_MAP_CMP(RepoId, DataLinkSet_rch, GUID_tKeyLessThan) DataLinkSetMap;
00053
00054 typedef WeakRcHandle<TransportSendListener> TransportSendListener_wrch;
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066 class OpenDDS_Dcps_Export DataLink
00067 : public RcEventHandler {
00068
00069 friend class DataLinkCleanupTask;
00070
00071 public:
00072
00073 enum ConnectionNotice {
00074 DISCONNECTED,
00075 RECONNECTED,
00076 LOST
00077 };
00078
00079
00080
00081
00082
00083
00084 DataLink(TransportImpl& impl, Priority priority, bool is_loopback, bool is_active);
00085 virtual ~DataLink();
00086
00087
00088 int handle_exception(ACE_HANDLE );
00089
00090
00091
00092
00093 void schedule_stop(const ACE_Time_Value& schedule_to_stop_at);
00094
00095 void stop();
00096
00097
00098
00099 void resume_send();
00100
00101
00102
00103
00104
00105 int make_reservation(const RepoId& remote_subscription_id,
00106 const RepoId& local_publication_id,
00107 const TransportSendListener_wrch& send_listener);
00108
00109
00110
00111
00112
00113 int make_reservation(const RepoId& remote_publication_id,
00114 const RepoId& local_subcription_id,
00115 const TransportReceiveListener_wrch& receive_listener);
00116
00117
00118
00119
00120
00121
00122 void release_reservations(RepoId remote_id,
00123 RepoId local_id,
00124 DataLinkSetMap& released_locals);
00125
00126 void schedule_delayed_release();
00127
00128 const ACE_Time_Value& datalink_release_delay() const;
00129
00130
00131
00132
00133 void remove_listener(const RepoId& local_id);
00134
00135
00136
00137
00138
00139
00140
00141 void send_start();
00142 void send(TransportQueueElement* element);
00143 void send_stop(RepoId repoId);
00144
00145
00146
00147
00148
00149 virtual RemoveResult remove_sample(const DataSampleElement* sample,
00150 void* context);
00151
00152
00153 void remove_all_msgs(RepoId pub_id);
00154
00155
00156
00157
00158
00159
00160
00161 int data_received(ReceivedDataSample& sample,
00162 const RepoId& readerId = GUID_UNKNOWN);
00163
00164
00165
00166
00167 void data_received_include(ReceivedDataSample& sample, const RepoIdSet& incl);
00168
00169
00170 DataLinkIdType id() const;
00171
00172
00173
00174 void transport_shutdown();
00175
00176
00177
00178
00179 void notify(ConnectionNotice notice);
00180
00181
00182
00183 virtual void pre_stop_i();
00184
00185
00186
00187
00188
00189 bool release_resources();
00190
00191
00192
00193 void terminate_send();
00194
00195
00196
00197 bool is_target(const RepoId& remote_sub_id);
00198
00199
00200
00201 void clear_associations();
00202
00203 int handle_timeout(const ACE_Time_Value& tv, const void* arg);
00204 int handle_close(ACE_HANDLE h, ACE_Reactor_Mask m);
00205
00206
00207
00208
00209
00210 void set_dscp_codepoint(int cp, ACE_SOCK& socket);
00211
00212
00213
00214 Priority& transport_priority();
00215 Priority transport_priority() const;
00216
00217 bool& is_loopback();
00218 bool is_loopback() const;
00219
00220 bool& is_active();
00221 bool is_active() const;
00222
00223 bool cancel_release();
00224
00225
00226
00227 ACE_Message_Block* create_control(char submessage_id,
00228 DataSampleHeader& header,
00229 Message_Block_Ptr data);
00230
00231
00232
00233
00234
00235 SendControlStatus send_control(const DataSampleHeader& header, Message_Block_Ptr data);
00236
00237
00238
00239
00240
00241 GUIDSeq* target_intersection(const RepoId& pub_id, const GUIDSeq& in, size_t& n_subs);
00242
00243 TransportImpl& impl() const;
00244
00245 void default_listener(const TransportReceiveListener_wrch& trl);
00246 TransportReceiveListener_wrch default_listener() const;
00247
00248 typedef WeakRcHandle<TransportClient> TransportClient_wrch;
00249 typedef std::pair<TransportClient_wrch, RepoId> OnStartCallback;
00250 bool add_on_start_callback(const TransportClient_wrch& client, const RepoId& remote);
00251 void remove_on_start_callback(const TransportClient_wrch& client, const RepoId& remote);
00252 void invoke_on_start_callbacks(bool success);
00253
00254 void set_scheduling_release(bool scheduling_release);
00255
00256 virtual void send_final_acks (const RepoId& readerid);
00257
00258 protected:
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272 int start(const TransportSendStrategy_rch& send_strategy,
00273 const TransportStrategy_rch& receive_strategy);
00274
00275
00276
00277
00278
00279
00280
00281
00282 virtual void stop_i();
00283
00284
00285 static ACE_UINT64 get_next_datalink_id();
00286
00287
00288 TransportStrategy_rch receive_strategy_;
00289
00290 friend class ThreadPerConnectionSendTask;
00291
00292
00293
00294
00295 void send_start_i();
00296 virtual void send_i(TransportQueueElement* element, bool relink = true);
00297 void send_stop_i(RepoId repoId);
00298
00299
00300
00301
00302 GUIDSeq* peer_ids(const RepoId& local_id) const;
00303
00304 private:
00305
00306
00307 const char* connection_notice_as_str(ConnectionNotice notice);
00308
00309 TransportSendListener_rch send_listener_for(const RepoId& pub_id) const;
00310 TransportReceiveListener_rch recv_listener_for(const RepoId& sub_id) const;
00311
00312
00313
00314 void prepare_release();
00315
00316 virtual bool handle_send_request_ack(TransportQueueElement* element);
00317
00318
00319
00320 virtual TransportQueueElement* customize_queue_element(
00321 TransportQueueElement* element)
00322 {
00323 return element;
00324 }
00325
00326 virtual void release_remote_i(const RepoId& ) {}
00327 virtual void release_reservations_i(const RepoId& ,
00328 const RepoId& ) {}
00329
00330 void data_received_i(ReceivedDataSample& sample,
00331 const RepoId& readerId,
00332 const RepoIdSet& incl_excl,
00333 ReceiveListenerSet::ConstrainReceiveSet constrain);
00334
00335 void notify_reactor();
00336
00337 typedef ACE_SYNCH_MUTEX LockType;
00338
00339
00340 #ifndef OPENDDS_SAFETY_PROFILE
00341 friend OpenDDS_Dcps_Export
00342 std::ostream& operator<<(std::ostream& str, const DataLink& value);
00343 #endif
00344
00345
00346
00347 bool stopped_;
00348 ACE_Time_Value scheduled_to_stop_at_;
00349
00350
00351 typedef OPENDDS_MAP_CMP(RepoId, TransportSendListener_wrch, GUID_tKeyLessThan) IdToSendListenerMap;
00352 IdToSendListenerMap send_listeners_;
00353
00354
00355 typedef OPENDDS_MAP_CMP(RepoId, TransportReceiveListener_wrch, GUID_tKeyLessThan) IdToRecvListenerMap;
00356 IdToRecvListenerMap recv_listeners_;
00357
00358
00359
00360
00361 TransportReceiveListener_wrch default_listener_;
00362
00363 mutable LockType pub_sub_maps_lock_;
00364
00365 typedef OPENDDS_MAP_CMP(RepoId, ReceiveListenerSet_rch, GUID_tKeyLessThan) AssocByRemote;
00366 AssocByRemote assoc_by_remote_;
00367
00368 typedef OPENDDS_MAP_CMP(RepoId, RepoIdSet, GUID_tKeyLessThan) AssocByLocal;
00369 AssocByLocal assoc_by_local_;
00370
00371
00372 TransportImpl& impl_;
00373
00374
00375 ACE_UINT64 id_;
00376
00377
00378
00379
00380 unique_ptr<ThreadPerConnectionSendTask> thr_per_con_send_task_;
00381
00382
00383 AssocByLocal assoc_releasing_;
00384
00385
00386 Priority transport_priority_;
00387
00388 bool scheduling_release_;
00389
00390 protected:
00391
00392 typedef ACE_Guard<LockType> GuardType;
00393
00394
00395 TransportSendStrategy_rch send_strategy_;
00396
00397 LockType strategy_lock_;
00398 OPENDDS_VECTOR(OnStartCallback) on_start_callbacks_;
00399
00400
00401
00402 ACE_Time_Value datalink_release_delay_;
00403
00404
00405
00406 unique_ptr<MessageBlockAllocator> mb_allocator_;
00407 unique_ptr<DataBlockAllocator> db_allocator_;
00408
00409
00410 bool is_loopback_;
00411
00412 bool is_active_;
00413 bool started_;
00414
00415
00416 SendResponseListener send_response_listener_;
00417 };
00418
00419 }
00420 }
00421
00422 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00423
00424 #if defined (__ACE_INLINE__)
00425 #include "DataLink.inl"
00426 #endif
00427
00428 #endif