From 8a69f7e88c9334352cd5471376338beea82adcf4 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Thu, 14 May 2020 16:31:23 +0200 Subject: [PATCH] Implemented systemwide shared instance --- RNS/Transport.py | 271 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 194 insertions(+), 77 deletions(-) diff --git a/RNS/Transport.py b/RNS/Transport.py index bcda7c2..81c285d 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -38,30 +38,35 @@ class Transport: PATH_REQUEST_GRACE = 0.35 # Grace time before a path announcement is made, allows directly reachable peers to respond first PATH_REQUEST_RW = 2 # Path request random window - LINK_TIMEOUT = RNS.Link.KEEPALIVE * 2 - REVERSE_TIMEOUT = 30*60 # Reverse table entries are removed after max 30 minutes - DESTINATION_TIMEOUT = 60*60*24*7 # Destination table entries are removed if unused for one week - MAX_RECEIPTS = 1024 # Maximum number of receipts to keep track of + LINK_TIMEOUT = RNS.Link.KEEPALIVE * 2 + REVERSE_TIMEOUT = 30*60 # Reverse table entries are removed after max 30 minutes + DESTINATION_TIMEOUT = 60*60*24*7 # Destination table entries are removed if unused for one week + MAX_RECEIPTS = 1024 # Maximum number of receipts to keep track of - interfaces = [] # All active interfaces - destinations = [] # All active destinations - pending_links = [] # Links that are being established - active_links = [] # Links that are active - packet_hashlist = [] # A list of packet hashes for duplicate detection - receipts = [] # Receipts of all outgoing packets for proof processing + interfaces = [] # All active interfaces + destinations = [] # All active destinations + pending_links = [] # Links that are being established + active_links = [] # Links that are active + packet_hashlist = [] # A list of packet hashes for duplicate detection + receipts = [] # Receipts of all outgoing packets for proof processing + + # TODO: "destination_table" should really be renamed to "path_table" + announce_table = {} # A table for storing announces currently waiting to be retransmitted + destination_table = {} # A lookup table containing the next hop to a given destination + reverse_table = {} # A lookup table for storing packet hashes used to return proofs and replies + link_table = {} # A lookup table containing hops for links + held_announces = {} # A table containing temporarily held announce-table entries + + # Transport control destinations are used + # for control purposes like path requests + control_destinations = [] + control_hashes = [] # Interfaces for communicating with # local clients connected to a shared # Reticulum instance local_client_interfaces = [] - # TODO: "destination_table" should really be renamed to "path_table" - announce_table = {} # A table for storing announces currently waiting to be retransmitted - destination_table = {} # A lookup table containing the next hop to a given destination - reverse_table = {} # A lookup table for storing packet hashes used to return proofs and replies - link_table = {} # A lookup table containing hops for links - held_announces = {} # A table containing temporarily held announce-table entries - jobs_locked = False jobs_running = False job_interval = 0.250 @@ -76,7 +81,9 @@ class Transport: identity = None @staticmethod - def start(): + def start(reticulum_instance): + Transport.owner = reticulum_instance + if Transport.identity == None: transport_identity_path = RNS.Reticulum.storagepath+"/transport_identity" if os.path.isfile(transport_identity_path): @@ -99,8 +106,10 @@ class Transport: RNS.log("Could not load packet hashlist from storage, the contained exception was: "+str(e), RNS.LOG_ERROR) # Create transport-specific destinations - path_request_destination = RNS.Destination(None, RNS.Destination.IN, RNS.Destination.PLAIN, Transport.APP_NAME, "path", "request") - path_request_destination.packet_callback(Transport.pathRequestHandler) + Transport.path_request_destination = RNS.Destination(None, RNS.Destination.IN, RNS.Destination.PLAIN, Transport.APP_NAME, "path", "request") + Transport.path_request_destination.packet_callback(Transport.pathRequestHandler) + Transport.control_destinations.append(Transport.path_request_destination) + Transport.control_hashes.append(Transport.path_request_destination.hash) thread = threading.Thread(target=Transport.jobloop) thread.setDaemon(True) @@ -108,7 +117,7 @@ class Transport: if RNS.Reticulum.transport_enabled(): destination_table_path = RNS.Reticulum.storagepath+"/destination_table" - if os.path.isfile(destination_table_path): + if os.path.isfile(destination_table_path) and not Transport.owner.is_connected_to_shared_instance: serialised_destinations = [] try: file = open(destination_table_path, "rb") @@ -117,16 +126,21 @@ class Transport: for serialised_entry in serialised_destinations: destination_hash = serialised_entry[0] + timestamp = serialised_entry[1] + received_from = serialised_entry[2] + hops = serialised_entry[3] + expires = serialised_entry[4] + random_blobs = serialised_entry[5] receiving_interface = Transport.find_interface_from_hash(serialised_entry[6]) announce_packet = Transport.get_cached_packet(serialised_entry[7]) if announce_packet != None and receiving_interface != None: announce_packet.unpack() - timestamp = serialised_entry[1] - received_from = serialised_entry[2] - hops = serialised_entry[3] - expires = serialised_entry[4] - random_blobs = serialised_entry[5] + # We increase the hops, since reading a packet + # from cache is equivalent to receiving it again + # over an interface. It is cached with it's non- + # increased hop-count. + announce_packet.hops += 1 Transport.destination_table[destination_hash] = [timestamp, received_from, hops, expires, random_blobs, receiving_interface, announce_packet] RNS.log("Loaded path table entry for "+RNS.prettyhexrep(destination_hash)+" from storage", RNS.LOG_DEBUG) else: @@ -242,10 +256,11 @@ class Transport: Transport.reverse_table.pop(truncated_packet_hash) # Cull the link table according to timeout + stale_links = [] for link_id in Transport.link_table: link_entry = Transport.link_table[link_id] if time.time() > link_entry[0] + Transport.LINK_TIMEOUT: - Transport.link_table.pop(link_id) + stale_links.append(link_id) # Cull the path table stale_paths = [] @@ -261,6 +276,17 @@ class Transport: stale_paths.append(destination_hash) RNS.log("Path to "+RNS.prettyhexrep(destination_hash)+" was removed since the attached interface no longer exists", RNS.LOG_DEBUG) + i = 0 + for link_id in stale_links: + Transport.link_table.pop(link_id) + i += 1 + + if i > 0: + if i == 1: + RNS.log("Dropped "+str(i)+" link", RNS.LOG_DEBUG) + else: + RNS.log("Dropped "+str(i)+" links", RNS.LOG_DEBUG) + i = 0 for destination_hash in stale_paths: Transport.destination_table.pop(destination_hash) @@ -294,28 +320,26 @@ class Transport: packet.updateHash() sent = False - # Check if we have a known path for the destination - # in the destination table + # Check if we have a known path for the destination in the path table if packet.packet_type != RNS.Packet.ANNOUNCE and packet.destination_hash in Transport.destination_table: outbound_interface = Transport.destination_table[packet.destination_hash][5] if Transport.destination_table[packet.destination_hash][2] > 1: - # Insert packet into transport - new_flags = (RNS.Packet.HEADER_2) << 6 | (Transport.TRANSPORT) << 4 | (packet.flags & 0b00001111) - new_raw = struct.pack("!B", new_flags) - new_raw += packet.raw[1:2] - new_raw += Transport.destination_table[packet.destination_hash][1] - new_raw += packet.raw[2:] - RNS.log("Packet was inserted into transport via "+RNS.prettyhexrep(Transport.destination_table[packet.destination_hash][1])+" on: "+str(outbound_interface), RNS.LOG_DEBUG) - outbound_interface.processOutgoing(new_raw) - Transport.destination_table[packet.destination_hash][0] = time.time() - sent = True + if packet.header_type == RNS.Packet.HEADER_1: + # Insert packet into transport + new_flags = (RNS.Packet.HEADER_2) << 6 | (Transport.TRANSPORT) << 4 | (packet.flags & 0b00001111) + new_raw = struct.pack("!B", new_flags) + new_raw += packet.raw[1:2] + new_raw += Transport.destination_table[packet.destination_hash][1] + new_raw += packet.raw[2:] + # TODO: Remove at some point + RNS.log("Packet was inserted into transport via "+RNS.prettyhexrep(Transport.destination_table[packet.destination_hash][1])+" on: "+str(outbound_interface), RNS.LOG_DEBUG) + outbound_interface.processOutgoing(new_raw) + Transport.destination_table[packet.destination_hash][0] = time.time() + sent = True else: # Destination is directly reachable, and we know on # what interface, so transmit only on that one - - RNS.log("Transmitting "+str(len(packet.raw))+" bytes on: "+str(outbound_interface), RNS.LOG_EXTREME) - RNS.log("Hash is "+RNS.prettyhexrep(packet.packet_hash), RNS.LOG_EXTREME) outbound_interface.processOutgoing(packet.raw) sent = True @@ -364,7 +388,9 @@ class Transport: @staticmethod def packet_filter(packet): - # TODO: Think long and hard about this + # TODO: Think long and hard about this. + # Is it even strictly necessary with the current + # transport rules? if packet.context == RNS.Packet.KEEPALIVE: return True if packet.context == RNS.Packet.RESOURCE_REQ: @@ -373,6 +399,8 @@ class Transport: return True if packet.context == RNS.Packet.RESOURCE: return True + if packet.destination_type == RNS.Destination.PLAIN: + return True if not packet.packet_hash in Transport.packet_hashlist: return True else: @@ -401,24 +429,57 @@ class Transport: new_raw += struct.pack("!B", packet.hops) new_raw += packet.raw[2:] - for local_client in Transport.local_client_interfaces: - if local_client != interface: - local_client.processOutgoing(new_raw) - if Transport.is_local_client_interface(interface): packet.hops -= 1 elif Transport.interface_to_shared_instance(interface): packet.hops -= 1 - + if Transport.packet_filter(packet): Transport.packet_hashlist.append(packet.packet_hash) Transport.cache(packet) + # Check special conditions for local clients connected + # through a shared Reticulum instance + from_local_client = (packet.receiving_interface in Transport.local_client_interfaces) + for_local_client = (packet.packet_type != RNS.Packet.ANNOUNCE) and (packet.destination_hash in Transport.destination_table and Transport.destination_table[packet.destination_hash][2] == 0) + for_local_client_link = (packet.packet_type != RNS.Packet.ANNOUNCE) and (packet.destination_hash in Transport.link_table and Transport.link_table[packet.destination_hash][4] in Transport.local_client_interfaces) + for_local_client_link |= (packet.packet_type != RNS.Packet.ANNOUNCE) and (packet.destination_hash in Transport.link_table and Transport.link_table[packet.destination_hash][2] in Transport.local_client_interfaces) + proof_for_local_client = (packet.destination_hash in Transport.reverse_table) and (Transport.reverse_table[packet.destination_hash][0] in Transport.local_client_interfaces) + + # Plain broadcast packets from local clients are sent + # directly on all attached interfaces, since they are + # never injected into transport. + if not packet.destination_hash in Transport.control_hashes: + if packet.destination_type == RNS.Destination.PLAIN and packet.transport_type == Transport.BROADCAST: + # Send to all interfaces except the originator + if from_local_client: + for interface in Transport.interfaces: + if interface != packet.receiving_interface: + interface.processOutgoing(packet.raw) + # If the packet was not from a local client, send + # it directly to all local clients + else: + for interface in Transport.local_client_interfaces: + interface.processOutgoing(packet.raw) + + # General transport handling. Takes care of directing # packets according to transport tables and recording # entries in reverse and link tables. - if RNS.Reticulum.transport_enabled(): + if RNS.Reticulum.transport_enabled() or from_local_client or for_local_client or for_local_client_link: + + # If there is no transport id, but the packet is + # for a local client, we generate the transport + # id (it was stripped on the previous hop, since + # we "spoof" the hop count for clients behind a + # shared instance, so they look directly reach- + # able), and reinsert, so the normal transport + # implementation can handle the packet. + + if packet.transport_id == None and for_local_client: + packet.transport_id = Transport.identity.hash + if packet.transport_id != None and packet.packet_type != RNS.Packet.ANNOUNCE: if packet.transport_id == Transport.identity.hash: RNS.log("Received packet in transport for "+RNS.prettyhexrep(packet.destination_hash)+" with matching transport ID, transporting it...", RNS.LOG_DEBUG) @@ -432,12 +493,17 @@ class Transport: new_raw += struct.pack("!B", packet.hops) new_raw += next_hop new_raw += packet.raw[12:] - else: + elif remaining_hops == 1: # Strip transport headers and transmit new_flags = (RNS.Packet.HEADER_1) << 6 | (Transport.BROADCAST) << 4 | (packet.flags & 0b00001111) new_raw = struct.pack("!B", new_flags) new_raw += struct.pack("!B", packet.hops) new_raw += packet.raw[12:] + elif remaining_hops == 0: + # Just increase hop count and transmit + new_raw = packet.raw[0:1] + new_raw += struct.pack("!B", packet.hops) + new_raw += packet.raw[2:] outbound_interface = Transport.destination_table[packet.destination_hash][5] outbound_interface.processOutgoing(new_raw) @@ -474,7 +540,7 @@ class Transport: # Link transport handling. Directs packets according # to entries in the link tables - if packet.packet_type != RNS.Packet.ANNOUNCE and packet.packet_type != RNS.Packet.LINKREQUEST: + if packet.packet_type != RNS.Packet.ANNOUNCE and packet.packet_type != RNS.Packet.LINKREQUEST and packet.context != RNS.Packet.LRPROOF: if packet.destination_hash in Transport.link_table: link_entry = Transport.link_table[packet.destination_hash] # If receiving and outbound interface is @@ -602,7 +668,7 @@ class Transport: if (RNS.Reticulum.transport_enabled() or Transport.from_local_client(packet)) and packet.context != RNS.Packet.PATH_RESPONSE: # If the announce is from a local client, # we announce it immediately, but only one - # time, and also set the hops to 0. + # time. if Transport.from_local_client(packet): retransmit_timeout = now retries = Transport.PATHFINDER_R @@ -619,15 +685,41 @@ class Transport: attached_interface ] + # If we have any local clients connected, we re- + # transmit the announce to them immediately + if (len(Transport.local_client_interfaces)): + announce_identity = RNS.Identity.recall(packet.destination_hash) + announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown"); + announce_destination.hash = packet.destination_hash + announce_destination.hexhash = announce_destination.hash.hex() + announce_context = RNS.Packet.NONE + announce_data = packet.data + + new_announce = RNS.Packet( + announce_destination, + announce_data, + RNS.Packet.ANNOUNCE, + context = announce_context, + header_type = RNS.Packet.HEADER_2, + transport_type = Transport.TRANSPORT, + transport_id = Transport.identity.hash, + attached_interface = attached_interface + ) + + new_announce.hops = packet.hops + new_announce.send() + Transport.destination_table[packet.destination_hash] = [now, received_from, announce_hops, expires, random_blobs, packet.receiving_interface, packet] RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(packet.receiving_interface), RNS.LOG_VERBOSE) + # Handling for linkrequests to local destinations elif packet.packet_type == RNS.Packet.LINKREQUEST: for destination in Transport.destinations: if destination.hash == packet.destination_hash and destination.type == packet.destination_type: packet.destination = destination destination.receive(packet) + # Handling for local data packets elif packet.packet_type == RNS.Packet.DATA: if packet.destination_type == RNS.Destination.LINK: for link in Transport.active_links: @@ -648,12 +740,12 @@ class Transport: if destination.callbacks.proof_requested(packet): packet.prove() + # Handling for proofs and link-request proofs elif packet.packet_type == RNS.Packet.PROOF: if packet.context == RNS.Packet.LRPROOF: # This is a link request proof, check if it # needs to be transported - - if RNS.Reticulum.transport_enabled() and packet.destination_hash in Transport.link_table: + if (RNS.Reticulum.transport_enabled() or for_local_client_link or from_local_client) and packet.destination_hash in Transport.link_table: link_entry = Transport.link_table[packet.destination_hash] if packet.receiving_interface == link_entry[2]: # TODO: Should we validate the LR proof at each transport @@ -689,7 +781,7 @@ class Transport: proof_hash = None # Check if this proof neds to be transported - if RNS.Reticulum.transport_enabled() and packet.destination_hash in Transport.reverse_table: + if (RNS.Reticulum.transport_enabled() or from_local_client or proof_for_local_client) and packet.destination_hash in Transport.reverse_table: reverse_entry = Transport.reverse_table.pop(packet.destination_hash) if packet.receiving_interface == reverse_entry[1]: RNS.log("Proof received on correct interface, transporting it via "+str(reverse_entry[0]), RNS.LOG_DEBUG) @@ -757,6 +849,11 @@ class Transport: return False + # When caching packets to storage, they are written + # exactly as they arrived over their interface. This + # means that they have not had their hop count + # increased yet! Take note of this when reading from + # the packet cache. @staticmethod def cache(packet, force_cache=False): if RNS.Transport.shouldCache(packet) or force_cache: @@ -931,27 +1028,47 @@ class Transport: except Exception as e: RNS.log("Could not save packet hashlist to storage, the contained exception was: "+str(e), RNS.LOG_ERROR) - RNS.log("Saving path table to storage...", RNS.LOG_VERBOSE) - try: - serialised_destinations = [] - for destination_hash in Transport.destination_table: - # Get the destination entry from the destination table - de = Transport.destination_table[destination_hash] - interface_hash = de[5].get_hash() + if not Transport.owner.is_connected_to_shared_instance: + RNS.log("Saving path table to storage...", RNS.LOG_VERBOSE) + try: + serialised_destinations = [] + for destination_hash in Transport.destination_table: + # Get the destination entry from the destination table + de = Transport.destination_table[destination_hash] + interface_hash = de[5].get_hash() - # Only store destination table entry if the associated - # interface is still active - interface = Transport.find_interface_from_hash(interface_hash) - if interface != None: - Transport.cache(de[6], force_cache=True) - packet_hash = de[6].getHash() - serialised_entry = [destination_hash, de[0], de[1], de[2], de[3], de[4], interface_hash, packet_hash] - serialised_destinations.append(serialised_entry) + # Only store destination table entry if the associated + # interface is still active + interface = Transport.find_interface_from_hash(interface_hash) + if interface != None: + # Get the destination entry from the destination table + de = Transport.destination_table[destination_hash] + timestamp = de[0] + received_from = de[1] + hops = de[2] + expires = de[3] + random_blobs = de[4] + packet_hash = de[6].getHash() - destination_table_path = RNS.Reticulum.storagepath+"/destination_table" - file = open(destination_table_path, "wb") - file.write(umsgpack.packb(serialised_destinations)) - file.close() - RNS.log("Done saving path table to storage", RNS.LOG_VERBOSE) - except Exception as e: - RNS.log("Could not save path table to storage, the contained exception was: "+str(e), RNS.LOG_ERROR) + serialised_entry = [ + destination_hash, + timestamp, + received_from, + hops, + expires, + random_blobs, + interface_hash, + packet_hash + ] + + serialised_destinations.append(serialised_entry) + + Transport.cache(de[6], force_cache=True) + + destination_table_path = RNS.Reticulum.storagepath+"/destination_table" + file = open(destination_table_path, "wb") + file.write(umsgpack.packb(serialised_destinations)) + file.close() + RNS.log("Done saving path table to storage", RNS.LOG_VERBOSE) + except Exception as e: + RNS.log("Could not save path table to storage, the contained exception was: "+str(e), RNS.LOG_ERROR)