OpenDDS  Snapshot(2023/04/28-20:55)
DataLinkSet.inl
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "EntryExit.h"
9 #include "DataLink.h"
10 #include "TransportSendElement.h"
12 #include "dds/DCPS/Util.h"
13 #include "dds/DCPS/Definitions.h"
14 #include "dds/DCPS/GuidConverter.h"
15 
16 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
17 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
19 #endif
20 
21 ACE_INLINE void
23 {
24  DBG_ENTRY_LVL("DataLinkSet", "send", 6);
25  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send element %@.\n",
26  sample), 5);
27 
28 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
29  const bool customHeader =
31 #endif
32 
33  MapType map_copy;
34  {
35  GuardType guard(lock_);
36  map_copy = map_;
37  }
38 
39  if (map_copy.size()) {
40  TransportSendElement* send_element = new TransportSendElement(static_cast<int>(map_copy.size()), sample);
41  for (MapType::iterator itr = map_copy.begin(); itr != map_copy.end(); ++itr) {
42 
43 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
44  if (customHeader) {
45  typedef std::map<DataLinkIdType, GUIDSeq_var>::iterator FilterIter;
46  FilterIter fi = sample->get_filter_per_link().find(itr->first);
47  GUIDSeq* guids = 0;
48  if (fi != sample->get_filter_per_link().end()) {
49  guids = fi->second.ptr();
50  }
51 
53  "(%P|%t) DBG: DataLink %@ filtering %d subscribers.\n",
54  itr->second.in(), guids ? guids->length() : 0), 5);
55 
56  Message_Block_Ptr mb (send_element->msg()->duplicate());
57 
58  DataSampleHeader::add_cfentries(guids, mb.get());
59 
61  tce->set_msg(move(mb)); // tce now owns ACE_Message_Block chain
62 
63  itr->second->send(tce);
64 
65  } else {
66 #endif /* OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE */
67 
68  // Tell the DataLink to send it.
69  itr->second->send(send_element);
70 
71 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
72  }
73 #endif
74  }
75  } else if (sample->get_send_listener()) {
76  sample->get_send_listener()->data_dropped(sample, true);
77  }
78 }
79 
80 ACE_INLINE void
82 {
83  DBG_ENTRY_LVL("DataLinkSet", "send_control", 6);
84  VDBG((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send_control %@.\n", sample));
85  MapType map_copy;
86  {
87  GuardType guard(lock_);
88  map_copy = map_;
89  }
90 
91  TransportSendControlElement* send_element =
92  new TransportSendControlElement(static_cast<int>(map_copy.size()), sample);
93 
94  for (MapType::iterator itr = map_copy.begin(); itr != map_copy.end(); ++itr) {
95  itr->second->send(send_element);
96  }
97 }
98 
101  const TransportSendListener_rch& listener,
102  const DataSampleHeader& header,
103  Message_Block_Ptr msg)
104 {
105  DBG_ENTRY_LVL("DataLinkSet","send_control",6);
106  //Optimized - use cached allocator.
107 
108  MapType dup_map;
109  copy_map_to(dup_map);
110 
111  if (dup_map.empty()) {
112  // similar to the "no links" case in TransportClient::send()
113  if (DCPS_debug_level > 4) {
114  const LogGuid logger(pub_id);
116  ACE_TEXT("(%P|%t) DataLinkSet::send_control: ")
117  ACE_TEXT("no links for publication %C, ")
118  ACE_TEXT("not sending control message.\n"),
119  logger.c_str()));
120  }
121  listener->control_delivered(msg);
122  return SEND_CONTROL_OK;
123  }
124 
125  TransportSendControlElement* const send_element =
126  new TransportSendControlElement(static_cast<int>(dup_map.size()), pub_id,
127  listener.in(), header, move(msg));
128 
129  for (MapType::iterator itr = dup_map.begin();
130  itr != dup_map.end();
131  ++itr) {
132  itr->second->send_start();
133  itr->second->send(send_element);
134  itr->second->send_stop(pub_id);
135  }
136 
137  return SEND_CONTROL_OK;
138 }
139 
140 ACE_INLINE void
142  GUID_t pub_id,
143  const DataSampleHeader& header,
144  Message_Block_Ptr response)
145 {
146  DBG_ENTRY_LVL("DataLinkSet","send_response",6);
147  GuardType guard(this->lock_);
148 
149  TransportSendControlElement* const send_element =
150  new TransportSendControlElement(static_cast<int>(map_.size()), pub_id,
151  &send_response_listener_, header,
152  move(response));
153  if (!send_element) return;
155 
156  for (MapType::iterator itr = map_.begin();
157  itr != map_.end();
158  ++itr) {
159  itr->second->send_start();
160  itr->second->send(send_element);
161  itr->second->send_stop(pub_id);
162  }
163 }
164 
165 ACE_INLINE bool
167 {
168  DBG_ENTRY_LVL("DataLinkSet", "remove_sample", 6);
169  MapType map_copy;
170  {
171  GuardType guard(lock_);
172  map_copy = map_;
173  }
174  const MapType::iterator end = map_copy.end();
175  for (MapType::iterator itr = map_copy.begin(); itr != end; ++itr) {
176  if (itr->second->remove_sample(sample) == REMOVE_RELEASED) {
177  return true;
178  }
179  }
180 
181  return false;
182 }
183 
184 ACE_INLINE bool
186 {
187  DBG_ENTRY_LVL("DataLinkSet", "remove_all_msgs", 6);
188  MapType map_copy;
189  {
190  GuardType guard(lock_);
191  map_copy = map_;
192  }
193  const MapType::iterator end = map_copy.end();
194  for (MapType::iterator itr = map_copy.begin(); itr != end; ++itr) {
195  itr->second->remove_all_msgs(pub_id);
196  }
197 
198  return true;
199 }
200 
201 ACE_INLINE void
203 {
204  DBG_ENTRY_LVL("DataLinkSet","send_start",6);
205 
206  typedef OPENDDS_VECTOR(DataLink_rch) SendStartVec;
207  SendStartVec send_start_vec;
208 
209  {
210  GuardType guard1(lock_);
211  GuardType guard2(in->lock_);
212  for (MapType::iterator itr = in->map_.begin(); itr != in->map_.end(); ++itr) {
213  // Attempt to add the current DataLink to this set.
214  int result = OpenDDS::DCPS::bind(map_, itr->first, itr->second);
215 
216  if (result == 0) {
217  // We successfully added the current DataLink to this set,
218  // meaning that it wasn't already a member. We should tell
219  // the DataLink about the send_start() event.
220  send_start_vec.push_back(itr->second);
221 
222  } else if (result == -1) {
224  "(%P|%t) ERROR: Failed to bind data link into set.\n"));
225  }
226 
227  // Note that there is a possibility that the result == 1, which
228  // means that the DataLink already exists in our map_-> We skip
229  // all of these cases.
230  }
231  }
232 
233  for (SendStartVec::iterator it = send_start_vec.begin(); it != send_start_vec.end(); ++it) {
234  (*it)->send_start();
235  }
236 }
237 
238 ACE_INLINE void
240 {
241  DBG_ENTRY_LVL("DataLinkSet","send_stop",6);
242  // Iterate over our map_ and tell each DataLink about the send_stop() event.
243  MapType map_copy;
244  {
245  GuardType guard(lock_);
246  map_copy = map_;
247  map_.clear();
248  }
249 
250  for (MapType::iterator itr = map_copy.begin(); itr != map_copy.end(); ++itr) {
251  itr->second->send_stop(repoId);
252  }
253 }
254 
255 ACE_INLINE void
257 {
258  target.clear();
259 
260  // Lock the existing map
261  GuardType guard(this->lock_);
262 
263 
264  // Copy to target
265  for (MapType::iterator itr = map_.begin();
266  itr != map_.end();
267  ++itr) {
268  target.insert(*itr);
269  }
270 }
271 
272 ACE_INLINE void
274 {
275  MapType map_copy;
276  {
277  GuardType guard(lock_);
278  map_copy = map_;
279  map_.clear();
280  }
281 
282  for (MapType::iterator itr = map_copy.begin(); itr != map_copy.end(); ++itr) {
283  itr->second->send_final_acks(readerid);
284  }
285 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
static void add_cfentries(const GUIDSeq *guids, ACE_Message_Block *mb)
TransportSendListener * get_send_listener() const
DataLinkIdTypeGUIDMap & get_filter_per_link()
MapType map_
Hash map for DataLinks.
Definition: DataLinkSet.h:96
void send_final_acks(const GUID_t &readerid)
void copy_map_to(MapType &target)
lock and copy map for lock-free access
bool remove_sample(const DataSampleElement *sample)
int bind(Container &c, const FirstType &first, const SecondType &second)
Definition: Util.h:20
static bool test_flag(DataSampleHeaderFlag flag, const ACE_Message_Block *buffer)
void send(DataSampleElement *sample)
Send to each DataLink in the set.
Definition: DataLinkSet.inl:22
const char * c_str() const
void send_stop(GUID_t repoId)
virtual const ACE_Message_Block * msg() const
Accessor for the ACE_Message_Block.
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
LM_DEBUG
#define VDBG(DBG_ARGS)
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
virtual void data_dropped(const DataSampleElement *sample, bool dropped_by_transport)
virtual ACE_Message_Block * duplicate(void) const
ACE_TEXT("TCP_Factory")
sequence< GUID_t > GUIDSeq
Definition: DdsDcpsGuid.idl:62
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define VDBG_LVL(DBG_ARGS, LEVEL)
bool remove_all_msgs(const GUID_t &pub_id)
#define ACE_INLINE
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
void send_control(DataSampleElement *sample)
Send a control message that is wrapped in a DataSampleElement.
Definition: DataLinkSet.inl:81
void send_response(GUID_t sub_id, const DataSampleHeader &header, Message_Block_Ptr response)
LM_ERROR
void send_start(DataLinkSet *link_set)
SendResponseListener send_response_listener_
Listener for TransportSendControlElements created in send_response.
Definition: DataLinkSet.h:103
SendControlStatus
Return code type for send_control() operations.