Implemented transport table cleanup

This commit is contained in:
Mark Qvist 2020-03-07 11:20:09 +01:00
parent d169c3d9c1
commit 8e558814eb

View File

@ -38,6 +38,10 @@ class Transport:
PATH_REQUEST_GRACE = 0.25 # Grace time before a path announcement is made, allows directly reachable peers to respond first PATH_REQUEST_GRACE = 0.25 # Grace time before a path announcement is made, allows directly reachable peers to respond first
PATH_REQUEST_RW = 2 # Path request random window PATH_REQUEST_RW = 2 # Path request random window
LINK_TIMEOUT = RNS.Link.KEEPALIVE * 2
REVERSE_TIMEOUT = 30*60 # Reverse table entries are removed after max 30 minutes
DESTINATION_TIMEOUT = 60*60*24*7 # Destination table entries are removed if unused for one week
interfaces = [] # All active interfaces interfaces = [] # All active interfaces
destinations = [] # All active destinations destinations = [] # All active destinations
pending_links = [] # Links that are being established pending_links = [] # Links that are being established
@ -58,6 +62,8 @@ class Transport:
announces_last_checked = 0.0 announces_last_checked = 0.0
announces_check_interval = 1.0 announces_check_interval = 1.0
hashlist_maxsize = 1000000 hashlist_maxsize = 1000000
tables_last_culled = 0.0
tables_cull_interval = 5.0
identity = None identity = None
@ -151,11 +157,26 @@ class Transport:
while (len(Transport.packet_hashlist) > Transport.hashlist_maxsize): while (len(Transport.packet_hashlist) > Transport.hashlist_maxsize):
Transport.packet_hashlist.pop(0) Transport.packet_hashlist.pop(0)
# Cull the reverse table according to max size and/or age of entries if time.time() > Transport.tables_last_culled + Transport.tables_cull_interval:
# TODO: Implement this # Cull the reverse table according to timeout
for truncated_packet_hash in Transport.reverse_table:
reverse_entry = Transport.reverse_table[truncated_packet_hash]
if time.time() > reverse_entry[2] + Transport.REVERSE_TIMEOUT:
Transport.reverse_table.pop(truncated_packet_hash)
# Cull the link table according to timeout
for link_id in Transport.link_table:
link_entry = Transport.link_table[link_id]
if time.time() > link_entry[0] + Transport.LINK_TIMEOUT:
Transport.link_table.pop(link_id)
# Cull the destination table in some way # Cull the destination table in some way
# TODO: Implement this for destination_hash in Transport.destination_table:
destination_entry = Transport.destination_table[destination_hash]
if time.time() > destination_entry[0] + Transport.DESTINATION_TIMEOUT:
Transport.destination_table.pop(destination_hash)
Transport.tables_last_culled = time.time()
except Exception as e: except Exception as e:
RNS.log("An exception occurred while running Transport jobs.", RNS.LOG_ERROR) RNS.log("An exception occurred while running Transport jobs.", RNS.LOG_ERROR)
@ -191,6 +212,7 @@ class Transport:
new_raw += packet.raw[2:] new_raw += packet.raw[2:]
RNS.log("Packet was inserted into transport via "+RNS.prettyhexrep(Transport.destination_table[packet.destination_hash][1])+" on: "+str(outbound_interface), RNS.LOG_DEBUG) RNS.log("Packet was inserted into transport via "+RNS.prettyhexrep(Transport.destination_table[packet.destination_hash][1])+" on: "+str(outbound_interface), RNS.LOG_DEBUG)
outbound_interface.processOutgoing(new_raw) outbound_interface.processOutgoing(new_raw)
Transport.destination_table[packet.destination_hash][0] = time.time()
sent = True sent = True
else: else:
# Destination is directly reachable, and we know on # Destination is directly reachable, and we know on
@ -254,7 +276,6 @@ class Transport:
@staticmethod @staticmethod
def inbound(raw, interface=None): def inbound(raw, interface=None):
# TODO: Rewrite the redundant cache calls in this method
while (Transport.jobs_running): while (Transport.jobs_running):
sleep(0.1) sleep(0.1)
@ -269,6 +290,7 @@ class Transport:
if Transport.packet_filter(packet): if Transport.packet_filter(packet):
Transport.packet_hashlist.append(packet.packet_hash) Transport.packet_hashlist.append(packet.packet_hash)
Transport.cache(packet)
# General transport handling. Takes care of directing # General transport handling. Takes care of directing
# packets according to transport tables and recording # packets according to transport tables and recording
@ -295,6 +317,7 @@ class Transport:
outbound_interface = Transport.destination_table[packet.destination_hash][5] outbound_interface = Transport.destination_table[packet.destination_hash][5]
outbound_interface.processOutgoing(new_raw) outbound_interface.processOutgoing(new_raw)
Transport.destination_table[packet.destination_hash][0] = time.time()
if packet.packet_type == RNS.Packet.LINKREQUEST: if packet.packet_type == RNS.Packet.LINKREQUEST:
# Entry format is # Entry format is
@ -357,6 +380,7 @@ class Transport:
new_raw += struct.pack("!B", packet.hops) new_raw += struct.pack("!B", packet.hops)
new_raw += packet.raw[2:] new_raw += packet.raw[2:]
outbound_interface.processOutgoing(new_raw) outbound_interface.processOutgoing(new_raw)
Transport.link_table[packet.destination_hash][0] = time.time()
else: else:
pass pass
@ -459,7 +483,6 @@ class Transport:
if destination.hash == packet.destination_hash and destination.type == packet.destination_type: if destination.hash == packet.destination_hash and destination.type == packet.destination_type:
packet.destination = destination packet.destination = destination
destination.receive(packet) destination.receive(packet)
Transport.cache(packet)
elif packet.packet_type == RNS.Packet.DATA: elif packet.packet_type == RNS.Packet.DATA:
if packet.destination_type == RNS.Destination.LINK: if packet.destination_type == RNS.Destination.LINK:
@ -467,13 +490,11 @@ class Transport:
if link.link_id == packet.destination_hash: if link.link_id == packet.destination_hash:
packet.link = link packet.link = link
link.receive(packet) link.receive(packet)
Transport.cache(packet)
else: else:
for destination in Transport.destinations: for destination in Transport.destinations:
if destination.hash == packet.destination_hash and destination.type == packet.destination_type: if destination.hash == packet.destination_hash and destination.type == packet.destination_type:
packet.destination = destination packet.destination = destination
destination.receive(packet) destination.receive(packet)
Transport.cache(packet)
if destination.proof_strategy == RNS.Destination.PROVE_ALL: if destination.proof_strategy == RNS.Destination.PROVE_ALL:
packet.prove() packet.prove()
@ -526,7 +547,7 @@ class Transport:
# Check if this proof neds to be transported # Check if this proof neds to be transported
if packet.destination_hash in Transport.reverse_table: if packet.destination_hash in Transport.reverse_table:
reverse_entry = Transport.reverse_table[packet.destination_hash] reverse_entry = Transport.reverse_table.pop(packet.destination_hash)
if packet.receiving_interface == reverse_entry[1]: if packet.receiving_interface == reverse_entry[1]:
RNS.log("Proof received on correct interface, transporting it via "+str(reverse_entry[0]), RNS.LOG_DEBUG) RNS.log("Proof received on correct interface, transporting it via "+str(reverse_entry[0]), RNS.LOG_DEBUG)
new_raw = packet.raw[0:1] new_raw = packet.raw[0:1]
@ -581,8 +602,8 @@ class Transport:
def shouldCache(packet): def shouldCache(packet):
# TODO: Implement sensible rules for which # TODO: Implement sensible rules for which
# packets to cache # packets to cache
if packet.context == RNS.Packet.RESOURCE_PRF: #if packet.context == RNS.Packet.RESOURCE_PRF:
return True # return True
return False return False
@ -599,6 +620,8 @@ class Transport:
RNS.log("Error writing packet to cache", RNS.LOG_ERROR) RNS.log("Error writing packet to cache", RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e)) RNS.log("The contained exception was: "+str(e))
# TODO: Implement cache requests. Needs methodology
# rethinking. This is skeleton code.
@staticmethod @staticmethod
def cache_request_packet(packet): def cache_request_packet(packet):
if len(packet.data) == RNS.Identity.HASHLENGTH/8: if len(packet.data) == RNS.Identity.HASHLENGTH/8:
@ -612,6 +635,8 @@ class Transport:
# TODO: Implement outbound for this # TODO: Implement outbound for this
# TODO: Implement cache requests. Needs methodology
# rethinking. This is skeleton code.
@staticmethod @staticmethod
def cache_request(packet_hash): def cache_request(packet_hash):
RNS.log("Cache request for "+RNS.prettyhexrep(packet_hash), RNS.LOG_EXTREME) RNS.log("Cache request for "+RNS.prettyhexrep(packet_hash), RNS.LOG_EXTREME)
@ -668,6 +693,8 @@ class Transport:
else: else:
RNS.log("No known path to requested destination, ignoring request", RNS.LOG_DEBUG) RNS.log("No known path to requested destination, ignoring request", RNS.LOG_DEBUG)
# TODO: Currently only used for cache requests.
# Needs rethink.
@staticmethod @staticmethod
def transport_destination(): def transport_destination():
# TODO: implement this # TODO: implement this