30 const Encoding encoding_unaligned_native(encoding_kind);
93 ACE_TEXT(
" tseq: %q data seq: %q from %C being WITHHELD because can't receive yet\n"),
101 held_.insert(std::pair<const SequenceNumber, ReceivedDataSample>(header.
sequence_, data));
106 ACE_TEXT(
" held_ data currently contains: %d samples\n"),
108 while (it != held_.end()) {
109 LogGuid writer(it->second.header_.publication_id_);
111 ACE_TEXT(
" held_ data currently contains: tseq: %q dseq: %q from %C HELD\n"),
112 it->first.getValue(),
113 it->second.header_.sequence_.getValue(),
125 ACE_TEXT(
" tseq: %q data seq: %q from %C OK to deliver\n"),
146 const iter end = this->held_.upper_bound(ca);
147 for (iter it = this->held_.begin(); it != end; ) {
149 LogGuid writer(it->second.header_.publication_id_);
151 ACE_TEXT(
" deliver tseq: %q dseq: %q from %C\n"),
152 it->first.getValue(),
153 it->second.header_.sequence_.getValue(),
156 to_deliver.push_back(it->second);
157 this->held_.erase(it++);
160 for (
size_t i = 0; i < to_deliver.size(); ++i) {
169 if (!held_.empty()) {
171 while (it != held_.end()) {
172 if (it->second.header_.publication_id_ == remote) {
190 switch (submessage_id) {
202 ACE_TEXT(
"ReliableSession::control_received: ")
203 ACE_TEXT(
"unknown TRANSPORT_CONTROL submessage: 0x%x!\n"),
213 const std::vector<SequenceRange> ranges(this->
nak_sequence_.present_sequence_ranges());
217 for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
218 iter != ranges.end(); ++iter) {
232 NakRequestMap::iterator last(this->
nak_requests_.upper_bound(deadline));
234 if (first == last)
return;
242 std::vector<SequenceRange> dropped;
244 lastSeq), dropped)) {
246 for (
size_t i = 0; i < dropped.size(); ++i) {
252 ACE_TEXT(
"timed out waiting on remote peer %#08x%08x to send missing samples: %q - %q!\n"),
269 if (!this->
acked()) {
272 ACE_TEXT(
"remote %#08x%08x session NOT acked yet, don't send naks\n"),
273 (
unsigned int)(this->
link()->local_peer() >> 32),
274 (
unsigned int) this->
link()->local_peer(),
283 ACE_TEXT(
"remote %#08x%08x nak request size %d\n"),
284 (
unsigned int)(this->
link()->local_peer() >> 32),
285 (
unsigned int) this->
link()->local_peer(),
294 ACE_TEXT(
"remote %#08x%08x nak sequence not disjoint, don't send naks\n"),
295 (
unsigned int)(this->
link()->local_peer() >> 32),
296 (
unsigned int) this->
link()->local_peer(),
302 const std::vector<SequenceRange> ranges(this->
nak_sequence_.present_sequence_ranges());
303 for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
304 iter != ranges.end(); ++iter) {
305 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) ReliableSession::send_naks - local %#08x%08x remote %#08x%08x nak_sequence includes: [%q - %q]\n",
306 (
unsigned int)(this->
link()->local_peer() >> 32),
307 (
unsigned int) this->
link()->local_peer(),
310 iter->first.getValue(),
311 iter->second.getValue()));
327 typedef std::vector<SequenceRange> RangeVector;
334 NakRequestMap::reverse_iterator itr(this->
nak_requests_.rbegin());
353 for (
size_t i = 1; i < sz; ++i) {
354 if ((i * 1.0) / (nak_delay_intervals + 1) > nak_max) {
359 ignored.push_back(std::make_pair(this->
nak_requests_.begin()->second, itr->second));
364 if (i % (nak_delay_intervals + 1) == 1) {
365 second = itr->second;
371 if (i % (nak_delay_intervals + 1) == 0) {
375 ignored.push_back(std::make_pair(first, second));
385 ignored.push_back(std::make_pair(first, second));
395 size_t sz = ignored.size();
396 for (
size_t i = 0; i < sz; ++i) {
407 ACE_TEXT(
"remote %#08x%08x ignore missing [%q - %q]\n"),
408 (
unsigned int)(this->
link()->local_peer() >> 32),
409 (
unsigned int) this->
link()->local_peer(),
420 for (NakPeerSet::iterator it(this->
nak_peers_.begin());
426 bool sending_naks =
false;
427 if (received.
low() > 1){
431 std::vector<SequenceRange> ranges;
442 Serializer serializer(data.get(), encoding_unaligned_native);
446 for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
447 iter != ranges.end(); ++iter) {
448 serializer << iter->first;
449 serializer << iter->second;
452 ACE_TEXT (
" local %#08x%08x remote %#08x%08x [%q - %q]\n"),
453 (
unsigned int)(this->
link()->local_peer() >> 32),
454 (
unsigned int) this->
link()->local_peer(),
455 (
unsigned int)(this->remote_peer_ >> 32),
456 (
unsigned int) this->remote_peer_,
457 iter->first.getValue(), iter->second.getValue()));
470 ACE_TEXT(
"remote %#08x%08x received sequence not disjoint, don't send naks\n"),
471 (
unsigned int)(this->
link()->local_peer() >> 32),
472 (
unsigned int) this->
link()->local_peer(),
493 serializer >> local_peer;
496 std::vector<SequenceRange> ranges;
500 serializer >> range.first;
501 serializer >> range.second;
502 ranges.push_back(range);
516 if (!proxy.
empty() && proxy.
low() > ranges.begin()->first) {
519 ACE_TEXT (
"(%P|%t) ReliableSession::nak_received")
520 ACE_TEXT (
" local %#08x%08x remote %#08x%08x sending nakack for lowest available: %q\n"),
521 (
unsigned int)(this->
link()->local_peer() >> 32),
522 (
unsigned int) this->
link()->local_peer(),
535 bool ret = send_buffer->
resend(ranges[i]);
538 ACE_TEXT (
"(%P|%t) ReliableSession::nak_received")
539 ACE_TEXT (
" local %#08x%08x remote %#08x%08x [%q - %q] resend result %C\n"),
540 (
unsigned int)(this->
link()->local_peer() >> 32),
541 (
unsigned int) this->
link()->local_peer(),
544 ranges[i].first.getValue(), ranges[i].second.getValue(),
545 ret ?
"SUCCESS" :
"FAILED"));
553 const std::vector<SequenceRange> ranges(received.missing_sequence_ranges());
563 Serializer serializer(data.get(), encoding_unaligned_native);
567 for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
568 iter != ranges.end(); ++iter) {
569 serializer << iter->first;
570 serializer << iter->second;
573 ACE_TEXT (
" local %#08x%08x remote %#08x%08x [%q - %q]\n"),
574 (
unsigned int)(this->
link()->local_peer() >> 32),
575 (
unsigned int) this->
link()->local_peer(),
576 (
unsigned int)(this->remote_peer_ >> 32),
577 (
unsigned int) this->remote_peer_,
578 iter->first.getValue(), iter->second.getValue()));
605 std::vector<SequenceRange> dropped;
615 for (
size_t i = 0; i < dropped.size(); ++i) {
622 ACE_TEXT(
"(%P|%t) WARNING: ReliableSession::nakack_received ")
623 ACE_TEXT(
"local %#08x%08x remote %#08x%08x with low [%q] ")
624 ACE_TEXT(
"- some ranges dropped.\n"),
625 (
unsigned int)(this->
link()->local_peer() >> 32),
626 (
unsigned int) this->
link()->local_peer(),
638 size_t len =
sizeof(low.
getValue());
642 Serializer serializer(data.
get(), encoding_unaligned_native);
692 interval *=
static_cast<double>(std::rand()) /
693 static_cast<double>(RAND_MAX) + 1.0;
ACE_Thread_Mutex start_lock_
DataSampleHeader header_
The demarshalled sample header.
virtual bool control_received(char submessage_id, const Message_Block_Ptr &control)
RcHandle< T > rchandle_from(T *pointer)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
SequenceNumber cumulative_ack() const
virtual bool control_received(char submessage_id, const Message_Block_Ptr &control)
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
RcHandle< Sporadic > nak_watchdog_
bool swap_bytes() const
Determine if the serializer should swap bytes.
const char * c_str() const
NakRequestMap nak_requests_
static const long DEFAULT_NAK_TIMEOUT
T::rv_reference move(T &p)
SequenceNumber low() const
void nak_received(const Message_Block_Ptr &control)
MulticastDataLink * link()
void nakack_received(const Message_Block_Ptr &control)
OPENDDS_MULTIMAP(TransportHeaderSN, ReceivedDataSample) held_
static const long DEFAULT_NAK_INTERVAL
virtual void syn_hook(const SequenceNumber &seq)
static const long DEFAULT_NAK_DELAY_INTERVALS
MulticastPeer local_peer() const
virtual void record_header_received(const TransportHeader &header)
static TimePoint_T< MonotonicClock > now()
SingleSendBuffer * send_buffer()
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
MulticastPeer remote_peer_
TO truncate_cast(FROM val)
virtual void send_nakack(SequenceNumber low)
Class to serialize and deserialize data for DDS.
ReliableSession(RcHandle< ReactorInterceptor > interceptor, MulticastDataLink *link, MulticastPeer remote_peer)
static TimeDuration from_msec(const ACE_UINT64 &ms)
void data_unavailable(const FragmentRange &transportSeqDropped)
void send_control(char submessage_id, Message_Block_Ptr data)
std::pair< FragmentNumber, FragmentNumber > FragmentRange
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Holds a data sample received by the transport.
Reverse_Lock_t reverse_start_lock_
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
virtual bool ready_to_deliver(const TransportHeader &header, const ReceivedDataSample &data)
SequenceNumber low() const
virtual bool check_header(const TransportHeader &header)
const TH & received_header() const
Defines class that represents a transport packet header.
virtual bool start(bool active, bool acked)
bool insert(const SequenceRange &range, OPENDDS_VECTOR(SequenceRange)&added)
std::pair< SequenceNumber, SequenceNumber > SequenceRange
virtual void release_remote(const GUID_t &remote)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
TransportReassembly reassembly_
Sequence number abstraction. Only allows positive 64 bit values.
bool resend(const SequenceRange &range, DisjointSequence *gaps=0)
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void process_naks(const MonotonicTimePoint &)
static const long DEFAULT_NAK_MAX
MulticastInst_rch config()
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
#define TheServiceParticipant
MulticastDataLink * link_
The Internal API and Implementation of OpenDDS.
DisjointSequence nak_sequence_
MulticastReceiveStrategy * receive_strategy()
ACE_Thread_Mutex held_lock_