Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 :
10 : #include "TransportSendControlElement.h"
11 : #include "TransportSendListener.h"
12 : #include "EntryExit.h"
13 :
14 : #include <dds/DCPS/DataSampleElement.h>
15 :
16 : #if !defined (__ACE_INLINE__)
17 : #include "TransportSendControlElement.inl"
18 : #endif /* __ACE_INLINE__ */
19 :
20 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
21 :
22 : namespace OpenDDS {
23 : namespace DCPS {
24 :
25 :
26 0 : TransportSendControlElement::TransportSendControlElement(int initial_count,
27 : const GUID_t& publisher_id,
28 : TransportSendListener* listener,
29 : const DataSampleHeader& header,
30 0 : Message_Block_Ptr msg_block)
31 : : TransportQueueElement(initial_count),
32 0 : publisher_id_(publisher_id),
33 0 : listener_(listener),
34 0 : header_(header),
35 0 : msg_(msg_block.release()),
36 0 : dcps_elem_(0)
37 : {
38 : DBG_ENTRY_LVL("TransportSendControlElement", "TransportSendControlElement", 6);
39 0 : }
40 :
41 :
42 0 : TransportSendControlElement::TransportSendControlElement(int initial_count,
43 0 : const DataSampleElement* dcps_elem)
44 : : TransportQueueElement(initial_count)
45 0 : , publisher_id_(dcps_elem->get_pub_id())
46 0 : , listener_(dcps_elem->get_send_listener())
47 0 : , header_(dcps_elem->get_header())
48 0 : , dcps_elem_(dcps_elem)
49 : {
50 : DBG_ENTRY_LVL("TransportSendControlElement", "TransportSendControlElement", 6);
51 0 : }
52 :
53 0 : TransportSendControlElement::~TransportSendControlElement()
54 : {
55 : DBG_ENTRY_LVL("TransportSendControlElement", "~TransportSendControlElement", 6);
56 0 : }
57 :
58 : bool
59 0 : TransportSendControlElement::requires_exclusive_packet() const
60 : {
61 : DBG_ENTRY_LVL("TransportSendControlElement", "requires_exclusive_packet", 6);
62 0 : return true;
63 : }
64 :
65 : namespace
66 : {
67 0 : void handle_message(const bool dropped,
68 : const Message_Block_Ptr& msg,
69 : TransportSendListener* const listener,
70 : const bool dropped_by_transport)
71 : {
72 0 : if (dropped) {
73 0 : listener->control_dropped(msg, dropped_by_transport);
74 : } else {
75 0 : listener->control_delivered(msg);
76 : }
77 0 : }
78 :
79 0 : void handle_message(const bool dropped,
80 : const DataSampleElement* elem,
81 : const bool dropped_by_transport)
82 : {
83 0 : TransportSendListener* const listener = elem->get_send_listener();
84 0 : if (dropped) {
85 0 : listener->data_dropped(elem, dropped_by_transport);
86 : } else {
87 0 : listener->data_delivered(elem);
88 : }
89 0 : }
90 : }
91 :
92 : void
93 0 : TransportSendControlElement::release_element(bool dropped_by_transport)
94 : {
95 : ACE_UNUSED_ARG(dropped_by_transport);
96 :
97 : DBG_ENTRY_LVL("TransportSendControlElement", "release_element", 6);
98 :
99 : // store off local copies to use after "this" pointer deleted
100 0 : const bool dropped = was_dropped();
101 :
102 0 : Message_Block_Ptr msg(msg_.release());
103 :
104 0 : TransportSendListener* const listener = listener_;
105 0 : const DataSampleElement* dcps_elem = dcps_elem_;
106 :
107 :
108 0 : delete this;
109 :
110 :
111 : // reporting the message w/o using "this" pointer
112 0 : if (dcps_elem) {
113 0 : handle_message(dropped, dcps_elem, dropped_by_transport);
114 : } else {
115 0 : handle_message(dropped, msg, listener, dropped_by_transport);
116 : }
117 0 : }
118 :
119 : GUID_t
120 0 : TransportSendControlElement::publication_id() const
121 : {
122 : DBG_ENTRY_LVL("TransportSendControlElement", "publication_id", 6);
123 0 : return publisher_id_;
124 : }
125 :
126 : ACE_Message_Block*
127 0 : TransportSendControlElement::duplicate_msg() const
128 : {
129 : DBG_ENTRY_LVL("TransportSendControlElement", "duplicate_msg", 6);
130 0 : if (dcps_elem_) {
131 0 : return const_cast<DataSampleElement*>(dcps_elem_)->get_sample()->duplicate();
132 : }
133 0 : return msg_->duplicate();
134 : }
135 :
136 : const ACE_Message_Block*
137 0 : TransportSendControlElement::msg() const
138 : {
139 : DBG_ENTRY_LVL("TransportSendControlElement", "msg", 6);
140 0 : if (dcps_elem_) {
141 0 : return dcps_elem_->get_sample();
142 : }
143 0 : return msg_.get();
144 : }
145 :
146 : const ACE_Message_Block*
147 0 : TransportSendControlElement::msg_payload() const
148 : {
149 : DBG_ENTRY_LVL("TransportSendControlElement", "msg_payload", 6);
150 0 : return msg()->cont();
151 : }
152 :
153 : bool
154 0 : TransportSendControlElement::is_control(GUID_t pub_id) const
155 : {
156 0 : return (pub_id == publisher_id_);
157 : }
158 :
159 : }
160 : }
161 :
162 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|