From 42a3d23e9996a5640340d20bc01bcfb0baa13f0c Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Fri, 3 Sep 2021 18:53:37 +0200 Subject: [PATCH] Optimised resource transfer timings. Improved request/response timeout handling. --- RNS/Link.py | 16 ++++------------ RNS/Resource.py | 28 +++++++++++++++++----------- RNS/Transport.py | 5 +---- 3 files changed, 22 insertions(+), 27 deletions(-) diff --git a/RNS/Link.py b/RNS/Link.py index 754a2c7..4a4b284 100644 --- a/RNS/Link.py +++ b/RNS/Link.py @@ -562,6 +562,9 @@ class Link: self.handle_response(request_id, response_data) else: RNS.log("Incoming response resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG) + for pending_request in self.pending_requests: + if pending_request.request_id == resource.request_id: + pending_request.request_timed_out(None) def receive(self, packet): self.watchdog_lock = True @@ -640,7 +643,7 @@ class Link: request_id = RNS.ResourceAdvertisement.get_request_id(packet) for pending_request in self.pending_requests: if pending_request.request_id == request_id: - RNS.Resource.accept(packet, callback=self.response_resource_concluded, progress_callback=pending_request.response_resource_progress) + RNS.Resource.accept(packet, callback=self.response_resource_concluded, progress_callback=pending_request.response_resource_progress, request_id = request_id) pending_request.response_size = RNS.ResourceAdvertisement.get_size(packet) pending_request.response_transfer_size = RNS.ResourceAdvertisement.get_transfer_size(packet) pending_request.started_at = time.time() @@ -911,9 +914,6 @@ class RequestReceipt(): self.started_at = time.time() self.status = RequestReceipt.DELIVERED self.__resource_response_timeout = time.time()+self.timeout - load_thread = threading.Thread(target=self.__resource_response_timeout_job) - load_thread.setDaemon(True) - load_thread.start() else: RNS.log("Sending request "+RNS.prettyhexrep(self.request_id)+" as resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG) self.status = RequestReceipt.FAILED @@ -951,14 +951,6 @@ class RequestReceipt(): else: resource.cancel() - def __resource_response_timeout_job(self): - while self.status == RequestReceipt.DELIVERED: - now = time.time() - if now > self.__resource_response_timeout: - self.request_timed_out(None) - - time.sleep(0.1) - def response_received(self, response): if not self.status == RequestReceipt.FAILED: diff --git a/RNS/Resource.py b/RNS/Resource.py index 1056b7c..5bbb963 100644 --- a/RNS/Resource.py +++ b/RNS/Resource.py @@ -46,10 +46,11 @@ class Resource: # bz2 before sending. AUTO_COMPRESS_MAX_SIZE = MAX_EFFICIENT_SIZE - PART_TIMEOUT_FACTOR = 3 - MAX_RETRIES = 5 - SENDER_GRACE_TIME = 10 - RETRY_GRACE_TIME = 0.25 + PART_TIMEOUT_FACTOR = 4 + PART_TIMEOUT_FACTOR_AFTER_RTT = 2 + MAX_RETRIES = 5 + SENDER_GRACE_TIME = 10 + RETRY_GRACE_TIME = 0.25 HASHMAP_IS_NOT_EXHAUSTED = 0x00 HASHMAP_IS_EXHAUSTED = 0xFF @@ -66,11 +67,11 @@ class Resource: CORRUPT = 0x08 @staticmethod - def accept(advertisement_packet, callback=None, progress_callback = None): + def accept(advertisement_packet, callback=None, progress_callback = None, request_id = None): try: adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext) - resource = Resource(None, advertisement_packet.link) + resource = Resource(None, advertisement_packet.link, request_id = request_id) resource.status = Resource.TRANSFERRING resource.flags = adv.f @@ -539,11 +540,16 @@ class Resource: if self.req_resp == None: self.req_resp = self.last_activity rtt = self.req_resp-self.req_sent - if self.rtt == None: - self.rtt = rtt - self.watchdog_job() - elif self.rtt < rtt: - self.rtt = rtt + + if rtt >= self.link.rtt: + self.part_timeout_factor = Resource.PART_TIMEOUT_FACTOR_AFTER_RTT + if self.rtt == None: + self.rtt = rtt + self.watchdog_job() + elif rtt < self.rtt: + self.rtt = max(self.rtt - self.rtt*0.05, rtt) + elif rtt > self.rtt: + self.rtt = min(self.rtt + self.rtt*0.05, rtt) if not self.status == Resource.FAILED: self.status = Resource.TRANSFERRING diff --git a/RNS/Transport.py b/RNS/Transport.py index 25b380f..07559b6 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -525,13 +525,10 @@ class Transport: # accordingly if we are. if packet.transport_id != None and packet.packet_type != RNS.Packet.ANNOUNCE: if packet.transport_id == Transport.identity.hash: - # TODO: Remove at some point - # RNS.log("Received packet in transport for "+RNS.prettyhexrep(packet.destination_hash)+" with matching transport ID, transporting it...", RNS.LOG_DEBUG) if packet.destination_hash in Transport.destination_table: next_hop = Transport.destination_table[packet.destination_hash][1] remaining_hops = Transport.destination_table[packet.destination_hash][2] - # TODO: Remove at some point - # RNS.log("Next hop to destination is "+RNS.prettyhexrep(next_hop)+" with "+str(remaining_hops)+" hops remaining, transporting it.", RNS.LOG_DEBUG) + if remaining_hops > 1: # Just increase hop count and transmit new_raw = packet.raw[0:1]