From be1ff8ec214fbe4ae4d21c95d76e9a5053bdb794 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Wed, 13 May 2020 13:08:48 +0200 Subject: [PATCH] Resource transfer and receipt management optimisation --- Examples/Filetransfer.py | 8 +++--- RNS/Interfaces/TCPInterface.py | 35 +++++++++++++------------ RNS/Packet.py | 33 +++++++++++++++-------- RNS/Resource.py | 6 +++-- RNS/Transport.py | 48 +++++++++++++++++++++++++++++----- 5 files changed, 90 insertions(+), 40 deletions(-) diff --git a/Examples/Filetransfer.py b/Examples/Filetransfer.py index 3f3fb66..cd07d9a 100644 --- a/Examples/Filetransfer.py +++ b/Examples/Filetransfer.py @@ -55,7 +55,8 @@ def server(configpath, path): def announceLoop(destination): # Let the user know that everything is ready - RNS.log("File server "+RNS.prettyhexrep(destination.hash)+" running, hit enter to manually send an announce (Ctrl-C to quit)") + RNS.log("File server "+RNS.prettyhexrep(destination.hash)+" running") + RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)") # We enter a loop that runs until the users exits. # If the user hits enter, we will announce our server @@ -238,8 +239,9 @@ def download(filename): # We just create a packet containing the # requested filename, and send it down the - # link. - request_packet = RNS.Packet(server_link, filename.encode("utf-8")) + # link. We also specify we don't need a + # packet receipt. + request_packet = RNS.Packet(server_link, filename.encode("utf-8"), create_receipt=False) request_packet.send() print("") diff --git a/RNS/Interfaces/TCPInterface.py b/RNS/Interfaces/TCPInterface.py index dd76d84..b95f16d 100644 --- a/RNS/Interfaces/TCPInterface.py +++ b/RNS/Interfaces/TCPInterface.py @@ -24,32 +24,30 @@ class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): class TCPClientInterface(Interface): def __init__(self, owner, name, target_ip=None, target_port=None, connected_socket=None): - self.IN = True - self.OUT = False - self.socket = None + self.IN = True + self.OUT = False + self.socket = None self.parent_interface = None - self.name = name - - # TODO: Optimise so this is not needed - self.transmit_delay = 0.001 + self.name = name if connected_socket != None: - self.receives = True - self.target_ip = None + self.receives = True + self.target_ip = None self.target_port = None - self.socket = connected_socket + self.socket = connected_socket elif target_ip != None and target_port != None: - self.receives = True - self.target_ip = target_ip + self.receives = True + self.target_ip = target_ip self.target_port = target_port RNS.log("Client init: "+str(self)) self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.connect((self.target_ip, self.target_port)) - self.owner = owner - self.online = True + self.owner = owner + self.online = True + self.writing = False if connected_socket == None: thread = threading.Thread(target=self.read_loop) @@ -61,10 +59,14 @@ class TCPClientInterface(Interface): def processOutgoing(self, data): if self.online: + while self.writing: + time.sleep(0.01) + try: - time.sleep(self.transmit_delay) + self.writing = True data = bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG]) self.socket.sendall(data) + self.writing = False except Exception as e: RNS.log("Exception occurred while transmitting via "+str(self)+", tearing down interface", RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) @@ -78,8 +80,7 @@ class TCPClientInterface(Interface): data_buffer = b"" while True: - data_in = self.socket.recv(1024) - + data_in = self.socket.recv(4096) if len(data_in) > 0: pointer = 0 while pointer < len(data_in): diff --git a/RNS/Packet.py b/RNS/Packet.py index 3e1791f..de1e582 100755 --- a/RNS/Packet.py +++ b/RNS/Packet.py @@ -1,3 +1,4 @@ +import threading import struct import math import time @@ -55,7 +56,7 @@ class Packet: # Default packet timeout TIMEOUT = 60 - def __init__(self, destination, data, packet_type = DATA, context = NONE, transport_type = RNS.Transport.BROADCAST, header_type = HEADER_1, transport_id = None, attached_interface = None): + def __init__(self, destination, data, packet_type = DATA, context = NONE, transport_type = RNS.Transport.BROADCAST, header_type = HEADER_1, transport_id = None, attached_interface = None, create_receipt = True): if destination != None: if transport_type == None: transport_type = RNS.Transport.BROADCAST @@ -65,17 +66,18 @@ class Packet: self.transport_type = transport_type self.context = context - self.hops = 0; + self.hops = 0; self.destination = destination self.transport_id = transport_id - self.data = data - self.flags = self.getPackedFlags() + self.data = data + self.flags = self.getPackedFlags() - self.raw = None - self.packed = False - self.sent = False - self.receipt = None - self.fromPacked = False + self.raw = None + self.packed = False + self.sent = False + self.create_receipt = create_receipt + self.receipt = None + self.fromPacked = False else: self.raw = data self.packed = True @@ -257,6 +259,7 @@ class PacketReceipt: FAILED = 0x00 SENT = 0x01 DELIVERED = 0x02 + CULLED = 0xFF EXPL_LENGTH = RNS.Identity.HASHLENGTH//8+RNS.Identity.SIGLENGTH//8 @@ -366,10 +369,18 @@ class PacketReceipt: def check_timeout(self): if self.is_timed_out(): - self.status = PacketReceipt.FAILED + if self.timeout == -1: + self.status = PacketReceipt.CULLED + else: + self.status = PacketReceipt.FAILED + self.concluded_at = time.time() + if self.callbacks.timeout: - self.callbacks.timeout(self) + thread = threading.Thread(target=self.callbacks.timeout, args=(self,)) + thread.setDaemon(True) + thread.start() + #self.callbacks.timeout(self) # Set the timeout in seconds diff --git a/RNS/Resource.py b/RNS/Resource.py index b2bade8..3b33f3c 100644 --- a/RNS/Resource.py +++ b/RNS/Resource.py @@ -548,8 +548,10 @@ class Resource: if wants_more_hashmap: last_map_hash = request_data[1:Resource.MAPHASH_LEN+1] - part_index = self.receiver_min_consecutive_height - for part in self.parts[self.receiver_min_consecutive_height:]: + part_index = self.receiver_min_consecutive_height + search_start = part_index + search_end = self.receiver_min_consecutive_height+ResourceAdvertisement.COLLISION_GUARD_SIZE + for part in self.parts[search_start:search_end]: part_index += 1 if part.map_hash == last_map_hash: break diff --git a/RNS/Transport.py b/RNS/Transport.py index 1c4d71e..a2b18de 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -41,6 +41,7 @@ class Transport: LINK_TIMEOUT = RNS.Link.KEEPALIVE * 2 REVERSE_TIMEOUT = 30*60 # Reverse table entries are removed after max 30 minutes DESTINATION_TIMEOUT = 60*60*24*7 # Destination table entries are removed if unused for one week + MAX_RECEIPTS = 1024 # Maximum number of receipts to keep track of interfaces = [] # All active interfaces destinations = [] # All active destinations @@ -49,6 +50,7 @@ class Transport: packet_hashlist = [] # A list of packet hashes for duplicate detection receipts = [] # Receipts of all outgoing packets for proof processing + # TODO: "destination_table" should really be renamed to "path_table" announce_table = {} # A table for storing announces currently waiting to be retransmitted destination_table = {} # A lookup table containing the next hop to a given destination reverse_table = {} # A lookup table for storing packet hashes used to return proofs and replies @@ -154,10 +156,13 @@ class Transport: if not Transport.jobs_locked: # Process receipts list for timed-out packets if time.time() > Transport.receipts_last_checked+Transport.receipts_check_interval: + while len(Transport.receipts) > Transport.MAX_RECEIPTS: + culled_receipt = Transport.receipts.pop(0) + culled_receipt.timeout = -1 + receipt.check_timeout() + for receipt in Transport.receipts: - thread = threading.Thread(target=receipt.check_timeout) - thread.setDaemon(True) - thread.start() + receipt.check_timeout() if receipt.status != RNS.PacketReceipt.SENT: Transport.receipts.remove(receipt) @@ -211,11 +216,30 @@ class Transport: if time.time() > link_entry[0] + Transport.LINK_TIMEOUT: Transport.link_table.pop(link_id) - # Cull the destination table + # Cull the path table + stale_paths = [] for destination_hash in Transport.destination_table: destination_entry = Transport.destination_table[destination_hash] + attached_interface = destination_entry[5] + if time.time() > destination_entry[0] + Transport.DESTINATION_TIMEOUT: - Transport.destination_table.pop(destination_hash) + stale_paths.append(destination_hash) + RNS.log("Path to "+RNS.prettyhexrep(destination_hash)+" timed out and was removed", RNS.LOG_DEBUG) + + if not attached_interface in Transport.interfaces: + stale_paths.append(destination_hash) + RNS.log("Path to "+RNS.prettyhexrep(destination_hash)+" was removed since the attached interface no longer exists", RNS.LOG_DEBUG) + + i = 0 + for destination_hash in stale_paths: + Transport.destination_table.pop(destination_hash) + i += 1 + + if i > 0: + if i == 1: + RNS.log("Removed "+str(i)+" path", RNS.LOG_DEBUG) + else: + RNS.log("Removed "+str(i)+" paths", RNS.LOG_DEBUG) Transport.tables_last_culled = time.time() @@ -266,7 +290,7 @@ class Transport: else: # Broadcast packet on all outgoing interfaces, or relevant - # interface, if packet is for a link or has an attachede interface + # interface, if packet is for a link or has an attached interface for interface in Transport.interfaces: if interface.OUT: should_transmit = True @@ -288,7 +312,17 @@ class Transport: packet.sent = True packet.sent_at = time.time() - if (packet.packet_type == RNS.Packet.DATA and packet.destination.type != RNS.Destination.PLAIN): + # Don't generate receipt if it has been explicitly disabled + if (packet.create_receipt == True and + # Only generate receipts for DATA packets + packet.packet_type == RNS.Packet.DATA and + # Don't generate receipts for PLAIN destinations + packet.destination.type != RNS.Destination.PLAIN and + # Don't generate receipts for link-related packets + not (packet.context >= RNS.Packet.KEEPALIVE and packet.context <= RNS.Packet.LRPROOF) and + # Don't generate receipts for resource packets + not (packet.context >= RNS.Packet.RESOURCE and packet.context <= RNS.Packet.RESOURCE_RCL)): + packet.receipt = RNS.PacketReceipt(packet) Transport.receipts.append(packet.receipt)