From c595fdd54be8cd9ba1e5672692f05f105ba044a3 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Tue, 12 May 2020 16:45:51 +0200 Subject: [PATCH] Optimised resource transfer implementation --- Examples/Filetransfer.py | 17 +- RNS/Interfaces/TCPInterface.py | 3 +- RNS/Link.py | 21 ++- RNS/Packet.py | 5 +- RNS/Resource.py | 290 ++++++++++++++++++++++----------- RNS/Transport.py | 2 + 6 files changed, 227 insertions(+), 111 deletions(-) diff --git a/Examples/Filetransfer.py b/Examples/Filetransfer.py index 1647f34..3f3fb66 100644 --- a/Examples/Filetransfer.py +++ b/Examples/Filetransfer.py @@ -123,7 +123,10 @@ def client_request(message, packet): # read it and pack it as a resource RNS.log("Client requested \""+filename+"\"") file = open(os.path.join(serve_path, filename), "rb") - file_resource = RNS.Resource(file.read(), packet.link, callback=resource_sending_concluded) + file_data = file.read() + file.close() + + file_resource = RNS.Resource(file_data, packet.link, callback=resource_sending_concluded) file_resource.filename = filename except: # If somethign went wrong, we close @@ -138,10 +141,15 @@ def client_request(message, packet): # This function is called on the server when a # resource transfer concludes. 
def resource_sending_concluded(resource): + if hasattr(resource, "filename"): + name = resource.filename + else: + name = "resource" + if resource.status == RNS.Resource.COMPLETE: - RNS.log("Done sending \""+resource.filename+"\" to client") + RNS.log("Done sending \""+name+"\" to client") elif resource.status == RNS.Resource.FAILED: - RNS.log("Sending \""+resource.filename+"\" to client failed") + RNS.log("Sending \""+name+"\" to client failed") def list_delivered(receipt): RNS.log("The file list was received by the client") @@ -297,8 +305,7 @@ def print_menu(): if time.time() > download_began+APP_TIMEOUT: print("The download timed out") time.sleep(1) - menu_mode = "main" - print_menu() + server_link.teardown() if menu_mode == "downloading": print("Download started") diff --git a/RNS/Interfaces/TCPInterface.py b/RNS/Interfaces/TCPInterface.py index 875d1dc..dd76d84 100644 --- a/RNS/Interfaces/TCPInterface.py +++ b/RNS/Interfaces/TCPInterface.py @@ -118,7 +118,8 @@ class TCPClientInterface(Interface): self.online = False self.OUT = False self.IN = False - RNS.Transport.interfaces.remove(self) + if self in RNS.Transport.interfaces: + RNS.Transport.interfaces.remove(self) def __str__(self): diff --git a/RNS/Link.py b/RNS/Link.py index af92b15..61f7f9f 100644 --- a/RNS/Link.py +++ b/RNS/Link.py @@ -34,6 +34,7 @@ class Link: # first-hop RTT latency and distance DEFAULT_TIMEOUT = 15.0 TIMEOUT_FACTOR = 3 + STALE_GRACE = 2 KEEPALIVE = 180 PENDING = 0x00 @@ -201,7 +202,9 @@ class Link: self.status = Link.ACTIVE if self.callbacks.link_established != None: - self.callbacks.link_established(self) + thread = threading.Thread(target=self.callbacks.link_established, args=(self,)) + thread.setDaemon(True) + thread.start() else: RNS.log("Invalid link proof signature received by "+str(self), RNS.LOG_VERBOSE) # TODO: should we really do this, or just wait @@ -309,7 +312,7 @@ class Link: elif self.status == Link.ACTIVE: if time.time() >= self.last_inbound + self.keepalive: - 
sleep_time = self.rtt * self.timeout_factor + sleep_time = self.rtt * self.timeout_factor + Link.STALE_GRACE self.status = Link.STALE if self.initiator: self.send_keepalive() @@ -326,8 +329,8 @@ class Link: if sleep_time == 0: RNS.log("Warning! Link watchdog sleep time of 0!", RNS.LOG_ERROR) if sleep_time == None or sleep_time < 0: - RNS.log("Timing error! Closing Reticulum now.", RNS.LOG_CRITICAL) - RNS.panic() + RNS.log("Timing error! Tearing down link "+str(self)+" now.", RNS.LOG_ERROR) + self.teardown() sleep(sleep_time) @@ -352,7 +355,9 @@ class Link: if packet.context == RNS.Packet.NONE: plaintext = self.decrypt(packet.data) if self.callbacks.packet != None: - self.callbacks.packet(plaintext, packet) + thread = threading.Thread(target=self.callbacks.packet, args=(plaintext, packet)) + thread.setDaemon(True) + thread.start() if self.destination.proof_strategy == RNS.Destination.PROVE_ALL: packet.prove() @@ -374,7 +379,9 @@ class Link: pass elif self.resource_strategy == Link.ACCEPT_APP: if self.callbacks.resource != None: - self.callbacks.resource(packet) + thread = threading.Thread(target=self.callbacks.resource, args=(packet,)) # args must be a tuple + thread.setDaemon(True) + thread.start() elif self.resource_strategy == Link.ACCEPT_ALL: RNS.Resource.accept(packet, self.callbacks.resource_concluded) @@ -497,7 +504,7 @@ class Link: if resource in self.outgoing_resources: self.outgoing_resources.remove(resource) else: - RNS.log("Attempt to cancel a non-existing incoming resource", RNS.LOG_ERROR) + RNS.log("Attempt to cancel a non-existing outgoing resource", RNS.LOG_ERROR) def cancel_incoming_resource(self, resource): if resource in self.incoming_resources: diff --git a/RNS/Packet.py b/RNS/Packet.py index c6cfad2..3e1791f 100755 --- a/RNS/Packet.py +++ b/RNS/Packet.py @@ -189,8 +189,9 @@ class Packet: if RNS.Transport.outbound(self): return self.receipt else: - # TODO: Don't raise error here, handle gracefully - raise IOError("Packet could not be sent! 
Do you have any outbound interfaces configured?") + # TODO: Decide whether this failure should simply + # return none, or raise an error + raise IOError("No interfaces could process the outbound packet") else: raise IOError("Packet was already sent") diff --git a/RNS/Resource.py b/RNS/Resource.py index f829fd3..9dbcdd6 100644 --- a/RNS/Resource.py +++ b/RNS/Resource.py @@ -7,12 +7,17 @@ from .vendor import umsgpack as umsgpack from time import sleep class Resource: - WINDOW_MIN = 1 - WINDOW_MAX = 7 - WINDOW = 4 - MAPHASH_LEN = 4 - SDU = RNS.Packet.MDU - RANDOM_HASH_SIZE = 4 + WINDOW_FLEXIBILITY = 4 + WINDOW_MIN = 1 + WINDOW_MAX = 10 + WINDOW = 4 + MAPHASH_LEN = 4 + SDU = RNS.Packet.MDU + RANDOM_HASH_SIZE = 4 + + # The maximum size to auto-compress with + # bz2 before sending. + AUTO_COMPRESS_MAX_SIZE = 32 * 1024 * 1024 # TODO: Should be allocated more # intelligently @@ -58,11 +63,16 @@ class Resource: resource.outstanding_parts = 0 resource.parts = [None] * resource.total_parts resource.window = Resource.WINDOW + resource.window_max = Resource.WINDOW_MAX + resource.window_min = Resource.WINDOW_MIN + resource.window_flexibility = Resource.WINDOW_FLEXIBILITY resource.last_activity = time.time() resource.hashmap = [None] * resource.total_parts resource.hashmap_height = 0 resource.waiting_for_hmu = False + + resource.consecutive_completed_height = 0 resource.link.register_incoming_resource(resource) @@ -78,7 +88,7 @@ class Resource: RNS.log("Could not decode resource advertisement, dropping resource", RNS.LOG_DEBUG) return None - def __init__(self, data, link, advertise=True, auto_compress=True, callback=None, progress_callback=None): + def __init__(self, data, link, advertise=True, auto_compress=True, must_compress=False, callback=None, progress_callback=None): self.status = Resource.NONE self.link = link self.max_retries = Resource.MAX_RETRIES @@ -92,69 +102,100 @@ class Resource: self.__progress_callback = progress_callback self.rtt = None + 
self.receiver_min_consecutive_height = 0 + if data != None: + self.initiator = True + self.callback = callback + self.uncompressed_data = data + + compression_began = time.time() + if must_compress or (auto_compress and len(self.uncompressed_data) < Resource.AUTO_COMPRESS_MAX_SIZE): + RNS.log("Compressing resource data...", RNS.LOG_DEBUG) + self.compressed_data = bz2.compress(self.uncompressed_data) + RNS.log("Compression completed in "+str(round(time.time()-compression_began, 3))+" seconds", RNS.LOG_DEBUG) + else: + self.compressed_data = self.uncompressed_data + + self.uncompressed_size = len(self.uncompressed_data) + self.compressed_size = len(self.compressed_data) + + if (self.compressed_size < self.uncompressed_size and auto_compress): + saved_bytes = len(self.uncompressed_data) - len(self.compressed_data) + RNS.log("Compression saved "+str(saved_bytes)+" bytes, sending compressed", RNS.LOG_DEBUG) + + self.data = b"" + self.data += RNS.Identity.getRandomHash()[:Resource.RANDOM_HASH_SIZE] + self.data += self.compressed_data + + self.compressed = True + self.uncompressed_data = None + + else: + self.data = b"" + self.data += RNS.Identity.getRandomHash()[:Resource.RANDOM_HASH_SIZE] + self.data += self.uncompressed_data + self.uncompressed_data = self.data + + self.compressed = False + self.compressed_data = None + if auto_compress: + RNS.log("Compression did not decrease size, sending uncompressed", RNS.LOG_DEBUG) + + # Resources handle encryption directly to + # make optimal use of packet MTU on an entire + # encrypted stream. The Resource instance will + # use its underlying link directly to encrypt. 
+ if not self.link.encryption_disabled(): + self.data = self.link.encrypt(self.data) + self.encrypted = True + else: + self.encrypted = False + + self.size = len(self.data) + self.sent_parts = 0 + hashmap_entries = int(math.ceil(self.size/float(Resource.SDU))) + hashmap_ok = False while not hashmap_ok: - self.initiator = True - self.callback = callback - # TODO: Remove - #self.progress_callback = progress_callback - self.random_hash = RNS.Identity.getRandomHash()[:Resource.RANDOM_HASH_SIZE] - self.uncompressed_data = data - self.compressed_data = bz2.compress(self.uncompressed_data) - self.uncompressed_size = len(self.uncompressed_data) - self.compressed_size = len(self.compressed_data) + hashmap_computation_began = time.time() + RNS.log("Starting resource hashmap computation with "+str(hashmap_entries)+" entries...", RNS.LOG_DEBUG) + self.random_hash = RNS.Identity.getRandomHash()[:Resource.RANDOM_HASH_SIZE] self.hash = RNS.Identity.fullHash(data+self.random_hash) self.expected_proof = RNS.Identity.fullHash(data+self.hash) - if (self.compressed_size < self.uncompressed_size and auto_compress): - self.data = self.compressed_data - self.compressed = True - self.uncompressed_data = None - else: - self.data = self.uncompressed_data - self.compressed = False - self.compressed_data = None - - if not self.link.encryption_disabled(): - self.data = self.link.encrypt(self.data) - self.encrypted = True - else: - self.encrypted = False - - self.size = len(self.data) - - self.hashmap = b"" - self.sent_parts = 0 self.parts = [] - for i in range(0,int(math.ceil(self.size/float(Resource.SDU)))): + self.hashmap = b"" + collision_guard_list = [] + for i in range(0,hashmap_entries): data = self.data[i*Resource.SDU:(i+1)*Resource.SDU] - part = RNS.Packet(link, data, context=RNS.Packet.RESOURCE) - part.pack() - part.map_hash = self.getMapHash(data) - self.hashmap += part.map_hash - self.parts.append(part) + map_hash = self.getMapHash(data) - hashmap_ok = self.checkHashMap() - if not 
hashmap_ok: - RNS.log("Found hash collision in resource map, remapping...", RNS.LOG_VERBOSE) + if map_hash in collision_guard_list: + RNS.log("Found hash collision in resource map, remapping...", RNS.LOG_VERBOSE) + hashmap_ok = False + break + else: + hashmap_ok = True + collision_guard_list.append(map_hash) + if len(collision_guard_list) > ResourceAdvertisement.COLLISION_GUARD_SIZE: + collision_guard_list.pop(0) - if advertise: - self.advertise() + part = RNS.Packet(link, data, context=RNS.Packet.RESOURCE) + part.pack() + part.map_hash = map_hash + + self.hashmap += part.map_hash + self.parts.append(part) + + RNS.log("Hashmap computation concluded in "+str(round(time.time()-hashmap_computation_began, 3))+" seconds", RNS.LOG_DEBUG) + + if advertise: + self.advertise() else: pass - - def checkHashMap(self): - checked_hashes = [] - for part in self.parts: - if part.map_hash in checked_hashes: - return False - checked_hashes.append(part.map_hash) - - return True - def hashmap_update_packet(self, plaintext): if not self.status == Resource.FAILED: self.last_activity = time.time() @@ -178,6 +219,10 @@ class Resource: self.request_next() def getMapHash(self, data): + # TODO: This will break if running unencrypted, + # uncompressed transfers on streams with long blocks + # of identical bytes. Doing so would be very silly + # anyways but maybe it should be handled gracefully. 
return RNS.Identity.fullHash(data+self.random_hash)[:Resource.MAPHASH_LEN] def advertise(self): @@ -192,12 +237,18 @@ class Resource: self.status = Resource.QUEUED sleep(0.25) - self.advertisement_packet.send() - self.last_activity = time.time() - self.adv_sent = self.last_activity - self.rtt = None - self.status = Resource.ADVERTISED - self.link.register_outgoing_resource(self) + try: + self.advertisement_packet.send() + self.last_activity = time.time() + self.adv_sent = self.last_activity + self.rtt = None + self.status = Resource.ADVERTISED + self.link.register_outgoing_resource(self) + RNS.log("Sent resource advertisement for "+RNS.prettyhexrep(self.hash), RNS.LOG_DEBUG) + except Exception as e: + RNS.log("Could not advertise resource, the contained exception was: "+str(e), RNS.LOG_ERROR) + self.cancel() + return self.watchdog_job() @@ -224,12 +275,16 @@ class Resource: self.cancel() sleep_time = 0.001 else: - RNS.log("No part requests received, retrying resource advertisement...", RNS.LOG_DEBUG) - self.retries_left -= 1 - self.advertisement_packet.resend() - self.last_activity = time.time() - self.adv_sent = self.last_activity - sleep_time = 0.001 + try: + RNS.log("No part requests received, retrying resource advertisement...", RNS.LOG_DEBUG) + self.retries_left -= 1 + self.advertisement_packet.resend() + self.last_activity = time.time() + self.adv_sent = self.last_activity + sleep_time = 0.001 + except Exception as e: + RNS.log("Could not resend advertisement packet, cancelling resource", RNS.LOG_VERBOSE) + self.cancel() elif self.status == Resource.TRANSFERRING: @@ -240,6 +295,13 @@ class Resource: if sleep_time < 0: if self.retries_left > 0: RNS.log("Timeout waiting for parts, requesting retry", RNS.LOG_DEBUG) + if self.window > self.window_min: + self.window -= 1 + if self.window_max > self.window_min: + self.window_max -= 1 + if (self.window_max - self.window) > (self.window_flexibility-1): + self.window_max -= 1 + sleep_time = 0.001 self.retries_left -= 
1 self.waiting_for_hmu = False @@ -275,11 +337,11 @@ class Resource: if sleep_time == 0: RNS.log("Warning! Link watchdog sleep time of 0!", RNS.LOG_WARNING) if sleep_time == None or sleep_time < 0: - # TODO: This should probably not be here forever - RNS.log("Timing error! Closing Reticulum now.", RNS.LOG_CRITICAL) - RNS.panic() - - sleep(sleep_time) + RNS.log("Timing error, cancelling resource transfer.", RNS.LOG_ERROR) + self.cancel() + + if sleep_time != None: + sleep(sleep_time) def assemble(self): if not self.status == Resource.FAILED: @@ -294,6 +356,9 @@ class Resource: else: data = stream + # Strip off random hash + data = data[Resource.RANDOM_HASH_SIZE:] + if self.compressed: self.data = bz2.decompress(data) else: @@ -307,6 +372,7 @@ class Resource: else: self.status = Resource.CORRUPT + except Exception as e: RNS.log("Error while assembling received resource.", RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) @@ -319,10 +385,15 @@ class Resource: def prove(self): if not self.status == Resource.FAILED: - proof = RNS.Identity.fullHash(self.data+self.hash) - proof_data = self.hash+proof - proof_packet = RNS.Packet(self.link, proof_data, packet_type=RNS.Packet.PROOF, context=RNS.Packet.RESOURCE_PRF) - proof_packet.send() + try: + proof = RNS.Identity.fullHash(self.data+self.hash) + proof_data = self.hash+proof + proof_packet = RNS.Packet(self.link, proof_data, packet_type=RNS.Packet.PROOF, context=RNS.Packet.RESOURCE_PRF) + proof_packet.send() + except Exception as e: + RNS.log("Could not send proof packet, cancelling resource", RNS.LOG_DEBUG) + RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) + self.cancel() def validateProof(self, proof_data): if not self.status == Resource.FAILED: @@ -356,13 +427,22 @@ class Resource: part_data = packet.data part_hash = self.getMapHash(part_data) - i = 0 - for map_hash in self.hashmap: + i = self.consecutive_completed_height + for map_hash in 
self.hashmap[self.consecutive_completed_height:self.consecutive_completed_height+self.window]: if map_hash == part_hash: if self.parts[i] == None: + # Insert data into parts list self.parts[i] = part_data self.received_count += 1 self.outstanding_parts -= 1 + + # Update consecutive completed pointer + if i == 0 or self.parts[i-1] != None and i == self.consecutive_completed_height: + self.consecutive_completed_height = i+1 + cp = i+1 + while cp < len(self.parts) and self.parts[cp] != None: + self.consecutive_completed_height = cp + cp += 1 i += 1 if self.__progress_callback != None: @@ -371,8 +451,13 @@ class Resource: if self.outstanding_parts == 0 and self.received_count == self.total_parts: self.assemble() elif self.outstanding_parts == 0: - if self.window < Resource.WINDOW_MAX: + # TODO: Figure out if there is a mathematically + # optimal way to adjust windows + if self.window < self.window_max: self.window += 1 + if (self.window - self.window_min) > (self.window_flexibility-1): + self.window_min += 1 + self.request_next() # Called on incoming resource to send a request for more data @@ -383,8 +468,8 @@ class Resource: hashmap_exhausted = Resource.HASHMAP_IS_NOT_EXHAUSTED requested_hashes = b"" - i = 0; pn = 0 - for part in self.parts: + i = 0; pn = self.consecutive_completed_height + for part in self.parts[self.consecutive_completed_height:self.consecutive_completed_height+self.window]: if part == None: part_hash = self.hashmap[pn] @@ -409,10 +494,15 @@ class Resource: request_data = hmu_part + self.hash + requested_hashes request_packet = RNS.Packet(self.link, request_data, context = RNS.Packet.RESOURCE_REQ) - request_packet.send() - self.last_activity = time.time() - self.req_sent = self.last_activity - self.req_resp = None + try: + request_packet.send() + self.last_activity = time.time() + self.req_sent = self.last_activity + self.req_resp = None + except Exception as e: + RNS.log("Could not send resource request packet, cancelling resource", RNS.LOG_DEBUG) + 
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) + self.cancel() # Called on outgoing resource to make it send more data def request(self, request_data): @@ -435,8 +525,9 @@ class Resource: for i in range(0,len(requested_hashes)//Resource.MAPHASH_LEN): requested_hash = requested_hashes[i*Resource.MAPHASH_LEN:(i+1)*Resource.MAPHASH_LEN] - pi = 0 - for part in self.parts: + search_start = self.receiver_min_consecutive_height + search_end = self.receiver_min_consecutive_height+ResourceAdvertisement.COLLISION_GUARD_SIZE + for part in self.parts[search_start:search_end]: if part.map_hash == requested_hash: try: if not part.sent: @@ -448,20 +539,21 @@ class Resource: self.last_part_sent = self.last_activity break except Exception as e: - RNS.log("Resource could not send parts, cancelling transfer!", RNS.LOG_ERROR) - RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) + RNS.log("Resource could not send parts, cancelling transfer!", RNS.LOG_DEBUG) + RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) self.cancel() - pi += 1 if wants_more_hashmap: last_map_hash = request_data[1:Resource.MAPHASH_LEN+1] - part_index = 0 - for part in self.parts: + part_index = self.receiver_min_consecutive_height + for part in self.parts[self.receiver_min_consecutive_height:]: part_index += 1 if part.map_hash == last_map_hash: break + self.receiver_min_consecutive_height = max(part_index-1-Resource.WINDOW_MAX, 0) + if part_index % ResourceAdvertisement.HASHMAP_MAX_LEN != 0: RNS.log("Resource sequencing error, cancelling transfer!", RNS.LOG_ERROR) self.cancel() @@ -479,8 +571,13 @@ class Resource: hmu = self.hash+umsgpack.packb([segment, hashmap]) hmu_packet = RNS.Packet(self.link, hmu, context = RNS.Packet.RESOURCE_HMU) - hmu_packet.send() - self.last_activity = time.time() + try: + hmu_packet.send() + self.last_activity = time.time() + except Exception as e: + RNS.log("Could not send resource HMU packet, cancelling resource", RNS.LOG_DEBUG) + RNS.log("The 
contained exception was: "+str(e), RNS.LOG_DEBUG) + self.cancel() if self.sent_parts == len(self.parts): self.status = Resource.AWAITING_PROOF @@ -522,7 +619,8 @@ class Resource: class ResourceAdvertisement: # TODO: Can this be allocated dynamically? Keep in mind hashmap_update inference - HASHMAP_MAX_LEN = 84 + HASHMAP_MAX_LEN = 84 + COLLISION_GUARD_SIZE = 2*Resource.WINDOW_MAX+HASHMAP_MAX_LEN def __init__(self, resource=None): if resource != None: diff --git a/RNS/Transport.py b/RNS/Transport.py index 244dea7..1c4d71e 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -306,6 +306,8 @@ class Transport: return True if packet.context == RNS.Packet.RESOURCE_PRF: return True + if packet.context == RNS.Packet.RESOURCE: + return True if not packet.packet_hash in Transport.packet_hashlist: return True else: