From 07cf180ea8eb5d9de950d86077f81bbf410f9388 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Fri, 3 Sep 2021 21:08:20 +0200 Subject: [PATCH] Added continous resource timeout adjustment. Fixes missing response timeout check. --- RNS/Link.py | 24 ++++++++++++++++++------ RNS/Resource.py | 31 ++++++++++++++++++------------- 2 files changed, 36 insertions(+), 19 deletions(-) diff --git a/RNS/Link.py b/RNS/Link.py index 4a4b284..a3e2957 100644 --- a/RNS/Link.py +++ b/RNS/Link.py @@ -292,7 +292,7 @@ class Link: packed_request = umsgpack.packb(unpacked_request) if timeout == None: - timeout = self.rtt * self.traffic_timeout_factor + timeout = self.rtt * self.traffic_timeout_factor + RNS.Resource.RESPONSE_MAX_GRACE_TIME if len(packed_request) <= Link.MDU: request_packet = RNS.Packet(self, packed_request, RNS.Packet.DATA, context = RNS.Packet.REQUEST) @@ -867,7 +867,8 @@ class RequestReceipt(): FAILED = 0x00 SENT = 0x01 DELIVERED = 0x02 - READY = 0x03 + RECEIVING = 0x03 + READY = 0x04 def __init__(self, link, packet_receipt = None, resource = None, response_callback = None, failed_callback = None, progress_callback = None, timeout = None): self.packet_receipt = packet_receipt @@ -914,6 +915,9 @@ class RequestReceipt(): self.started_at = time.time() self.status = RequestReceipt.DELIVERED self.__resource_response_timeout = time.time()+self.timeout + response_timeout_thread = threading.Thread(target=self.__response_timeout_job) + response_timeout_thread.setDaemon(True) + response_timeout_thread.start() else: RNS.log("Sending request "+RNS.prettyhexrep(self.request_id)+" as resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG) self.status = RequestReceipt.FAILED @@ -924,6 +928,15 @@ class RequestReceipt(): self.callbacks.failed(self) + def __response_timeout_job(self): + while self.status == RequestReceipt.DELIVERED: + now = time.time() + if now > self.__resource_response_timeout: + self.request_timed_out(None) + + time.sleep(0.1) + + def request_timed_out(self, packet_receipt): self.status = RequestReceipt.FAILED self.concluded_at = time.time() @@ -932,9 +945,10 @@ class RequestReceipt(): if self.callbacks.failed != None: self.callbacks.failed(self) + def response_resource_progress(self, resource): if not self.status == RequestReceipt.FAILED: - self.status = RequestReceipt.DELIVERED + self.status = RequestReceipt.RECEIVING if self.packet_receipt != None: self.packet_receipt.status = RNS.PacketReceipt.DELIVERED self.packet_receipt.proved = True @@ -943,9 +957,7 @@ class RequestReceipt(): self.packet_receipt.callbacks.delivery(self.packet_receipt) self.progress = resource.get_progress() - now = time.time() - self.__resource_response_timeout = time.time()+self.timeout - + if self.callbacks.progress != None: self.callbacks.progress(self) else: diff --git a/RNS/Resource.py b/RNS/Resource.py index 5bbb963..2de6cb8 100644 --- a/RNS/Resource.py +++ b/RNS/Resource.py @@ -32,15 +32,16 @@ class Resource: # maximum size a resource should be, if # it is to be handled within reasonable # time constraint, even on small systems. - + # # A small system in this regard is # defined as a Raspberry Pi, which should # be able to compress, encrypt and hash-map # the resource in about 10 seconds. - + # # This constant will be used when determining # how to sequence the sending of large resources. - MAX_EFFICIENT_SIZE = 16 * 1024 * 1024 + MAX_EFFICIENT_SIZE = 16 * 1024 * 1024 + RESPONSE_MAX_GRACE_TIME = 10 # The maximum size to auto-compress with # bz2 before sending. @@ -389,7 +390,12 @@ class Resource: elif self.status == Resource.TRANSFERRING: if not self.initiator: - rtt = self.link.rtt if self.rtt == None else self.rtt + + if self.rtt == None: + rtt = self.link.rtt + else: + rtt = self.rtt + sleep_time = self.last_activity + (rtt*self.part_timeout_factor) + Resource.RETRY_GRACE_TIME - time.time() if sleep_time < 0: @@ -541,15 +547,14 @@ class Resource: self.req_resp = self.last_activity rtt = self.req_resp-self.req_sent - if rtt >= self.link.rtt: - self.part_timeout_factor = Resource.PART_TIMEOUT_FACTOR_AFTER_RTT - if self.rtt == None: - self.rtt = rtt - self.watchdog_job() - elif rtt < self.rtt: - self.rtt = max(self.rtt - self.rtt*0.05, rtt) - elif rtt > self.rtt: - self.rtt = min(self.rtt + self.rtt*0.05, rtt) + self.part_timeout_factor = Resource.PART_TIMEOUT_FACTOR_AFTER_RTT + if self.rtt == None: + self.rtt = self.link.rtt + self.watchdog_job() + elif rtt < self.rtt: + self.rtt = max(self.rtt - self.rtt*0.05, rtt) + elif rtt > self.rtt: + self.rtt = min(self.rtt + self.rtt*0.05, rtt) if not self.status == Resource.FAILED: self.status = Resource.TRANSFERRING