From 2678aeb6a1229f8bf0b86a7a3b34cb1c344d0ca4 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sat, 28 Aug 2021 20:01:01 +0200 Subject: [PATCH] Improved timeout calculation and handling. --- RNS/Destination.py | 2 +- RNS/Link.py | 100 ++++++++++++++++++++++++++++++++------------- RNS/Packet.py | 19 +++++---- RNS/Resource.py | 16 +++++++- RNS/Transport.py | 16 +++++++- 5 files changed, 112 insertions(+), 41 deletions(-) diff --git a/RNS/Destination.py b/RNS/Destination.py index 7f9be66..0418d56 100755 --- a/RNS/Destination.py +++ b/RNS/Destination.py @@ -22,7 +22,7 @@ class Destination: encrypted communication with it. :param identity: An instance of :ref:`RNS.Identity`. Can hold only public keys for an outgoing destination, or holding private keys for an ingoing. - :param direction: ``RNS.Destination.IN`` or ``RNS.Destination.OUT`` + :param direction: ``RNS.Destination.IN`` or ``RNS.Destination.OUT``. :param type: ``RNS.Destination.SINGLE``, ``RNS.Destination.GROUP`` or ``RNS.Destination.PLAIN``. :param app_name: A string specifying the app name. :param \*aspects: Any non-zero number of string arguments. diff --git a/RNS/Link.py b/RNS/Link.py index 521a812..bc6a616 100644 --- a/RNS/Link.py +++ b/RNS/Link.py @@ -46,14 +46,15 @@ class Link: MDU = math.floor((RNS.Reticulum.MDU-RNS.Identity.AES_HMAC_OVERHEAD)/RNS.Identity.AES128_BLOCKSIZE)*RNS.Identity.AES128_BLOCKSIZE - 1 - # TODO: This should not be hardcoded, - # but calculated from something like - # first-hop RTT latency and distance - DEFAULT_TIMEOUT = 60.0 + # This value is set at a reasonable + # level for a 1 Kb/s channel. + ESTABLISHMENT_TIMEOUT_PER_HOP = 3 """ - Default timeout for link establishment in seconds. + Default timeout for link establishment in seconds per hop to destination. """ - TIMEOUT_FACTOR = 3 + + TRAFFIC_TIMEOUT_FACTOR = 20 + KEEPALIVE_TIMEOUT_FACTOR = 4 STALE_GRACE = 2 KEEPALIVE = 360 """ @@ -119,9 +120,10 @@ class Link: self.rx = 0 self.txbytes = 0 self.rxbytes = 0 - self.default_timeout = Link.DEFAULT_TIMEOUT - self.proof_timeout = self.default_timeout - self.timeout_factor = Link.TIMEOUT_FACTOR + self.establishment_timeout = Link.ESTABLISHMENT_TIMEOUT_PER_HOP * max(1, RNS.Transport.hops_to(destination.hash)) + RNS.log("Establishment timeout set to: "+str(self.establishment_timeout)) + self.traffic_timeout_factor = Link.TRAFFIC_TIMEOUT_FACTOR + self.keepalive_timeout_factor = Link.KEEPALIVE_TIMEOUT_FACTOR self.keepalive = Link.KEEPALIVE self.watchdog_lock = False self.status = Link.PENDING @@ -273,31 +275,35 @@ class Link: self.had_outbound() - def request(self, path, data = None, response_callback = None, failed_callback = None, timeout = None): + def request(self, path, data = None, response_callback = None, failed_callback = None, progress_callback = None, timeout = None): """ Sends a request to the remote peer. :param path: The request path. :param response_callback: An optional function or method with the signature *response_callback(request_receipt)* to be called when a response is received. See the :ref:`Request Example` for more info. :param failed_callback: An optional function or method with the signature *failed_callback(request_receipt)* to be called when a request fails. See the :ref:`Request Example` for more info. - :param timeout: An optional timeout in seconds for the request. If *None* is supplied, this defaults to ``RNS.Packet.TIMEOUT``. + :param progress_callback: An optional function or method with the signature *progress_callback(request_receipt)* to be called when progress is made receiving the response. Progress can be accessed as a float between 0.0 and 1.0 by the *request_receipt.progress* property. + :param timeout: An optional timeout in seconds for the request. If *None* is supplied it will be calculated based on link RTT. """ request_path_hash = RNS.Identity.truncated_hash(path.encode("utf-8")) unpacked_request = [time.time(), request_path_hash, data] packed_request = umsgpack.packb(unpacked_request) + if timeout == None: + timeout = self.rtt * self.traffic_timeout_factor + if len(packed_request) <= Link.MDU: request_packet = RNS.Packet(self, packed_request, RNS.Packet.DATA, context = RNS.Packet.REQUEST) packet_receipt = request_packet.send() - if timeout != None: - packet_receipt.set_timeout(timeout) + packet_receipt.set_timeout(timeout) return RequestReceipt( self, packet_receipt = packet_receipt, response_callback = response_callback, failed_callback = failed_callback, + progress_callback = progress_callback, timeout = timeout ) @@ -311,6 +317,7 @@ class Link: resource = request_resource, response_callback = response_callback, failed_callback = failed_callback, + progress_callback = progress_callback, timeout = timeout ) @@ -425,9 +432,9 @@ class Link: # Link was initiated, but no response # from destination yet if self.status == Link.PENDING: - next_check = self.request_time + self.proof_timeout + next_check = self.request_time + self.establishment_timeout sleep_time = next_check - time.time() - if time.time() >= self.request_time + self.proof_timeout: + if time.time() >= self.request_time + self.establishment_timeout: RNS.log("Link establishment timed out", RNS.LOG_VERBOSE) self.status = Link.CLOSED self.teardown_reason = Link.TIMEOUT @@ -435,9 +442,9 @@ class Link: sleep_time = 0.001 elif self.status == Link.HANDSHAKE: - next_check = self.request_time + self.proof_timeout + next_check = self.request_time + self.establishment_timeout sleep_time = next_check - time.time() - if time.time() >= self.request_time + self.proof_timeout: + if time.time() >= self.request_time + self.establishment_timeout: RNS.log("Timeout waiting for RTT packet from link initiator", RNS.LOG_DEBUG) self.status = Link.CLOSED self.teardown_reason = Link.TIMEOUT @@ -446,7 +453,7 @@ class Link: elif self.status == Link.ACTIVE: if time.time() >= self.last_inbound + self.keepalive: - sleep_time = self.rtt * self.timeout_factor + Link.STALE_GRACE + sleep_time = self.rtt * self.keepalive_timeout_factor + Link.STALE_GRACE self.status = Link.STALE if self.initiator: self.send_keepalive() @@ -620,7 +627,13 @@ class Link: if RNS.ResourceAdvertisement.is_request(packet): RNS.Resource.accept(packet, callback=self.request_resource_concluded) elif RNS.ResourceAdvertisement.is_response(packet): - RNS.Resource.accept(packet, callback=self.response_resource_concluded) + request_id = RNS.ResourceAdvertisement.get_request_id(packet) + for pending_request in self.pending_requests: + if pending_request.request_id == request_id: + RNS.Resource.accept(packet, callback=self.response_resource_concluded, progress_callback=pending_request.response_resource_progress) + pending_request.response_size = RNS.ResourceAdvertisement.get_size(packet) + pending_request.response_transfer_size = RNS.ResourceAdvertisement.get_transfer_size(packet) + pending_request.started_at = time.time() elif self.resource_strategy == Link.ACCEPT_NONE: pass elif self.resource_strategy == Link.ACCEPT_APP: @@ -846,34 +859,41 @@ class RequestReceipt(): DELIVERED = 0x02 READY = 0x03 - def __init__(self, link, packet_receipt = None, resource = None, response_callback = None, failed_callback = None, timeout = None): + def __init__(self, link, packet_receipt = None, resource = None, response_callback = None, failed_callback = None, progress_callback = None, timeout = None): self.packet_receipt = packet_receipt self.resource = resource + self.started_at = None if self.packet_receipt != None: self.hash = packet_receipt.truncated_hash self.packet_receipt.set_timeout_callback(self.request_timed_out) + self.started_at = time.time() elif self.resource != None: self.hash = resource.request_id resource.set_callback(self.request_resource_concluded) - self.link = link - self.request_id = self.hash + self.link = link + self.request_id = self.hash - self.response = None - self.status = RequestReceipt.SENT - self.sent_at = time.time() - self.concluded_at = None + self.response = None + self.response_transfer_size = None + self.response_size = None + self.status = RequestReceipt.SENT + self.sent_at = time.time() + self.progress = 0 + self.concluded_at = None + self.response_concluded_at = None if timeout != None: self.timeout = timeout else: - self.timeout = RNS.Packet.TIMEOUT + raise ValueError("No timeout specified for request receipt") self.callbacks = RequestReceiptCallbacks() self.callbacks.response = response_callback self.callbacks.failed = failed_callback + self.callbacks.progress = progress_callback self.link.pending_requests.append(self) @@ -881,6 +901,7 @@ class RequestReceipt(): def request_resource_concluded(self, resource): if resource.status == RNS.Resource.COMPLETE: RNS.log("Request "+RNS.prettyhexrep(self.request_id)+" successfully sent as resource.", RNS.LOG_DEBUG) + self.started_at = time.time() self.status = RequestReceipt.DELIVERED self.__resource_response_timeout = time.time()+self.timeout load_thread = threading.Thread(target=self.__resource_response_timeout_job) @@ -904,7 +925,13 @@ class RequestReceipt(): if self.callbacks.failed != None: self.callbacks.failed(self) - + def response_resource_progress(self, resource): + self.progress = resource.progress() + self.__resource_response_timeout = time.time()+self.timeout + + if self.callbacks.progress != None: + self.callbacks.progress(self) + def __resource_response_timeout_job(self): while self.status == RequestReceipt.DELIVERED: if time.time() > self.__resource_response_timeout: @@ -914,8 +941,14 @@ class RequestReceipt(): def response_received(self, response): + self.progress = 1.0 self.response = response self.status = RequestReceipt.READY + self.response_concluded_at = time.time() + + if len(response) <= Link.MDU: + self.response_size = len(response) + self.response_transfer_size = len(response) if self.packet_receipt != None: self.packet_receipt.status = RNS.PacketReceipt.DELIVERED @@ -924,11 +957,20 @@ class RequestReceipt(): if self.packet_receipt.callbacks.delivery != None: self.packet_receipt.callbacks.delivery(self) + if self.callbacks.progress != None: + self.callbacks.progress(self) + if self.callbacks.response != None: self.callbacks.response(self) + def response_time(self): + if self.status == RequestReceipt.READY: + return self.response_concluded_at - self.started_at + + class RequestReceiptCallbacks: def __init__(self): self.response = None - self.failed = None \ No newline at end of file + self.failed = None + self.progress = None \ No newline at end of file diff --git a/RNS/Packet.py b/RNS/Packet.py index 15cfd0f..09588a7 100755 --- a/RNS/Packet.py +++ b/RNS/Packet.py @@ -71,19 +71,18 @@ class Packet: # With an MTU of 500, the maximum of data we can # send in a single encrypted packet is given by # the below calculation; 383 bytes. - ENCRYPTED_MDU = math.floor((RNS.Reticulum.MDU-RNS.Identity.AES_HMAC_OVERHEAD-RNS.Identity.KEYSIZE//16)/RNS.Identity.AES128_BLOCKSIZE)*RNS.Identity.AES128_BLOCKSIZE - 1 + ENCRYPTED_MDU = math.floor((RNS.Reticulum.MDU-RNS.Identity.AES_HMAC_OVERHEAD-RNS.Identity.KEYSIZE//16)/RNS.Identity.AES128_BLOCKSIZE)*RNS.Identity.AES128_BLOCKSIZE - 1 """ The maximum size of the payload data in a single encrypted packet """ - PLAIN_MDU = MDU + PLAIN_MDU = MDU """ The maximum size of the payload data in a single unencrypted packet """ - # TODO: This should be calculated - # more intelligently - # Default packet timeout - TIMEOUT = 60 + # This value is set at a reasonable + # level for a 1 Kb/s channel. + TIMEOUT_PER_HOP = 5 def __init__(self, destination, data, packet_type = DATA, context = NONE, transport_type = RNS.Transport.BROADCAST, header_type = HEADER_1, transport_id = None, attached_interface = None, create_receipt = True): if destination != None: @@ -329,13 +328,18 @@ class PacketReceipt: self.truncated_hash = packet.getTruncatedHash() self.sent = True self.sent_at = time.time() - self.timeout = Packet.TIMEOUT self.proved = False self.status = PacketReceipt.SENT self.destination = packet.destination self.callbacks = PacketReceiptCallbacks() self.concluded_at = None + if packet.destination.type == RNS.Destination.LINK: + self.timeout = packet.destination.rtt * packet.destination.traffic_timeout_factor + else: + self.timeout = Packet.TIMEOUT_PER_HOP * RNS.Transport.hops_to(self.destination.hash) + + def get_status(self): """ :returns: The status of the associated :ref:`RNS.Packet` instance. Can be one of ``RNS.PacketReceipt.SENT``, ``RNS.PacketReceipt.DELIVERED``, ``RNS.PacketReceipt.FAILED`` or ``RNS.PacketReceipt.CULLED``. @@ -448,7 +452,6 @@ class PacketReceipt: thread = threading.Thread(target=self.callbacks.timeout, args=(self,)) thread.setDaemon(True) thread.start() - #self.callbacks.timeout(self) def set_timeout(self, timeout): diff --git a/RNS/Resource.py b/RNS/Resource.py index f367bca..ecefcb7 100644 --- a/RNS/Resource.py +++ b/RNS/Resource.py @@ -182,7 +182,7 @@ class Resource: self.link = link self.max_retries = Resource.MAX_RETRIES self.retries_left = self.max_retries - self.timeout_factor = self.link.timeout_factor + self.timeout_factor = self.link.traffic_timeout_factor self.sender_grace_time = Resource.SENDER_GRACE_TIME self.hmu_retry_ok = False self.watchdog_lock = False @@ -197,7 +197,7 @@ class Resource: if timeout != None: self.timeout = timeout else: - self.timeout = self.link.rtt * 20 + self.timeout = self.link.rtt * self.link.traffic_timeout_factor if data != None: self.initiator = True @@ -807,6 +807,18 @@ class ResourceAdvertisement: return adv.q + @staticmethod + def get_transfer_size(advertisement_packet): + adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext) + return adv.t + + + @staticmethod + def get_size(advertisement_packet): + adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext) + return adv.d + + def __init__(self, resource=None, request_id=None, is_response=False): if resource != None: self.t = resource.size # Transfer size diff --git a/RNS/Transport.py b/RNS/Transport.py index c918b6e..a400a78 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -22,7 +22,10 @@ class Transport: APP_NAME = "rnstransport" - PATHFINDER_M = 18 # Max hops + PATHFINDER_M = 128 # Max hops + """ + Maximum amount of hops that Reticulum will transport a packet. + """ PATHFINDER_C = 2.0 # Decay constant PATHFINDER_R = 1 # Retransmit retries PATHFINDER_T = 10 # Retry grace period @@ -1025,6 +1028,17 @@ class Transport: else: return False + @staticmethod + def hops_to(destination_hash): + """ + :param destination_hash: A destination hash as *bytes*. + :returns: The number of hops to the specified destination, or ``RNS.Transport.PATHFINDER_M`` if the number of hops is unknown. + """ + if destination_hash in Transport.destination_table: + return Transport.destination_table[destination_hash][2] + else: + return Transport.PATHFINDER_M + @staticmethod def request_path(destination_hash): """