From 8e558814eb437b88e81fce050aac15c7db57a686 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sat, 7 Mar 2020 11:20:09 +0100 Subject: [PATCH] Implemented transport table cleanup --- RNS/Transport.py | 73 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 23 deletions(-) diff --git a/RNS/Transport.py b/RNS/Transport.py index 0a7d4d6..8d20570 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -35,20 +35,24 @@ class Transport: # various situations LOCAL_REBROADCASTS_MAX = 2 # How many local rebroadcasts of an announce is allowed - PATH_REQUEST_GRACE = 0.25 # Grace time before a path announcement is made, allows directly reachable peers to respond first - PATH_REQUEST_RW = 2 # Path request random window + PATH_REQUEST_GRACE = 0.25 # Grace time before a path announcement is made, allows directly reachable peers to respond first + PATH_REQUEST_RW = 2 # Path request random window - interfaces = [] # All active interfaces - destinations = [] # All active destinations - pending_links = [] # Links that are being established - active_links = [] # Links that are active - packet_hashlist = [] # A list of packet hashes for duplicate detection - receipts = [] # Receipts of all outgoing packets for proof processing + LINK_TIMEOUT = RNS.Link.KEEPALIVE * 2 + REVERSE_TIMEOUT = 30*60 # Reverse table entries are removed after max 30 minutes + DESTINATION_TIMEOUT = 60*60*24*7 # Destination table entries are removed if unused for one week - announce_table = {} # A table for storing announces currently waiting to be retransmitted - destination_table = {} # A lookup table containing the next hop to a given destination - reverse_table = {} # A lookup table for storing packet hashes used to return proofs and replies - link_table = {} # A lookup table containing hops for links + interfaces = [] # All active interfaces + destinations = [] # All active destinations + pending_links = [] # Links that are being established + active_links = [] # Links that are active + packet_hashlist = [] # A list of packet hashes for duplicate detection + receipts = [] # Receipts of all outgoing packets for proof processing + + announce_table = {} # A table for storing announces currently waiting to be retransmitted + destination_table = {} # A lookup table containing the next hop to a given destination + reverse_table = {} # A lookup table for storing packet hashes used to return proofs and replies + link_table = {} # A lookup table containing hops for links jobs_locked = False jobs_running = False @@ -58,6 +62,8 @@ class Transport: announces_last_checked = 0.0 announces_check_interval = 1.0 hashlist_maxsize = 1000000 + tables_last_culled = 0.0 + tables_cull_interval = 5.0 identity = None @@ -151,11 +157,26 @@ class Transport: while (len(Transport.packet_hashlist) > Transport.hashlist_maxsize): Transport.packet_hashlist.pop(0) - # Cull the reverse table according to max size and/or age of entries - # TODO: Implement this + if time.time() > Transport.tables_last_culled + Transport.tables_cull_interval: + # Cull the reverse table according to timeout + for truncated_packet_hash in Transport.reverse_table: + reverse_entry = Transport.reverse_table[truncated_packet_hash] + if time.time() > reverse_entry[2] + Transport.REVERSE_TIMEOUT: + Transport.reverse_table.pop(truncated_packet_hash) - # Cull the destination table in some way - # TODO: Implement this + # Cull the link table according to timeout + for link_id in Transport.link_table: + link_entry = Transport.link_table[link_id] + if time.time() > link_entry[0] + Transport.LINK_TIMEOUT: + Transport.link_table.pop(link_id) + + # Cull the destination table in some way + for destination_hash in Transport.destination_table: + destination_entry = Transport.destination_table[destination_hash] + if time.time() > destination_entry[0] + Transport.DESTINATION_TIMEOUT: + Transport.destination_table.pop(destination_hash) + + Transport.tables_last_culled = time.time() except Exception as e: RNS.log("An exception occurred while running Transport jobs.", RNS.LOG_ERROR) @@ -191,6 +212,7 @@ class Transport: new_raw += packet.raw[2:] RNS.log("Packet was inserted into transport via "+RNS.prettyhexrep(Transport.destination_table[packet.destination_hash][1])+" on: "+str(outbound_interface), RNS.LOG_DEBUG) outbound_interface.processOutgoing(new_raw) + Transport.destination_table[packet.destination_hash][0] = time.time() sent = True else: # Destination is directly reachable, and we know on @@ -254,7 +276,6 @@ class Transport: @staticmethod def inbound(raw, interface=None): - # TODO: Rewrite the redundant cache calls in this method while (Transport.jobs_running): sleep(0.1) @@ -269,6 +290,7 @@ class Transport: if Transport.packet_filter(packet): Transport.packet_hashlist.append(packet.packet_hash) + Transport.cache(packet) # General transport handling. Takes care of directing # packets according to transport tables and recording @@ -295,6 +317,7 @@ class Transport: outbound_interface = Transport.destination_table[packet.destination_hash][5] outbound_interface.processOutgoing(new_raw) + Transport.destination_table[packet.destination_hash][0] = time.time() if packet.packet_type == RNS.Packet.LINKREQUEST: # Entry format is @@ -357,6 +380,7 @@ class Transport: new_raw += struct.pack("!B", packet.hops) new_raw += packet.raw[2:] outbound_interface.processOutgoing(new_raw) + Transport.link_table[packet.destination_hash][0] = time.time() else: pass @@ -459,7 +483,6 @@ class Transport: if destination.hash == packet.destination_hash and destination.type == packet.destination_type: packet.destination = destination destination.receive(packet) - Transport.cache(packet) elif packet.packet_type == RNS.Packet.DATA: if packet.destination_type == RNS.Destination.LINK: @@ -467,13 +490,11 @@ class Transport: if link.link_id == packet.destination_hash: packet.link = link link.receive(packet) - Transport.cache(packet) else: for destination in Transport.destinations: if destination.hash == packet.destination_hash and destination.type == packet.destination_type: packet.destination = destination destination.receive(packet) - Transport.cache(packet) if destination.proof_strategy == RNS.Destination.PROVE_ALL: packet.prove() @@ -526,7 +547,7 @@ class Transport: # Check if this proof neds to be transported if packet.destination_hash in Transport.reverse_table: - reverse_entry = Transport.reverse_table[packet.destination_hash] + reverse_entry = Transport.reverse_table.pop(packet.destination_hash) if packet.receiving_interface == reverse_entry[1]: RNS.log("Proof received on correct interface, transporting it via "+str(reverse_entry[0]), RNS.LOG_DEBUG) new_raw = packet.raw[0:1] @@ -581,8 +602,8 @@ class Transport: def shouldCache(packet): # TODO: Implement sensible rules for which # packets to cache - if packet.context == RNS.Packet.RESOURCE_PRF: - return True + #if packet.context == RNS.Packet.RESOURCE_PRF: + # return True return False @@ -599,6 +620,8 @@ class Transport: RNS.log("Error writing packet to cache", RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e)) + # TODO: Implement cache requests. Needs methodology + # rethinking. This is skeleton code. @staticmethod def cache_request_packet(packet): if len(packet.data) == RNS.Identity.HASHLENGTH/8: @@ -612,6 +635,8 @@ class Transport: # TODO: Implement outbound for this + # TODO: Implement cache requests. Needs methodology + # rethinking. This is skeleton code. @staticmethod def cache_request(packet_hash): RNS.log("Cache request for "+RNS.prettyhexrep(packet_hash), RNS.LOG_EXTREME) @@ -668,6 +693,8 @@ class Transport: else: RNS.log("No known path to requested destination, ignoring request", RNS.LOG_DEBUG) + # TODO: Currently only used for cache requests. + # Needs rethink. @staticmethod def transport_destination(): # TODO: implement this