From ae1d962b9ba5915501272e69377766b99feb799d Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sun, 14 Jan 2024 00:46:55 +0100 Subject: [PATCH] Fixed large resource transfers failing under some conditions --- RNS/Link.py | 17 +++++++++++----- RNS/Resource.py | 52 ++++++++++++++++++++++++++++++------------------- 2 files changed, 44 insertions(+), 25 deletions(-) diff --git a/RNS/Link.py b/RNS/Link.py index 390f0df..6a0ffc2 100644 --- a/RNS/Link.py +++ b/RNS/Link.py @@ -687,7 +687,9 @@ class Link: remove = pending_request try: pending_request.response_size = response_size - pending_request.response_transfer_size = response_transfer_size + if pending_request.response_transfer_size == None: + pending_request.response_transfer_size = 0 + pending_request.response_transfer_size += response_transfer_size pending_request.response_received(response_data) except Exception as e: RNS.log("Error occurred while handling response. The contained exception was: "+str(e), RNS.LOG_ERROR) @@ -836,9 +838,13 @@ class Link: if pending_request.request_id == request_id: response_resource = RNS.Resource.accept(packet, callback=self.response_resource_concluded, progress_callback=pending_request.response_resource_progress, request_id = request_id) if response_resource != None: - pending_request.response_size = RNS.ResourceAdvertisement.read_size(packet) - pending_request.response_transfer_size = RNS.ResourceAdvertisement.read_transfer_size(packet) - pending_request.started_at = time.time() + if pending_request.response_size == None: + pending_request.response_size = RNS.ResourceAdvertisement.read_size(packet) + if pending_request.response_transfer_size == None: + pending_request.response_transfer_size = 0 + pending_request.response_transfer_size += RNS.ResourceAdvertisement.read_transfer_size(packet) + if pending_request.started_at == None: + pending_request.started_at = time.time() pending_request.response_resource_progress(response_resource) elif self.resource_strategy == Link.ACCEPT_NONE: @@ -1133,7 +1139,8 @@ class RequestReceipt(): def request_resource_concluded(self, resource): if resource.status == RNS.Resource.COMPLETE: RNS.log("Request "+RNS.prettyhexrep(self.request_id)+" successfully sent as resource.", RNS.LOG_DEBUG) - self.started_at = time.time() + if self.started_at == None: + self.started_at = time.time() self.status = RequestReceipt.DELIVERED self.__resource_response_timeout = time.time()+self.timeout response_timeout_thread = threading.Thread(target=self.__response_timeout_job) diff --git a/RNS/Resource.py b/RNS/Resource.py index 533172a..b74e9a2 100644 --- a/RNS/Resource.py +++ b/RNS/Resource.py @@ -25,6 +25,7 @@ import os import bz2 import math import time +import tempfile import threading from threading import Lock from .vendor import umsgpack as umsgpack @@ -106,7 +107,8 @@ class Resource: PART_TIMEOUT_FACTOR_AFTER_RTT = 2 MAX_RETRIES = 16 MAX_ADV_RETRIES = 4 - SENDER_GRACE_TIME = 10 + SENDER_GRACE_TIME = 10.0 + PROCESSING_GRACE = 1.0 RETRY_GRACE_TIME = 0.25 PER_RETRY_DELAY = 0.5 @@ -203,9 +205,19 @@ class Resource: resource_data = None self.assembly_lock = False + if data != None: + if not hasattr(data, "read") and len(data) > Resource.MAX_EFFICIENT_SIZE: + original_data = data + data_size = len(original_data) + data = tempfile.TemporaryFile() + data.write(original_data) + del original_data + if hasattr(data, "read"): - data_size = os.stat(data.name).st_size - self.total_size = data_size + if data_size == None: + data_size = os.stat(data.name).st_size + + self.total_size = data_size self.grand_total_parts = math.ceil(data_size/Resource.SDU) if data_size <= Resource.MAX_EFFICIENT_SIZE: @@ -278,7 +290,7 @@ class Resource: self.uncompressed_data = data compression_began = time.time() - if (auto_compress and len(self.uncompressed_data) < Resource.AUTO_COMPRESS_MAX_SIZE): + if (auto_compress and len(self.uncompressed_data) <= Resource.AUTO_COMPRESS_MAX_SIZE): RNS.log("Compressing resource data...", RNS.LOG_DEBUG) self.compressed_data = bz2.compress(self.uncompressed_data) RNS.log("Compression completed in "+str(round(time.time()-compression_began, 3))+" seconds", RNS.LOG_DEBUG) @@ -440,7 +452,7 @@ class Resource: sleep_time = None if self.status == Resource.ADVERTISED: - sleep_time = (self.adv_sent+self.timeout)-time.time() + sleep_time = (self.adv_sent+self.timeout+Resource.PROCESSING_GRACE)-time.time() if sleep_time < 0: if self.retries_left <= 0: RNS.log("Resource transfer timeout after sending advertisement", RNS.LOG_DEBUG) @@ -612,10 +624,24 @@ class Resource: self.callback(self) except Exception as e: RNS.log("Error while executing resource concluded callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) + finally: + if hasattr(self.input_file, "close") and callable(self.input_file.close): + try: + self.input_file.close() + except Exception as e: + RNS.log("Error while closing resource input file: "+str(e), RNS.LOG_ERROR) else: # Otherwise we'll recursively create the # next segment of the resource - Resource(self.input_file, self.link, callback = self.callback, segment_index = self.segment_index+1, original_hash=self.original_hash, progress_callback = self.__progress_callback) + Resource( + self.input_file, self.link, + callback = self.callback, + segment_index = self.segment_index+1, + original_hash=self.original_hash, + progress_callback = self.__progress_callback, + request_id = self.request_id, + is_response = self.is_response, + ) else: pass else: @@ -731,20 +757,6 @@ class Resource: i = 0; pn = self.consecutive_completed_height+1 search_start = pn search_size = self.window - - # TODO: Remove - # tpm = [] - # tpi = 0 - # try: - # for p in self.parts: - # if p == None: - # tpm.append(None) - # else: - # tpm.append(tpi) - # tpi+=1 - # except Exception as e: - # print(str(e)) - # RNS.log(f"Partmap: "+str(tpm)) for part in self.parts[search_start:search_start+search_size]: if part == None: