diff --git a/RNS/Channel.py b/RNS/Channel.py index bc98b6e..f5e96aa 100644 --- a/RNS/Channel.py +++ b/RNS/Channel.py @@ -234,13 +234,16 @@ class Channel(contextlib.AbstractContextManager): WINDOW = 2 # Absolute minimum window size - WINDOW_MIN = 1 + WINDOW_MIN = 2 + WINDOW_MIN_LIMIT_SLOW = 2 + WINDOW_MIN_LIMIT_MEDIUM = 5 + WINDOW_MIN_LIMIT_FAST = 16 # The maximum window size for transfers on slow links WINDOW_MAX_SLOW = 5 # The maximum window size for transfers on mid-speed links - WINDOW_MAX_MEDIUM = 16 + WINDOW_MAX_MEDIUM = 12 # The maximum window size for transfers on fast links WINDOW_MAX_FAST = 48 @@ -255,7 +258,7 @@ class Channel(contextlib.AbstractContextManager): # If the RTT rate is higher than this value, # the max window size for fast links will be used. - RTT_FAST = 0.25 + RTT_FAST = 0.18 RTT_MEDIUM = 0.75 RTT_SLOW = 1.45 @@ -385,9 +388,8 @@ class Channel(contextlib.AbstractContextManager): RNS.log(f"Envelope: Emplacement of duplicate envelope with sequence "+str(envelope.sequence), RNS.LOG_EXTREME) return False - if envelope.sequence < existing.sequence and not envelope.sequence < window_overflow: + if envelope.sequence < existing.sequence and not (self._next_rx_sequence - envelope.sequence) > (Channel.SEQ_MAX//2): ring.insert(i, envelope) - RNS.log("Inserted seq "+str(envelope.sequence)+" at "+str(i), RNS.LOG_DEBUG) envelope.tracked = True return True @@ -396,6 +398,7 @@ class Channel(contextlib.AbstractContextManager): envelope.tracked = True ring.append(envelope) + return True def _run_callbacks(self, message: MessageBase): @@ -429,13 +432,18 @@ class Channel(contextlib.AbstractContextManager): if not is_new: RNS.log("Duplicate message received on channel "+str(self), RNS.LOG_EXTREME) return - else: + else: with self._lock: contigous = [] for e in self._rx_ring: if e.sequence == self._next_rx_sequence: contigous.append(e) self._next_rx_sequence = (self._next_rx_sequence + 1) % Channel.SEQ_MODULUS + if self._next_rx_sequence == 0: + for e in self._rx_ring: + if e.sequence == self._next_rx_sequence: + contigous.append(e) + self._next_rx_sequence = (self._next_rx_sequence + 1) % Channel.SEQ_MODULUS for e in contigous: if not e.unpacked: @@ -474,6 +482,7 @@ class Channel(contextlib.AbstractContextManager): with self._lock: envelope = next(filter(lambda e: self._outlet.get_packet_id(e.packet) == self._outlet.get_packet_id(packet), self._tx_ring), None) + if envelope and op(envelope): envelope.tracked = False if envelope in self._tx_ring: @@ -481,11 +490,9 @@ class Channel(contextlib.AbstractContextManager): if self.window < self.window_max: self.window += 1 - if (self.window - self.window_min) > (self.window_flexibility-1): - self.window_min += 1 # TODO: Remove at some point - # RNS.log("Increased "+str(self)+" window to "+str(self.window), RNS.LOG_EXTREME) + # RNS.log("Increased "+str(self)+" window to "+str(self.window), RNS.LOG_DEBUG) if self._outlet.rtt != 0: if self._outlet.rtt > Channel.RTT_FAST: @@ -498,15 +505,20 @@ class Channel(contextlib.AbstractContextManager): self.medium_rate_rounds += 1 if self.window_max < Channel.WINDOW_MAX_MEDIUM and self.medium_rate_rounds == Channel.FAST_RATE_THRESHOLD: self.window_max = Channel.WINDOW_MAX_MEDIUM + self.window_min = Channel.WINDOW_MIN_LIMIT_MEDIUM # TODO: Remove at some point - # RNS.log("Increased "+str(self)+" max window to "+str(self.window_max), RNS.LOG_EXTREME) + # RNS.log("Increased "+str(self)+" max window to "+str(self.window_max), RNS.LOG_DEBUG) + # RNS.log("Increased "+str(self)+" min window to "+str(self.window_min), RNS.LOG_DEBUG) else: self.fast_rate_rounds += 1 if self.window_max < Channel.WINDOW_MAX_FAST and self.fast_rate_rounds == Channel.FAST_RATE_THRESHOLD: - self.window_max = Channel.WINDOW_MAX_FAST + self.window_max = Channel.WINDOW_MAX_FAST + self.window_min = Channel.WINDOW_MIN_LIMIT_FAST # TODO: Remove at some point - # RNS.log("Increased "+str(self)+" max window to "+str(self.window_max), RNS.LOG_EXTREME) + # RNS.log("Increased "+str(self)+" max window to "+str(self.window_max), RNS.LOG_DEBUG) + # RNS.log("Increased "+str(self)+" min window to "+str(self.window_min), RNS.LOG_DEBUG) + else: RNS.log("Envelope not found in TX ring for "+str(self), RNS.LOG_EXTREME) @@ -516,8 +528,15 @@ class Channel(contextlib.AbstractContextManager): def _packet_delivered(self, packet: TPacket): self._packet_tx_op(packet, lambda env: True) + def _update_packet_timeouts(self): + for envelope in self._tx_ring: + updated_timeout = self._get_packet_timeout_time(envelope.tries) + if updated_timeout > envelope.packet.receipt.timeout: + envelope.packet.receipt.set_timeout(updated_timeout) + def _get_packet_timeout_time(self, tries: int) -> float: - return pow(2, tries - 1) * max(self._outlet.rtt, 0.01) * 5 + to = pow(1.5, tries - 1) * max(self._outlet.rtt*2.5, 0.025) * (len(self._tx_ring)+1.5) + return to def _packet_timeout(self, packet: TPacket): def retry_envelope(envelope: Envelope) -> bool: @@ -526,17 +545,22 @@ class Channel(contextlib.AbstractContextManager): self._shutdown() # start on separate thread? self._outlet.timed_out() return True + envelope.tries += 1 self._outlet.resend(envelope.packet) self._outlet.set_packet_delivered_callback(envelope.packet, self._packet_delivered) self._outlet.set_packet_timeout_callback(envelope.packet, self._packet_timeout, self._get_packet_timeout_time(envelope.tries)) + self._update_packet_timeouts() if self.window > self.window_min: self.window -= 1 - if self.window_max > self.window_min: + # TODO: Remove at some point + # RNS.log("Decreased "+str(self)+" window to "+str(self.window), RNS.LOG_DEBUG) + + if self.window_max > (self.window_min+self.window_flexibility): self.window_max -= 1 - if (self.window_max - self.window) > (self.window_flexibility-1): - self.window_max -= 1 + # TODO: Remove at some point + # RNS.log("Decreased "+str(self)+" max window to "+str(self.window_max), RNS.LOG_DEBUG) # TODO: Remove at some point # RNS.log("Decreased "+str(self)+" window to "+str(self.window), RNS.LOG_EXTREME) @@ -573,6 +597,7 @@ class Channel(contextlib.AbstractContextManager): envelope.tries += 1 self._outlet.set_packet_delivered_callback(envelope.packet, self._packet_delivered) self._outlet.set_packet_timeout_callback(envelope.packet, self._packet_timeout, self._get_packet_timeout_time(envelope.tries)) + self._update_packet_timeouts() return envelope