From b3731524ac3f460361ac941a956255a3eae84ba3 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Thu, 19 Oct 2023 00:38:41 +0200 Subject: [PATCH] Improved path re-discovery in changing topographies --- RNS/Transport.py | 114 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 85 insertions(+), 29 deletions(-) diff --git a/RNS/Transport.py b/RNS/Transport.py index 6e46eb9..afd2710 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -68,6 +68,10 @@ class Transport: PATH_REQUEST_RW = 2 # Path request random window PATH_REQUEST_MI = 5 # Minimum interval in seconds for automated path requests + STATE_UNKNOWN = 0x00 + STATE_UNRESPONSIVE = 0x01 + STATE_RESPONSIVE = 0x02 + LINK_TIMEOUT = RNS.Link.STALE_TIME * 1.25 REVERSE_TIMEOUT = 30*60 # Reverse table entries are removed after 30 minutes DESTINATION_TIMEOUT = 60*60*24*7 # Destination table entries are removed if unused for one week @@ -94,6 +98,7 @@ class Transport: tunnels = {} # A table storing tunnels to other transport instances announce_rate_table = {} # A table for keeping track of announce rates path_requests = {} # A table for storing path request timestamps + path_states = {} # A table for keeping track of path states discovery_path_requests = {} # A table for keeping track of path requests on behalf of other nodes discovery_pr_tags = [] # A table for keeping track of tagged path requests @@ -422,6 +427,12 @@ class Transport: Transport.discovery_pr_tags = Transport.discovery_pr_tags[len(Transport.discovery_pr_tags)-Transport.max_pr_tags:len(Transport.discovery_pr_tags)-1] if time.time() > Transport.tables_last_culled + Transport.tables_cull_interval: + # Remove unneeded path state entries + stale_path_states = [] + for destination_hash in Transport.path_states: + if not destination_hash in Transport.destination_table: + stale_path_states.append(destination_hash) + # Cull the reverse table according to timeout stale_reverse_entries = [] for truncated_packet_hash in Transport.reverse_table: @@ -471,14 +482,18 @@ class Transport: RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[6])+" since an attempted link was never established, and destination was previously local to an interface on this instance", RNS.LOG_DEBUG) path_request_conditions = True - # If the link destination was previously only 1 hop - # away, this likely means that it was local to one - # of our interfaces, and that it roamed somewhere else. - # In that case, try to discover a new path. + # If the link initiator was previously only 1 hop + # away, this likely means that network topology has + # changed. In that case, we try to discover a new path, + # and mark the old one as potentially unresponsive. elif not path_request_throttle and lr_taken_hops == 1: RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[6])+" since an attempted link was never established, and link initiator is local to an interface on this instance", RNS.LOG_DEBUG) path_request_conditions = True + if RNS.Reticulum.transport_enabled(): + if hasattr(link_entry[4], "mode") and link_entry[4].mode != RNS.Interfaces.Interface.Interface.MODE_BOUNDARY: + Transport.mark_path_unresponsive(link_entry[6]) + if path_request_conditions: if not link_entry[6] in path_requests: path_requests.append(link_entry[6]) @@ -549,8 +564,6 @@ class Transport: else: RNS.log("Removed "+str(ti)+" tunnel paths", RNS.LOG_EXTREME) - - i = 0 for truncated_packet_hash in stale_reverse_entries: Transport.reverse_table.pop(truncated_packet_hash) @@ -562,8 +575,6 @@ class Transport: else: RNS.log("Released "+str(i)+" reverse table entries", RNS.LOG_EXTREME) - - i = 0 for link_id in stale_links: Transport.link_table.pop(link_id) @@ -608,6 +619,17 @@ class Transport: else: RNS.log("Removed "+str(i)+" tunnels", RNS.LOG_EXTREME) + i = 0 + for destination_hash in stale_path_states: + Transport.path_states.pop(destination_hash) + i += 1 + + if i > 0: + if i == 1: + RNS.log("Removed "+str(i)+" path state entry", RNS.LOG_EXTREME) + else: + RNS.log("Removed "+str(i)+" path state entries", RNS.LOG_EXTREME) + Transport.tables_last_culled = time.time() if time.time() > Transport.interface_last_jobs + Transport.interface_jobs_interval: @@ -1319,8 +1341,10 @@ class Transport: if path_announce_emitted >= announce_emitted: break + # If the path has expired, consider this + # announce for adding to the path table. if (now >= path_expires): - # We also check that the announce is + # We check that the announce is # different from ones we've already heard, # to avoid loops in the network if not random_blob in random_blobs: @@ -1331,12 +1355,26 @@ class Transport: else: should_add = False else: + # If the path is not expired, but the emission + # is more recent, and we haven't already heard + # this announce before, update the path table. if (announce_emitted > path_announce_emitted): if not random_blob in random_blobs: RNS.log("Replacing destination table entry for "+str(RNS.prettyhexrep(packet.destination_hash))+" with new announce, since it was more recently emitted", RNS.LOG_DEBUG) should_add = True else: should_add = False + + # If we have already heard this announce before, + # but the path has been marked as unresponsive + # by a failed communications attempt or similar, + # allow updating the path table to this one. + elif announce_emitted == path_announce_emitted: + if Transport.path_is_unresponsive(packet.destination_hash): + RNS.log("Replacing destination table entry for "+str(RNS.prettyhexrep(packet.destination_hash))+" with new announce, since previously tried path was unresponsive", RNS.LOG_DEBUG) + should_add = True + else: + should_add = False else: # If this destination is unknown in our table @@ -1601,32 +1639,35 @@ class Transport: # needs to be transported if (RNS.Reticulum.transport_enabled() or for_local_client_link or from_local_client) and packet.destination_hash in Transport.link_table: link_entry = Transport.link_table[packet.destination_hash] - if packet.receiving_interface == link_entry[2]: - try: - if len(packet.data) == RNS.Identity.SIGLENGTH//8+RNS.Link.ECPUBSIZE//2: - peer_pub_bytes = packet.data[RNS.Identity.SIGLENGTH//8:RNS.Identity.SIGLENGTH//8+RNS.Link.ECPUBSIZE//2] - peer_identity = RNS.Identity.recall(link_entry[6]) - peer_sig_pub_bytes = peer_identity.get_public_key()[RNS.Link.ECPUBSIZE//2:RNS.Link.ECPUBSIZE] + if packet.hops == link_entry[3]: + if packet.receiving_interface == link_entry[2]: + try: + if len(packet.data) == RNS.Identity.SIGLENGTH//8+RNS.Link.ECPUBSIZE//2: + peer_pub_bytes = packet.data[RNS.Identity.SIGLENGTH//8:RNS.Identity.SIGLENGTH//8+RNS.Link.ECPUBSIZE//2] + peer_identity = RNS.Identity.recall(link_entry[6]) + peer_sig_pub_bytes = peer_identity.get_public_key()[RNS.Link.ECPUBSIZE//2:RNS.Link.ECPUBSIZE] - signed_data = packet.destination_hash+peer_pub_bytes+peer_sig_pub_bytes - signature = packet.data[:RNS.Identity.SIGLENGTH//8] + signed_data = packet.destination_hash+peer_pub_bytes+peer_sig_pub_bytes + signature = packet.data[:RNS.Identity.SIGLENGTH//8] - if peer_identity.validate(signature, signed_data): - RNS.log("Link request proof validated for transport via "+str(link_entry[4]), RNS.LOG_EXTREME) - new_raw = packet.raw[0:1] - new_raw += struct.pack("!B", packet.hops) - new_raw += packet.raw[2:] - Transport.link_table[packet.destination_hash][7] = True - Transport.transmit(link_entry[4], new_raw) + if peer_identity.validate(signature, signed_data): + RNS.log("Link request proof validated for transport via "+str(link_entry[4]), RNS.LOG_EXTREME) + new_raw = packet.raw[0:1] + new_raw += struct.pack("!B", packet.hops) + new_raw += packet.raw[2:] + Transport.link_table[packet.destination_hash][7] = True + Transport.transmit(link_entry[4], new_raw) - else: - RNS.log("Invalid link request proof in transport for link "+RNS.prettyhexrep(packet.destination_hash)+", dropping proof.", RNS.LOG_DEBUG) + else: + RNS.log("Invalid link request proof in transport for link "+RNS.prettyhexrep(packet.destination_hash)+", dropping proof.", RNS.LOG_DEBUG) - except Exception as e: - RNS.log("Error while transporting link request proof. The contained exception was: "+str(e), RNS.LOG_ERROR) + except Exception as e: + RNS.log("Error while transporting link request proof. The contained exception was: "+str(e), RNS.LOG_ERROR) + else: + RNS.log("Link request proof received on wrong interface, not transporting it.", RNS.LOG_DEBUG) else: - RNS.log("Link request proof received on wrong interface, not transporting it.", RNS.LOG_DEBUG) + RNS.log("Received link request proof with hop mismatch, not transporting it", RNS.LOG_DEBUG) else: # Check if we can deliver it to a local # pending link @@ -1984,6 +2025,21 @@ class Transport: else: return False + @staticmethod + def mark_path_unresponsive(destination_hash): + if destination_hash in Transport.destination_table: + Transport.path_states[destination_hash] = Transport.STATE_UNRESPONSIVE + return True + else: + return False + + def path_is_unresponsive(destination_hash): + if destination_hash in Transport.path_states: + if Transport.path_states[destination_hash] == Transport.STATE_UNRESPONSIVE: + return True + + return False + @staticmethod def request_path(destination_hash, on_interface=None, tag=None, recursive=False): """