# MIT License # # Copyright (c) 2016-2023 Mark Qvist / unsigned.io and contributors. # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. import os import RNS import time import math import struct import threading from time import sleep from .vendor import umsgpack as umsgpack class Transport: """ Through static methods of this class you can interact with the Transport system of Reticulum. """ # Constants BROADCAST = 0x00; TRANSPORT = 0x01; RELAY = 0x02; TUNNEL = 0x03; types = [BROADCAST, TRANSPORT, RELAY, TUNNEL] REACHABILITY_UNREACHABLE = 0x00 REACHABILITY_DIRECT = 0x01 REACHABILITY_TRANSPORT = 0x02 APP_NAME = "rnstransport" PATHFINDER_M = 128 # Max hops """ Maximum amount of hops that Reticulum will transport a packet. """ PATHFINDER_R = 1 # Retransmit retries PATHFINDER_G = 5 # Retry grace period PATHFINDER_RW = 0.5 # Random window for announce rebroadcast PATHFINDER_E = 60*60*24*7 # Path expiration of one week AP_PATH_TIME = 60*60*24 # Path expiration of one day for Access Point paths ROAMING_PATH_TIME = 60*60*6 # Path expiration of 6 hours for Roaming paths # TODO: Calculate an optimal number for this in # various situations LOCAL_REBROADCASTS_MAX = 2 # How many local rebroadcasts of an announce is allowed PATH_REQUEST_TIMEOUT = 15 # Default timuout for client path requests in seconds PATH_REQUEST_GRACE = 0.4 # Grace time before a path announcement is made, allows directly reachable peers to respond first PATH_REQUEST_RG = 1.0 # Extra grace time for roaming-mode interfaces to allow more suitable peers to respond first PATH_REQUEST_MI = 20 # Minimum interval in seconds for automated path requests STATE_UNKNOWN = 0x00 STATE_UNRESPONSIVE = 0x01 STATE_RESPONSIVE = 0x02 LINK_TIMEOUT = RNS.Link.STALE_TIME * 1.25 REVERSE_TIMEOUT = 30*60 # Reverse table entries are removed after 30 minutes DESTINATION_TIMEOUT = 60*60*24*7 # Destination table entries are removed if unused for one week MAX_RECEIPTS = 1024 # Maximum number of receipts to keep track of MAX_RATE_TIMESTAMPS = 16 # Maximum number of announce timestamps to keep per destination PERSIST_RANDOM_BLOBS = 32 # Maximum number of random blobs per destination to persist to disk MAX_RANDOM_BLOBS = 64 # Maximum number of random blobs per destination to keep in memory interfaces = [] # All active interfaces destinations = [] # All active destinations pending_links = [] # Links that are being established active_links = [] # Links that are active packet_hashlist = [] # A list of packet hashes for duplicate detection receipts = [] # Receipts of all outgoing packets for proof processing # TODO: "destination_table" should really be renamed to "path_table" # Notes on memory usage: 1 megabyte of memory can store approximately # 55.100 path table entries or approximately 22.300 link table entries. announce_table = {} # A table for storing announces currently waiting to be retransmitted destination_table = {} # A lookup table containing the next hop to a given destination reverse_table = {} # A lookup table for storing packet hashes used to return proofs and replies link_table = {} # A lookup table containing hops for links held_announces = {} # A table containing temporarily held announce-table entries announce_handlers = [] # A table storing externally registered announce handlers tunnels = {} # A table storing tunnels to other transport instances announce_rate_table = {} # A table for keeping track of announce rates path_requests = {} # A table for storing path request timestamps path_states = {} # A table for keeping track of path states discovery_path_requests = {} # A table for keeping track of path requests on behalf of other nodes discovery_pr_tags = [] # A table for keeping track of tagged path requests max_pr_tags = 32000 # Maximum amount of unique path request tags to remember # Transport control destinations are used # for control purposes like path requests control_destinations = [] control_hashes = [] # Interfaces for communicating with # local clients connected to a shared # Reticulum instance local_client_interfaces = [] local_client_rssi_cache = [] local_client_snr_cache = [] local_client_q_cache = [] LOCAL_CLIENT_CACHE_MAXSIZE = 512 pending_local_path_requests = {} start_time = None jobs_locked = False jobs_running = False job_interval = 0.250 links_last_checked = 0.0 links_check_interval = 1.0 receipts_last_checked = 0.0 receipts_check_interval = 1.0 announces_last_checked = 0.0 announces_check_interval = 1.0 hashlist_maxsize = 1000000 tables_last_culled = 0.0 tables_cull_interval = 5.0 interface_last_jobs = 0.0 interface_jobs_interval = 5.0 identity = None @staticmethod def start(reticulum_instance): Transport.jobs_running = True Transport.owner = reticulum_instance if Transport.identity == None: transport_identity_path = RNS.Reticulum.storagepath+"/transport_identity" if os.path.isfile(transport_identity_path): Transport.identity = RNS.Identity.from_file(transport_identity_path) if Transport.identity == None: RNS.log("No valid Transport Identity in storage, creating...", RNS.LOG_VERBOSE) Transport.identity = RNS.Identity() Transport.identity.to_file(transport_identity_path) else: RNS.log("Loaded Transport Identity from storage", RNS.LOG_VERBOSE) packet_hashlist_path = RNS.Reticulum.storagepath+"/packet_hashlist" if not Transport.owner.is_connected_to_shared_instance: if os.path.isfile(packet_hashlist_path): try: file = open(packet_hashlist_path, "rb") Transport.packet_hashlist = umsgpack.unpackb(file.read()) file.close() except Exception as e: RNS.log("Could not load packet hashlist from storage, the contained exception was: "+str(e), RNS.LOG_ERROR) # Create transport-specific destinations Transport.path_request_destination = RNS.Destination(None, RNS.Destination.IN, RNS.Destination.PLAIN, Transport.APP_NAME, "path", "request") Transport.path_request_destination.set_packet_callback(Transport.path_request_handler) Transport.control_destinations.append(Transport.path_request_destination) Transport.control_hashes.append(Transport.path_request_destination.hash) Transport.tunnel_synthesize_destination = RNS.Destination(None, RNS.Destination.IN, RNS.Destination.PLAIN, Transport.APP_NAME, "tunnel", "synthesize") Transport.tunnel_synthesize_destination.set_packet_callback(Transport.tunnel_synthesize_handler) Transport.control_destinations.append(Transport.tunnel_synthesize_handler) Transport.control_hashes.append(Transport.tunnel_synthesize_destination.hash) Transport.jobs_running = False thread = threading.Thread(target=Transport.jobloop, daemon=True) thread.start() if RNS.Reticulum.transport_enabled(): destination_table_path = RNS.Reticulum.storagepath+"/destination_table" tunnel_table_path = RNS.Reticulum.storagepath+"/tunnels" if os.path.isfile(destination_table_path) and not Transport.owner.is_connected_to_shared_instance: serialised_destinations = [] try: file = open(destination_table_path, "rb") serialised_destinations = umsgpack.unpackb(file.read()) file.close() for serialised_entry in serialised_destinations: destination_hash = serialised_entry[0] if len(destination_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8: timestamp = serialised_entry[1] received_from = serialised_entry[2] hops = serialised_entry[3] expires = serialised_entry[4] random_blobs = serialised_entry[5] receiving_interface = Transport.find_interface_from_hash(serialised_entry[6]) announce_packet = Transport.get_cached_packet(serialised_entry[7]) if announce_packet != None and receiving_interface != None: announce_packet.unpack() # We increase the hops, since reading a packet # from cache is equivalent to receiving it again # over an interface. It is cached with it's non- # increased hop-count. announce_packet.hops += 1 Transport.destination_table[destination_hash] = [timestamp, received_from, hops, expires, random_blobs, receiving_interface, announce_packet] RNS.log("Loaded path table entry for "+RNS.prettyhexrep(destination_hash)+" from storage", RNS.LOG_DEBUG) else: RNS.log("Could not reconstruct path table entry from storage for "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG) if announce_packet == None: RNS.log("The announce packet could not be loaded from cache", RNS.LOG_DEBUG) if receiving_interface == None: RNS.log("The interface is no longer available", RNS.LOG_DEBUG) if len(Transport.destination_table) == 1: specifier = "entry" else: specifier = "entries" RNS.log("Loaded "+str(len(Transport.destination_table))+" path table "+specifier+" from storage", RNS.LOG_VERBOSE) except Exception as e: RNS.log("Could not load destination table from storage, the contained exception was: "+str(e), RNS.LOG_ERROR) if os.path.isfile(tunnel_table_path) and not Transport.owner.is_connected_to_shared_instance: serialised_tunnels = [] try: file = open(tunnel_table_path, "rb") serialised_tunnels = umsgpack.unpackb(file.read()) file.close() for serialised_tunnel in serialised_tunnels: tunnel_id = serialised_tunnel[0] interface_hash = serialised_tunnel[1] serialised_paths = serialised_tunnel[2] expires = serialised_tunnel[3] tunnel_paths = {} for serialised_entry in serialised_paths: destination_hash = serialised_entry[0] timestamp = serialised_entry[1] received_from = serialised_entry[2] hops = serialised_entry[3] expires = serialised_entry[4] random_blobs = serialised_entry[5] receiving_interface = Transport.find_interface_from_hash(serialised_entry[6]) announce_packet = Transport.get_cached_packet(serialised_entry[7]) if announce_packet != None: announce_packet.unpack() # We increase the hops, since reading a packet # from cache is equivalent to receiving it again # over an interface. It is cached with it's non- # increased hop-count. announce_packet.hops += 1 tunnel_path = [timestamp, received_from, hops, expires, random_blobs, receiving_interface, announce_packet] tunnel_paths[destination_hash] = tunnel_path tunnel = [tunnel_id, None, tunnel_paths, expires] Transport.tunnels[tunnel_id] = tunnel if len(Transport.destination_table) == 1: specifier = "entry" else: specifier = "entries" RNS.log("Loaded "+str(len(Transport.tunnels))+" tunnel table "+specifier+" from storage", RNS.LOG_VERBOSE) except Exception as e: RNS.log("Could not load tunnel table from storage, the contained exception was: "+str(e), RNS.LOG_ERROR) if RNS.Reticulum.probe_destination_enabled(): Transport.probe_destination = RNS.Destination(Transport.identity, RNS.Destination.IN, RNS.Destination.SINGLE, Transport.APP_NAME, "probe") Transport.probe_destination.accepts_links(False) Transport.probe_destination.set_proof_strategy(RNS.Destination.PROVE_ALL) Transport.probe_destination.announce() RNS.log("Transport Instance will respond to probe requests on "+str(Transport.probe_destination), RNS.LOG_NOTICE) else: Transport.probe_destination = None RNS.log("Transport instance "+str(Transport.identity)+" started", RNS.LOG_VERBOSE) Transport.start_time = time.time() # Synthesize tunnels for any interfaces wanting it for interface in Transport.interfaces: interface.tunnel_id = None if hasattr(interface, "wants_tunnel") and interface.wants_tunnel: Transport.synthesize_tunnel(interface) @staticmethod def jobloop(): while (True): Transport.jobs() sleep(Transport.job_interval) @staticmethod def jobs(): outgoing = [] path_requests = {} blocked_if = None Transport.jobs_running = True try: if not Transport.jobs_locked: # Process active and pending link lists if time.time() > Transport.links_last_checked+Transport.links_check_interval: for link in Transport.pending_links: if link.status == RNS.Link.CLOSED: # If we are not a Transport Instance, finding a pending link # that was never activated will trigger an expiry of the path # to the destination, and an attempt to rediscover the path. if not RNS.Reticulum.transport_enabled(): Transport.expire_path(link.destination.hash) # If we are connected to a shared instance, it will take # care of sending out a new path request. If not, we will # send one directly. if not Transport.owner.is_connected_to_shared_instance: last_path_request = 0 if link.destination.hash in Transport.path_requests: last_path_request = Transport.path_requests[link.destination.hash] if time.time() - last_path_request > Transport.PATH_REQUEST_MI: RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link.destination.hash)+" since an attempted link was never established", RNS.LOG_DEBUG) if not link.destination.hash in path_requests: blocked_if = None path_requests[link.destination.hash] = blocked_if Transport.pending_links.remove(link) for link in Transport.active_links: if link.status == RNS.Link.CLOSED: Transport.active_links.remove(link) Transport.links_last_checked = time.time() # Process receipts list for timed-out packets if time.time() > Transport.receipts_last_checked+Transport.receipts_check_interval: while len(Transport.receipts) > Transport.MAX_RECEIPTS: culled_receipt = Transport.receipts.pop(0) culled_receipt.timeout = -1 culled_receipt.check_timeout() for receipt in Transport.receipts: receipt.check_timeout() if receipt.status != RNS.PacketReceipt.SENT: if receipt in Transport.receipts: Transport.receipts.remove(receipt) Transport.receipts_last_checked = time.time() # Process announces needing retransmission if time.time() > Transport.announces_last_checked+Transport.announces_check_interval: completed_announces = [] for destination_hash in Transport.announce_table: announce_entry = Transport.announce_table[destination_hash] if announce_entry[2] > Transport.PATHFINDER_R: RNS.log("Completed announce processing for "+RNS.prettyhexrep(destination_hash)+", retry limit reached", RNS.LOG_EXTREME) completed_announces.append(destination_hash) else: if time.time() > announce_entry[1]: announce_entry[1] = time.time() + Transport.PATHFINDER_G + Transport.PATHFINDER_RW announce_entry[2] += 1 packet = announce_entry[5] block_rebroadcasts = announce_entry[7] attached_interface = announce_entry[8] announce_context = RNS.Packet.NONE if block_rebroadcasts: announce_context = RNS.Packet.PATH_RESPONSE announce_data = packet.data announce_identity = RNS.Identity.recall(packet.destination_hash) announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown"); announce_destination.hash = packet.destination_hash announce_destination.hexhash = announce_destination.hash.hex() new_packet = RNS.Packet( announce_destination, announce_data, RNS.Packet.ANNOUNCE, context = announce_context, header_type = RNS.Packet.HEADER_2, transport_type = Transport.TRANSPORT, transport_id = Transport.identity.hash, attached_interface = attached_interface ) new_packet.hops = announce_entry[4] if block_rebroadcasts: RNS.log("Rebroadcasting announce as path response for "+RNS.prettyhexrep(announce_destination.hash)+" with hop count "+str(new_packet.hops), RNS.LOG_DEBUG) else: RNS.log("Rebroadcasting announce for "+RNS.prettyhexrep(announce_destination.hash)+" with hop count "+str(new_packet.hops), RNS.LOG_DEBUG) outgoing.append(new_packet) # This handles an edge case where a peer sends a past # request for a destination just after an announce for # said destination has arrived, but before it has been # rebroadcast locally. In such a case the actual announce # is temporarily held, and then reinserted when the path # request has been served to the peer. if destination_hash in Transport.held_announces: held_entry = Transport.held_announces.pop(destination_hash) Transport.announce_table[destination_hash] = held_entry RNS.log("Reinserting held announce into table", RNS.LOG_DEBUG) for destination_hash in completed_announces: if destination_hash in Transport.announce_table: Transport.announce_table.pop(destination_hash) Transport.announces_last_checked = time.time() # Cull the packet hashlist if it has reached its max size if len(Transport.packet_hashlist) > Transport.hashlist_maxsize: Transport.packet_hashlist = Transport.packet_hashlist[len(Transport.packet_hashlist)-Transport.hashlist_maxsize:len(Transport.packet_hashlist)-1] # Cull the path request tags list if it has reached its max size if len(Transport.discovery_pr_tags) > Transport.max_pr_tags: Transport.discovery_pr_tags = Transport.discovery_pr_tags[len(Transport.discovery_pr_tags)-Transport.max_pr_tags:len(Transport.discovery_pr_tags)-1] if time.time() > Transport.tables_last_culled + Transport.tables_cull_interval: # Remove unneeded path state entries stale_path_states = [] for destination_hash in Transport.path_states: if not destination_hash in Transport.destination_table: stale_path_states.append(destination_hash) # Cull the reverse table according to timeout stale_reverse_entries = [] for truncated_packet_hash in Transport.reverse_table: reverse_entry = Transport.reverse_table[truncated_packet_hash] if time.time() > reverse_entry[2] + Transport.REVERSE_TIMEOUT: stale_reverse_entries.append(truncated_packet_hash) # Cull the link table according to timeout stale_links = [] for link_id in Transport.link_table: link_entry = Transport.link_table[link_id] if link_entry[7] == True: if time.time() > link_entry[0] + Transport.LINK_TIMEOUT: stale_links.append(link_id) else: if time.time() > link_entry[8]: stale_links.append(link_id) last_path_request = 0 if link_entry[6] in Transport.path_requests: last_path_request = Transport.path_requests[link_entry[6]] lr_taken_hops = link_entry[5] path_request_throttle = time.time() - last_path_request < Transport.PATH_REQUEST_MI path_request_conditions = False # If the path has been invalidated between the time of # making the link request and now, try to rediscover it if not Transport.has_path(link_entry[6]): RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[6])+" since an attempted link was never established, and path is now missing", RNS.LOG_DEBUG) path_request_conditions =True # If this link request was originated from a local client # attempt to rediscover a path to the destination, if this # has not already happened recently. elif not path_request_throttle and lr_taken_hops == 0: RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[6])+" since an attempted local client link was never established", RNS.LOG_DEBUG) path_request_conditions = True # If the link destination was previously only 1 hop # away, this likely means that it was local to one # of our interfaces, and that it roamed somewhere else. # In that case, try to discover a new path. elif not path_request_throttle and Transport.hops_to(link_entry[6]) == 1: RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[6])+" since an attempted link was never established, and destination was previously local to an interface on this instance", RNS.LOG_DEBUG) path_request_conditions = True blocked_if = link_entry[4] # If the link initiator is only 1 hop away, # this likely means that network topology has # changed. In that case, we try to discover a new path, # and mark the old one as potentially unresponsive. elif not path_request_throttle and lr_taken_hops == 1: RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[6])+" since an attempted link was never established, and link initiator is local to an interface on this instance", RNS.LOG_DEBUG) path_request_conditions = True blocked_if = link_entry[4] if RNS.Reticulum.transport_enabled(): if hasattr(link_entry[4], "mode") and link_entry[4].mode != RNS.Interfaces.Interface.Interface.MODE_BOUNDARY: Transport.mark_path_unresponsive(link_entry[6]) if path_request_conditions: if not link_entry[6] in path_requests: path_requests[link_entry[6]] = blocked_if if not RNS.Reticulum.transport_enabled(): # Drop current path if we are not a transport instance, to # allow using higher-hop count paths or reused announces # from newly adjacent transport instances. Transport.expire_path(link_entry[6]) # Cull the path table stale_paths = [] for destination_hash in Transport.destination_table: destination_entry = Transport.destination_table[destination_hash] attached_interface = destination_entry[5] if attached_interface != None and hasattr(attached_interface, "mode") and attached_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT: destination_expiry = destination_entry[0] + Transport.AP_PATH_TIME elif attached_interface != None and hasattr(attached_interface, "mode") and attached_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING: destination_expiry = destination_entry[0] + Transport.ROAMING_PATH_TIME else: destination_expiry = destination_entry[0] + Transport.DESTINATION_TIMEOUT if time.time() > destination_expiry: stale_paths.append(destination_hash) RNS.log("Path to "+RNS.prettyhexrep(destination_hash)+" timed out and was removed", RNS.LOG_DEBUG) elif not attached_interface in Transport.interfaces: stale_paths.append(destination_hash) RNS.log("Path to "+RNS.prettyhexrep(destination_hash)+" was removed since the attached interface no longer exists", RNS.LOG_DEBUG) # Cull the pending discovery path requests table stale_discovery_path_requests = [] for destination_hash in Transport.discovery_path_requests: entry = Transport.discovery_path_requests[destination_hash] if time.time() > entry["timeout"]: stale_discovery_path_requests.append(destination_hash) RNS.log("Waiting path request for "+RNS.prettyhexrep(destination_hash)+" timed out and was removed", RNS.LOG_DEBUG) # Cull the tunnel table stale_tunnels = [] ti = 0 for tunnel_id in Transport.tunnels: tunnel_entry = Transport.tunnels[tunnel_id] expires = tunnel_entry[3] if time.time() > expires: stale_tunnels.append(tunnel_id) RNS.log("Tunnel "+RNS.prettyhexrep(tunnel_id)+" timed out and was removed", RNS.LOG_EXTREME) else: stale_tunnel_paths = [] tunnel_paths = tunnel_entry[2] for tunnel_path in tunnel_paths: tunnel_path_entry = tunnel_paths[tunnel_path] if time.time() > tunnel_path_entry[0] + Transport.DESTINATION_TIMEOUT: stale_tunnel_paths.append(tunnel_path) RNS.log("Tunnel path to "+RNS.prettyhexrep(tunnel_path)+" timed out and was removed", RNS.LOG_EXTREME) for tunnel_path in stale_tunnel_paths: tunnel_paths.pop(tunnel_path) ti += 1 if ti > 0: if ti == 1: RNS.log("Removed "+str(ti)+" tunnel path", RNS.LOG_EXTREME) else: RNS.log("Removed "+str(ti)+" tunnel paths", RNS.LOG_EXTREME) i = 0 for truncated_packet_hash in stale_reverse_entries: Transport.reverse_table.pop(truncated_packet_hash) i += 1 if i > 0: if i == 1: RNS.log("Released "+str(i)+" reverse table entry", RNS.LOG_EXTREME) else: RNS.log("Released "+str(i)+" reverse table entries", RNS.LOG_EXTREME) i = 0 for link_id in stale_links: Transport.link_table.pop(link_id) i += 1 if i > 0: if i == 1: RNS.log("Released "+str(i)+" link", RNS.LOG_EXTREME) else: RNS.log("Released "+str(i)+" links", RNS.LOG_EXTREME) i = 0 for destination_hash in stale_paths: Transport.destination_table.pop(destination_hash) i += 1 if i > 0: if i == 1: RNS.log("Removed "+str(i)+" path", RNS.LOG_EXTREME) else: RNS.log("Removed "+str(i)+" paths", RNS.LOG_EXTREME) i = 0 for destination_hash in stale_discovery_path_requests: Transport.discovery_path_requests.pop(destination_hash) i += 1 if i > 0: if i == 1: RNS.log("Removed "+str(i)+" waiting path request", RNS.LOG_EXTREME) else: RNS.log("Removed "+str(i)+" waiting path requests", RNS.LOG_EXTREME) i = 0 for tunnel_id in stale_tunnels: Transport.tunnels.pop(tunnel_id) i += 1 if i > 0: if i == 1: RNS.log("Removed "+str(i)+" tunnel", RNS.LOG_EXTREME) else: RNS.log("Removed "+str(i)+" tunnels", RNS.LOG_EXTREME) i = 0 for destination_hash in stale_path_states: Transport.path_states.pop(destination_hash) i += 1 if i > 0: if i == 1: RNS.log("Removed "+str(i)+" path state entry", RNS.LOG_EXTREME) else: RNS.log("Removed "+str(i)+" path state entries", RNS.LOG_EXTREME) Transport.tables_last_culled = time.time() if time.time() > Transport.interface_last_jobs + Transport.interface_jobs_interval: for interface in Transport.interfaces: interface.process_held_announces() Transport.interface_last_jobs = time.time() else: # Transport jobs were locked, do nothing pass except Exception as e: RNS.log("An exception occurred while running Transport jobs.", RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) Transport.jobs_running = False for packet in outgoing: packet.send() for destination_hash in path_requests: blocked_if = path_requests[destination_hash] if blocked_if == None: Transport.request_path(destination_hash) else: for interface in Transport.interfaces: if interface != blocked_if: # RNS.log("Transmitting path request on "+str(interface), RNS.LOG_DEBUG) Transport.request_path(destination_hash, on_interface=interface) else: pass # RNS.log("Blocking path request on "+str(interface), RNS.LOG_DEBUG) @staticmethod def transmit(interface, raw): try: if hasattr(interface, "ifac_identity") and interface.ifac_identity != None: # Calculate packet access code ifac = interface.ifac_identity.sign(raw)[-interface.ifac_size:] # Generate mask mask = RNS.Cryptography.hkdf( length=len(raw)+interface.ifac_size, derive_from=ifac, salt=interface.ifac_key, context=None, ) # Set IFAC flag new_header = bytes([raw[0] | 0x80, raw[1]]) # Assemble new payload with IFAC new_raw = new_header+ifac+raw[2:] # Mask payload i = 0; masked_raw = b"" for byte in new_raw: if i == 0: # Mask first header byte, but make sure the # IFAC flag is still set masked_raw += bytes([byte ^ mask[i] | 0x80]) elif i == 1 or i > interface.ifac_size+1: # Mask second header byte and payload masked_raw += bytes([byte ^ mask[i]]) else: # Don't mask the IFAC itself masked_raw += bytes([byte]) i += 1 # Send it interface.processOutgoing(masked_raw) else: interface.processOutgoing(raw) except Exception as e: RNS.log("Error while transmitting on "+str(interface)+". The contained exception was: "+str(e), RNS.LOG_ERROR) @staticmethod def outbound(packet): while (Transport.jobs_running): sleep(0.0005) Transport.jobs_locked = True sent = False outbound_time = time.time() generate_receipt = False if (packet.create_receipt == True and # Only generate receipts for DATA packets packet.packet_type == RNS.Packet.DATA and # Don't generate receipts for PLAIN destinations packet.destination.type != RNS.Destination.PLAIN and # Don't generate receipts for link-related packets not (packet.context >= RNS.Packet.KEEPALIVE and packet.context <= RNS.Packet.LRPROOF) and # Don't generate receipts for resource packets not (packet.context >= RNS.Packet.RESOURCE and packet.context <= RNS.Packet.RESOURCE_RCL)): generate_receipt = True def packet_sent(packet): packet.sent = True packet.sent_at = time.time() if generate_receipt: packet.receipt = RNS.PacketReceipt(packet) Transport.receipts.append(packet.receipt) # TODO: Enable when caching has been redesigned # Transport.cache(packet) # Check if we have a known path for the destination in the path table if packet.packet_type != RNS.Packet.ANNOUNCE and packet.destination.type != RNS.Destination.PLAIN and packet.destination.type != RNS.Destination.GROUP and packet.destination_hash in Transport.destination_table: outbound_interface = Transport.destination_table[packet.destination_hash][5] # If there's more than one hop to the destination, and we know # a path, we insert the packet into transport by adding the next # transport nodes address to the header, and modifying the flags. # This rule applies both for "normal" transport, and when connected # to a local shared Reticulum instance. if Transport.destination_table[packet.destination_hash][2] > 1: if packet.header_type == RNS.Packet.HEADER_1: # Insert packet into transport new_flags = (RNS.Packet.HEADER_2) << 6 | (Transport.TRANSPORT) << 4 | (packet.flags & 0b00001111) new_raw = struct.pack("!B", new_flags) new_raw += packet.raw[1:2] new_raw += Transport.destination_table[packet.destination_hash][1] new_raw += packet.raw[2:] packet_sent(packet) Transport.transmit(outbound_interface, new_raw) Transport.destination_table[packet.destination_hash][0] = time.time() sent = True # In the special case where we are connected to a local shared # Reticulum instance, and the destination is one hop away, we # also add transport headers to inject the packet into transport # via the shared instance. Normally a packet for a destination # one hop away would just be broadcast directly, but since we # are "behind" a shared instance, we need to get that instance # to transport it onto the network. elif Transport.destination_table[packet.destination_hash][2] == 1 and Transport.owner.is_connected_to_shared_instance: if packet.header_type == RNS.Packet.HEADER_1: # Insert packet into transport new_flags = (RNS.Packet.HEADER_2) << 6 | (Transport.TRANSPORT) << 4 | (packet.flags & 0b00001111) new_raw = struct.pack("!B", new_flags) new_raw += packet.raw[1:2] new_raw += Transport.destination_table[packet.destination_hash][1] new_raw += packet.raw[2:] packet_sent(packet) Transport.transmit(outbound_interface, new_raw) Transport.destination_table[packet.destination_hash][0] = time.time() sent = True # If none of the above applies, we know the destination is # directly reachable, and also on which interface, so we # simply transmit the packet directly on that one. else: packet_sent(packet) Transport.transmit(outbound_interface, packet.raw) sent = True # If we don't have a known path for the destination, we'll # broadcast the packet on all outgoing interfaces, or the # just the relevant interface if the packet has an attached # interface, or belongs to a link. else: stored_hash = False for interface in Transport.interfaces: if interface.OUT: should_transmit = True if packet.destination.type == RNS.Destination.LINK: if packet.destination.status == RNS.Link.CLOSED: should_transmit = False if interface != packet.destination.attached_interface: should_transmit = False if packet.attached_interface != None and interface != packet.attached_interface: should_transmit = False if packet.packet_type == RNS.Packet.ANNOUNCE: if packet.attached_interface == None: if interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT: RNS.log("Blocking announce broadcast on "+str(interface)+" due to AP mode", RNS.LOG_EXTREME) should_transmit = False elif interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING: local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None) if local_destination != None: # RNS.log("Allowing announce broadcast on roaming-mode interface from instance-local destination", RNS.LOG_EXTREME) pass else: from_interface = Transport.next_hop_interface(packet.destination_hash) if from_interface == None or not hasattr(from_interface, "mode"): should_transmit = False if from_interface == None: RNS.log("Blocking announce broadcast on "+str(interface)+" since next hop interface doesn't exist", RNS.LOG_EXTREME) elif not hasattr(from_interface, "mode"): RNS.log("Blocking announce broadcast on "+str(interface)+" since next hop interface has no mode configured", RNS.LOG_EXTREME) else: if from_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING: RNS.log("Blocking announce broadcast on "+str(interface)+" due to roaming-mode next-hop interface", RNS.LOG_EXTREME) should_transmit = False elif from_interface.mode == RNS.Interfaces.Interface.Interface.MODE_BOUNDARY: RNS.log("Blocking announce broadcast on "+str(interface)+" due to boundary-mode next-hop interface", RNS.LOG_EXTREME) should_transmit = False elif interface.mode == RNS.Interfaces.Interface.Interface.MODE_BOUNDARY: local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None) if local_destination != None: # RNS.log("Allowing announce broadcast on boundary-mode interface from instance-local destination", RNS.LOG_EXTREME) pass else: from_interface = Transport.next_hop_interface(packet.destination_hash) if from_interface == None or not hasattr(from_interface, "mode"): should_transmit = False if from_interface == None: RNS.log("Blocking announce broadcast on "+str(interface)+" since next hop interface doesn't exist", RNS.LOG_EXTREME) elif not hasattr(from_interface, "mode"): RNS.log("Blocking announce broadcast on "+str(interface)+" since next hop interface has no mode configured", RNS.LOG_EXTREME) else: if from_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING: RNS.log("Blocking announce broadcast on "+str(interface)+" due to roaming-mode next-hop interface", RNS.LOG_EXTREME) should_transmit = False else: # Currently, annouces originating locally are always # allowed, and do not conform to bandwidth caps. # TODO: Rethink whether this is actually optimal. if packet.hops > 0: if not hasattr(interface, "announce_cap"): interface.announce_cap = RNS.Reticulum.ANNOUNCE_CAP if not hasattr(interface, "announce_allowed_at"): interface.announce_allowed_at = 0 if not hasattr(interface, "announce_queue"): interface.announce_queue = [] queued_announces = True if len(interface.announce_queue) > 0 else False if not queued_announces and outbound_time > interface.announce_allowed_at and interface.bitrate != None and interface.bitrate != 0: tx_time = (len(packet.raw)*8) / interface.bitrate wait_time = (tx_time / interface.announce_cap) interface.announce_allowed_at = outbound_time + wait_time else: should_transmit = False if not len(interface.announce_queue) >= RNS.Reticulum.MAX_QUEUED_ANNOUNCES: should_queue = True already_queued = False for e in interface.announce_queue: if e["destination"] == packet.destination_hash: already_queued = True existing_entry = e emission_timestamp = Transport.announce_emitted(packet) if already_queued: should_queue = False if emission_timestamp > existing_entry["emitted"]: e["time"] = outbound_time e["hops"] = packet.hops e["emitted"] = emission_timestamp e["raw"] = packet.raw if should_queue: entry = { "destination": packet.destination_hash, "time": outbound_time, "hops": packet.hops, "emitted": Transport.announce_emitted(packet), "raw": packet.raw } queued_announces = True if len(interface.announce_queue) > 0 else False interface.announce_queue.append(entry) if not queued_announces: wait_time = max(interface.announce_allowed_at - time.time(), 0) timer = threading.Timer(wait_time, interface.process_announce_queue) timer.start() if wait_time < 1: wait_time_str = str(round(wait_time*1000,2))+"ms" else: wait_time_str = str(round(wait_time*1,2))+"s" ql_str = str(len(interface.announce_queue)) RNS.log("Added announce to queue (height "+ql_str+") on "+str(interface)+" for processing in "+wait_time_str, RNS.LOG_EXTREME) else: wait_time = max(interface.announce_allowed_at - time.time(), 0) if wait_time < 1: wait_time_str = str(round(wait_time*1000,2))+"ms" else: wait_time_str = str(round(wait_time*1,2))+"s" ql_str = str(len(interface.announce_queue)) RNS.log("Added announce to queue (height "+ql_str+") on "+str(interface)+" for processing in "+wait_time_str, RNS.LOG_EXTREME) else: pass else: pass if should_transmit: if not stored_hash: Transport.packet_hashlist.append(packet.packet_hash) stored_hash = True # TODO: Re-evaluate potential for blocking # def send_packet(): # Transport.transmit(interface, packet.raw) # thread = threading.Thread(target=send_packet) # thread.daemon = True # thread.start() Transport.transmit(interface, packet.raw) if packet.packet_type == RNS.Packet.ANNOUNCE: interface.sent_announce() packet_sent(packet) sent = True Transport.jobs_locked = False return sent @staticmethod def packet_filter(packet): # TODO: Think long and hard about this. # Is it even strictly necessary with the current # transport rules? # Filter packets intended for other transport instances if packet.transport_id != None and packet.packet_type != RNS.Packet.ANNOUNCE: if packet.transport_id != Transport.identity.hash: RNS.log("Ignored packet "+RNS.prettyhexrep(packet.packet_hash)+" in transport for other transport instance", RNS.LOG_EXTREME) return False if packet.context == RNS.Packet.KEEPALIVE: return True if packet.context == RNS.Packet.RESOURCE_REQ: return True if packet.context == RNS.Packet.RESOURCE_PRF: return True if packet.context == RNS.Packet.RESOURCE: return True if packet.context == RNS.Packet.CACHE_REQUEST: return True if packet.context == RNS.Packet.CHANNEL: return True if packet.destination_type == RNS.Destination.PLAIN: if packet.packet_type != RNS.Packet.ANNOUNCE: if packet.hops > 1: RNS.log("Dropped PLAIN packet "+RNS.prettyhexrep(packet.hash)+" with "+str(packet.hops)+" hops", RNS.LOG_DEBUG) return False else: return True else: RNS.log("Dropped invalid PLAIN announce packet", RNS.LOG_DEBUG) return False if packet.destination_type == RNS.Destination.GROUP: if packet.packet_type != RNS.Packet.ANNOUNCE: if packet.hops > 1: RNS.log("Dropped GROUP packet "+RNS.prettyhexrep(packet.hash)+" with "+str(packet.hops)+" hops", RNS.LOG_DEBUG) return False else: return True else: RNS.log("Dropped invalid GROUP announce packet", RNS.LOG_DEBUG) return False if not packet.packet_hash in Transport.packet_hashlist: return True else: if packet.packet_type == RNS.Packet.ANNOUNCE: if packet.destination_type == RNS.Destination.SINGLE: return True else: RNS.log("Dropped invalid announce packet", RNS.LOG_DEBUG) return False RNS.log("Filtered packet with hash "+RNS.prettyhexrep(packet.packet_hash), RNS.LOG_EXTREME) return False @staticmethod def inbound(raw, interface=None): # If interface access codes are enabled, # we must authenticate each packet. if len(raw) > 2: if interface != None and hasattr(interface, "ifac_identity") and interface.ifac_identity != None: # Check that IFAC flag is set if raw[0] & 0x80 == 0x80: if len(raw) > 2+interface.ifac_size: # Extract IFAC ifac = raw[2:2+interface.ifac_size] # Generate mask mask = RNS.Cryptography.hkdf( length=len(raw), derive_from=ifac, salt=interface.ifac_key, context=None, ) # Unmask payload i = 0; unmasked_raw = b"" for byte in raw: if i <= 1 or i > interface.ifac_size+1: # Unmask header bytes and payload unmasked_raw += bytes([byte ^ mask[i]]) else: # Don't unmask IFAC itself unmasked_raw += bytes([byte]) i += 1 raw = unmasked_raw # Unset IFAC flag new_header = bytes([raw[0] & 0x7f, raw[1]]) # Re-assemble packet new_raw = new_header+raw[2+interface.ifac_size:] # Calculate expected IFAC expected_ifac = interface.ifac_identity.sign(new_raw)[-interface.ifac_size:] # Check it if ifac == expected_ifac: raw = new_raw else: return else: return else: # If the IFAC flag is not set, but should be, # drop the packet. return else: # If the interface does not have IFAC enabled, # check the received packet IFAC flag. if raw[0] & 0x80 == 0x80: # If the flag is set, drop the packet return else: return while (Transport.jobs_running): sleep(0.0005) if Transport.identity == None: return Transport.jobs_locked = True packet = RNS.Packet(None, raw) if not packet.unpack(): Transport.jobs_locked = False return packet.receiving_interface = interface packet.hops += 1 if interface != None: if hasattr(interface, "r_stat_rssi"): if interface.r_stat_rssi != None: packet.rssi = interface.r_stat_rssi if len(Transport.local_client_interfaces) > 0: Transport.local_client_rssi_cache.append([packet.packet_hash, packet.rssi]) while len(Transport.local_client_rssi_cache) > Transport.LOCAL_CLIENT_CACHE_MAXSIZE: Transport.local_client_rssi_cache.pop(0) if hasattr(interface, "r_stat_snr"): if interface.r_stat_rssi != None: packet.snr = interface.r_stat_snr if len(Transport.local_client_interfaces) > 0: Transport.local_client_snr_cache.append([packet.packet_hash, packet.snr]) while len(Transport.local_client_snr_cache) > Transport.LOCAL_CLIENT_CACHE_MAXSIZE: Transport.local_client_snr_cache.pop(0) if hasattr(interface, "r_stat_q"): if interface.r_stat_q != None: packet.q = interface.r_stat_q if len(Transport.local_client_interfaces) > 0: Transport.local_client_q_cache.append([packet.packet_hash, packet.q]) while len(Transport.local_client_q_cache) > Transport.LOCAL_CLIENT_CACHE_MAXSIZE: Transport.local_client_q_cache.pop(0) if len(Transport.local_client_interfaces) > 0: if Transport.is_local_client_interface(interface): packet.hops -= 1 elif Transport.interface_to_shared_instance(interface): packet.hops -= 1 if Transport.packet_filter(packet): # By default, remember packet hashes to avoid routing # loops in the network, using the packet filter. remember_packet_hash = True # If this packet belongs to a link in our link table, # we'll have to defer adding it to the filter list. # In some cases, we might see a packet over a shared- # medium interface, belonging to a link that transports # or terminates with this instance, but before it would # normally reach us. If the packet is appended to the # filter list at this point, link transport will break. if packet.destination_hash in Transport.link_table: remember_packet_hash = False # If this is a link request proof, don't add it until # we are sure it's not actually somewhere else in the # routing chain. if packet.packet_type == RNS.Packet.PROOF and packet.context == RNS.Packet.LRPROOF: remember_packet_hash = False if remember_packet_hash: Transport.packet_hashlist.append(packet.packet_hash) # TODO: Enable when caching has been redesigned # Transport.cache(packet) # Check special conditions for local clients connected # through a shared Reticulum instance from_local_client = (packet.receiving_interface in Transport.local_client_interfaces) for_local_client = (packet.packet_type != RNS.Packet.ANNOUNCE) and (packet.destination_hash in Transport.destination_table and Transport.destination_table[packet.destination_hash][2] == 0) for_local_client_link = (packet.packet_type != RNS.Packet.ANNOUNCE) and (packet.destination_hash in Transport.link_table and Transport.link_table[packet.destination_hash][4] in Transport.local_client_interfaces) for_local_client_link |= (packet.packet_type != RNS.Packet.ANNOUNCE) and (packet.destination_hash in Transport.link_table and Transport.link_table[packet.destination_hash][2] in Transport.local_client_interfaces) proof_for_local_client = (packet.destination_hash in Transport.reverse_table) and (Transport.reverse_table[packet.destination_hash][0] in Transport.local_client_interfaces) # Plain broadcast packets from local clients are sent # directly on all attached interfaces, since they are # never injected into transport. if not packet.destination_hash in Transport.control_hashes: if packet.destination_type == RNS.Destination.PLAIN and packet.transport_type == Transport.BROADCAST: # Send to all interfaces except the originator if from_local_client: for interface in Transport.interfaces: if interface != packet.receiving_interface: Transport.transmit(interface, packet.raw) # If the packet was not from a local client, send # it directly to all local clients else: for interface in Transport.local_client_interfaces: Transport.transmit(interface, packet.raw) # General transport handling. Takes care of directing # packets according to transport tables and recording # entries in reverse and link tables. if RNS.Reticulum.transport_enabled() or from_local_client or for_local_client or for_local_client_link: # If there is no transport id, but the packet is # for a local client, we generate the transport # id (it was stripped on the previous hop, since # we "spoof" the hop count for clients behind a # shared instance, so they look directly reach- # able), and reinsert, so the normal transport # implementation can handle the packet. if packet.transport_id == None and for_local_client: packet.transport_id = Transport.identity.hash # If this is a cache request, and we can fullfill # it, do so and stop processing. Otherwise resume # normal processing. if packet.context == RNS.Packet.CACHE_REQUEST: if Transport.cache_request_packet(packet): Transport.jobs_locked = False return # If the packet is in transport, check whether we # are the designated next hop, and process it # accordingly if we are. if packet.transport_id != None and packet.packet_type != RNS.Packet.ANNOUNCE: if packet.transport_id == Transport.identity.hash: if packet.destination_hash in Transport.destination_table: next_hop = Transport.destination_table[packet.destination_hash][1] remaining_hops = Transport.destination_table[packet.destination_hash][2] if remaining_hops > 1: # Just increase hop count and transmit new_raw = packet.raw[0:1] new_raw += struct.pack("!B", packet.hops) new_raw += next_hop new_raw += packet.raw[(RNS.Identity.TRUNCATED_HASHLENGTH//8)+2:] elif remaining_hops == 1: # Strip transport headers and transmit new_flags = (RNS.Packet.HEADER_1) << 6 | (Transport.BROADCAST) << 4 | (packet.flags & 0b00001111) new_raw = struct.pack("!B", new_flags) new_raw += struct.pack("!B", packet.hops) new_raw += packet.raw[(RNS.Identity.TRUNCATED_HASHLENGTH//8)+2:] elif remaining_hops == 0: # Just increase hop count and transmit new_raw = packet.raw[0:1] new_raw += struct.pack("!B", packet.hops) new_raw += packet.raw[2:] outbound_interface = Transport.destination_table[packet.destination_hash][5] if packet.packet_type == RNS.Packet.LINKREQUEST: now = time.time() proof_timeout = Transport.extra_link_proof_timeout(packet.receiving_interface) proof_timeout += now + RNS.Link.ESTABLISHMENT_TIMEOUT_PER_HOP * max(1, remaining_hops) # Entry format is link_entry = [ now, # 0: Timestamp, next_hop, # 1: Next-hop transport ID outbound_interface, # 2: Next-hop interface remaining_hops, # 3: Remaining hops packet.receiving_interface, # 4: Received on interface packet.hops, # 5: Taken hops packet.destination_hash, # 6: Original destination hash False, # 7: Validated proof_timeout] # 8: Proof timeout timestamp Transport.link_table[packet.getTruncatedHash()] = link_entry else: # Entry format is reverse_entry = [ packet.receiving_interface, # 0: Received on interface outbound_interface, # 1: Outbound interface time.time()] # 2: Timestamp Transport.reverse_table[packet.getTruncatedHash()] = reverse_entry Transport.transmit(outbound_interface, new_raw) Transport.destination_table[packet.destination_hash][0] = time.time() else: # TODO: There should probably be some kind of REJECT # mechanism here, to signal to the source that their # expected path failed. RNS.log("Got packet in transport, but no known path to final destination "+RNS.prettyhexrep(packet.destination_hash)+". Dropping packet.", RNS.LOG_EXTREME) # Link transport handling. Directs packets according # to entries in the link tables if packet.packet_type != RNS.Packet.ANNOUNCE and packet.packet_type != RNS.Packet.LINKREQUEST and packet.context != RNS.Packet.LRPROOF: if packet.destination_hash in Transport.link_table: link_entry = Transport.link_table[packet.destination_hash] # If receiving and outbound interface is # the same for this link, direction doesn't # matter, and we simply repeat the packet. outbound_interface = None if link_entry[2] == link_entry[4]: # But check that taken hops matches one # of the expectede values. if packet.hops == link_entry[3] or packet.hops == link_entry[5]: outbound_interface = link_entry[2] else: # If interfaces differ, we transmit on # the opposite interface of what the # packet was received on. if packet.receiving_interface == link_entry[2]: # Also check that expected hop count matches if packet.hops == link_entry[3]: outbound_interface = link_entry[4] elif packet.receiving_interface == link_entry[4]: # Also check that expected hop count matches if packet.hops == link_entry[5]: outbound_interface = link_entry[2] if outbound_interface != None: # Add this packet to the filter hashlist if we # have determined that it's actually our turn # to process it. Transport.packet_hashlist.append(packet.packet_hash) new_raw = packet.raw[0:1] new_raw += struct.pack("!B", packet.hops) new_raw += packet.raw[2:] Transport.transmit(outbound_interface, new_raw) Transport.link_table[packet.destination_hash][0] = time.time() else: pass # Announce handling. Handles logic related to incoming # announces, queueing rebroadcasts of these, and removal # of queued announce rebroadcasts once handed to the next node. if packet.packet_type == RNS.Packet.ANNOUNCE: if interface != None and RNS.Identity.validate_announce(packet, only_validate_signature=True): interface.received_announce() if not packet.destination_hash in Transport.destination_table: # This is an unknown destination, and we'll apply # potential ingress limiting. Already known # destinations will have re-announces controlled # by normal announce rate limiting. if interface.should_ingress_limit(): interface.hold_announce(packet) Transport.jobs_locked = False return local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None) if local_destination == None and RNS.Identity.validate_announce(packet): if packet.transport_id != None: received_from = packet.transport_id # Check if this is a next retransmission from # another node. If it is, we're removing the # announce in question from our pending table if RNS.Reticulum.transport_enabled() and packet.destination_hash in Transport.announce_table: announce_entry = Transport.announce_table[packet.destination_hash] if packet.hops-1 == announce_entry[4]: RNS.log("Heard a local rebroadcast of announce for "+RNS.prettyhexrep(packet.destination_hash), RNS.LOG_DEBUG) announce_entry[6] += 1 if announce_entry[6] >= Transport.LOCAL_REBROADCASTS_MAX: RNS.log("Max local rebroadcasts of announce for "+RNS.prettyhexrep(packet.destination_hash)+" reached, dropping announce from our table", RNS.LOG_DEBUG) if packet.destination_hash in Transport.announce_table: Transport.announce_table.pop(packet.destination_hash) if packet.hops-1 == announce_entry[4]+1 and announce_entry[2] > 0: now = time.time() if now < announce_entry[1]: RNS.log("Rebroadcasted announce for "+RNS.prettyhexrep(packet.destination_hash)+" has been passed on to another node, no further tries needed", RNS.LOG_DEBUG) Transport.announce_table.pop(packet.destination_hash) else: received_from = packet.destination_hash # Check if this announce should be inserted into # announce and destination tables should_add = False # First, check that the announce is not for a destination # local to this system, and that hops are less than the max if (not any(packet.destination_hash == d.hash for d in Transport.destinations) and packet.hops < Transport.PATHFINDER_M+1): announce_emitted = Transport.announce_emitted(packet) random_blob = packet.data[RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8:RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8+10] random_blobs = [] if packet.destination_hash in Transport.destination_table: random_blobs = Transport.destination_table[packet.destination_hash][4] # If we already have a path to the announced # destination, but the hop count is equal or # less, we'll update our tables. if packet.hops <= Transport.destination_table[packet.destination_hash][2]: # Make sure we haven't heard the random # blob before, so announces can't be # replayed to forge paths. # TODO: Check whether this approach works # under all circumstances if not random_blob in random_blobs: Transport.mark_path_unknown_state(packet.destination_hash) should_add = True else: should_add = False else: # If an announce arrives with a larger hop # count than we already have in the table, # ignore it, unless the path is expired, or # the emission timestamp is more recent. now = time.time() path_expires = Transport.destination_table[packet.destination_hash][3] path_announce_emitted = 0 for path_random_blob in random_blobs: path_announce_emitted = max(path_announce_emitted, int.from_bytes(path_random_blob[5:10], "big")) if path_announce_emitted >= announce_emitted: break # If the path has expired, consider this # announce for adding to the path table. if (now >= path_expires): # We check that the announce is # different from ones we've already heard, # to avoid loops in the network if not random_blob in random_blobs: # TODO: Check that this ^ approach actually # works under all circumstances RNS.log("Replacing destination table entry for "+str(RNS.prettyhexrep(packet.destination_hash))+" with new announce due to expired path", RNS.LOG_DEBUG) Transport.mark_path_unknown_state(packet.destination_hash) should_add = True else: should_add = False else: # If the path is not expired, but the emission # is more recent, and we haven't already heard # this announce before, update the path table. if (announce_emitted > path_announce_emitted): if not random_blob in random_blobs: RNS.log("Replacing destination table entry for "+str(RNS.prettyhexrep(packet.destination_hash))+" with new announce, since it was more recently emitted", RNS.LOG_DEBUG) Transport.mark_path_unknown_state(packet.destination_hash) should_add = True else: should_add = False # If we have already heard this announce before, # but the path has been marked as unresponsive # by a failed communications attempt or similar, # allow updating the path table to this one. elif announce_emitted == path_announce_emitted: if Transport.path_is_unresponsive(packet.destination_hash): RNS.log("Replacing destination table entry for "+str(RNS.prettyhexrep(packet.destination_hash))+" with new announce, since previously tried path was unresponsive", RNS.LOG_DEBUG) should_add = True else: should_add = False else: # If this destination is unknown in our table # we should add it should_add = True if should_add: now = time.time() rate_blocked = False if packet.context != RNS.Packet.PATH_RESPONSE and packet.receiving_interface.announce_rate_target != None: if not packet.destination_hash in Transport.announce_rate_table: rate_entry = { "last": now, "rate_violations": 0, "blocked_until": 0, "timestamps": [now]} Transport.announce_rate_table[packet.destination_hash] = rate_entry else: rate_entry = Transport.announce_rate_table[packet.destination_hash] rate_entry["timestamps"].append(now) while len(rate_entry["timestamps"]) > Transport.MAX_RATE_TIMESTAMPS: rate_entry["timestamps"].pop(0) current_rate = now - rate_entry["last"] if now > rate_entry["blocked_until"]: if current_rate < packet.receiving_interface.announce_rate_target: rate_entry["rate_violations"] += 1 else: rate_entry["rate_violations"] = max(0, rate_entry["rate_violations"]-1) if rate_entry["rate_violations"] > packet.receiving_interface.announce_rate_grace: rate_target = packet.receiving_interface.announce_rate_target rate_penalty = packet.receiving_interface.announce_rate_penalty rate_entry["blocked_until"] = rate_entry["last"] + rate_target + rate_penalty rate_blocked = True else: rate_entry["last"] = now else: rate_blocked = True retries = 0 announce_hops = packet.hops local_rebroadcasts = 0 block_rebroadcasts = False attached_interface = None retransmit_timeout = now + (RNS.rand() * Transport.PATHFINDER_RW) if hasattr(packet.receiving_interface, "mode") and packet.receiving_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT: expires = now + Transport.AP_PATH_TIME elif hasattr(packet.receiving_interface, "mode") and packet.receiving_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING: expires = now + Transport.ROAMING_PATH_TIME else: expires = now + Transport.PATHFINDER_E random_blobs.append(random_blob) random_blobs = random_blobs[-Transport.MAX_RANDOM_BLOBS:] if (RNS.Reticulum.transport_enabled() or Transport.from_local_client(packet)) and packet.context != RNS.Packet.PATH_RESPONSE: # Insert announce into announce table for retransmission if rate_blocked: RNS.log("Blocking rebroadcast of announce from "+RNS.prettyhexrep(packet.destination_hash)+" due to excessive announce rate", RNS.LOG_DEBUG) else: if Transport.from_local_client(packet): # If the announce is from a local client, # it is announced immediately, but only one time. retransmit_timeout = now retries = Transport.PATHFINDER_R Transport.announce_table[packet.destination_hash] = [ now, retransmit_timeout, retries, received_from, announce_hops, packet, local_rebroadcasts, block_rebroadcasts, attached_interface ] # TODO: Check from_local_client once and store result elif Transport.from_local_client(packet) and packet.context == RNS.Packet.PATH_RESPONSE: # If this is a path response from a local client, # check if any external interfaces have pending # path requests. if packet.destination_hash in Transport.pending_local_path_requests: desiring_interface = Transport.pending_local_path_requests.pop(packet.destination_hash) retransmit_timeout = now retries = Transport.PATHFINDER_R Transport.announce_table[packet.destination_hash] = [ now, retransmit_timeout, retries, received_from, announce_hops, packet, local_rebroadcasts, block_rebroadcasts, attached_interface ] # If we have any local clients connected, we re- # transmit the announce to them immediately if (len(Transport.local_client_interfaces)): announce_identity = RNS.Identity.recall(packet.destination_hash) announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown"); announce_destination.hash = packet.destination_hash announce_destination.hexhash = announce_destination.hash.hex() announce_context = RNS.Packet.NONE announce_data = packet.data # TODO: Shouldn't the context be PATH_RESPONSE in the first case here? if Transport.from_local_client(packet) and packet.context == RNS.Packet.PATH_RESPONSE: for local_interface in Transport.local_client_interfaces: if packet.receiving_interface != local_interface: new_announce = RNS.Packet( announce_destination, announce_data, RNS.Packet.ANNOUNCE, context = announce_context, header_type = RNS.Packet.HEADER_2, transport_type = Transport.TRANSPORT, transport_id = Transport.identity.hash, attached_interface = local_interface ) new_announce.hops = packet.hops new_announce.send() else: for local_interface in Transport.local_client_interfaces: if packet.receiving_interface != local_interface: new_announce = RNS.Packet( announce_destination, announce_data, RNS.Packet.ANNOUNCE, context = announce_context, header_type = RNS.Packet.HEADER_2, transport_type = Transport.TRANSPORT, transport_id = Transport.identity.hash, attached_interface = local_interface ) new_announce.hops = packet.hops new_announce.send() # If we have any waiting discovery path requests # for this destination, we retransmit to that # interface immediately if packet.destination_hash in Transport.discovery_path_requests: pr_entry = Transport.discovery_path_requests[packet.destination_hash] attached_interface = pr_entry["requesting_interface"] interface_str = " on "+str(attached_interface) RNS.log("Got matching announce, answering waiting discovery path request for "+RNS.prettyhexrep(packet.destination_hash)+interface_str, RNS.LOG_DEBUG) announce_identity = RNS.Identity.recall(packet.destination_hash) announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown"); announce_destination.hash = packet.destination_hash announce_destination.hexhash = announce_destination.hash.hex() announce_context = RNS.Packet.NONE announce_data = packet.data new_announce = RNS.Packet( announce_destination, announce_data, RNS.Packet.ANNOUNCE, context = RNS.Packet.PATH_RESPONSE, header_type = RNS.Packet.HEADER_2, transport_type = Transport.TRANSPORT, transport_id = Transport.identity.hash, attached_interface = attached_interface ) new_announce.hops = packet.hops new_announce.send() destination_table_entry = [now, received_from, announce_hops, expires, random_blobs, packet.receiving_interface, packet] Transport.destination_table[packet.destination_hash] = destination_table_entry RNS.log("Destination "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(packet.receiving_interface), RNS.LOG_DEBUG) # If the receiving interface is a tunnel, we add the # announce to the tunnels table if hasattr(packet.receiving_interface, "tunnel_id") and packet.receiving_interface.tunnel_id != None: tunnel_entry = Transport.tunnels[packet.receiving_interface.tunnel_id] paths = tunnel_entry[2] paths[packet.destination_hash] = destination_table_entry expires = time.time() + Transport.DESTINATION_TIMEOUT tunnel_entry[3] = expires RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" associated with tunnel "+RNS.prettyhexrep(packet.receiving_interface.tunnel_id), RNS.LOG_DEBUG) # Call externally registered callbacks from apps # wanting to know when an announce arrives if packet.context != RNS.Packet.PATH_RESPONSE: for handler in Transport.announce_handlers: try: # Check that the announced destination matches # the handlers aspect filter execute_callback = False announce_identity = RNS.Identity.recall(packet.destination_hash) if handler.aspect_filter == None: # If the handlers aspect filter is set to # None, we execute the callback in all cases execute_callback = True else: handler_expected_hash = RNS.Destination.hash_from_name_and_identity(handler.aspect_filter, announce_identity) if packet.destination_hash == handler_expected_hash: execute_callback = True if execute_callback: handler.received_announce( destination_hash=packet.destination_hash, announced_identity=announce_identity, app_data=RNS.Identity.recall_app_data(packet.destination_hash) ) except Exception as e: RNS.log("Error while processing external announce callback.", RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) # Handling for link requests to local destinations elif packet.packet_type == RNS.Packet.LINKREQUEST: if packet.transport_id == None or packet.transport_id == Transport.identity.hash: for destination in Transport.destinations: if destination.hash == packet.destination_hash and destination.type == packet.destination_type: packet.destination = destination destination.receive(packet) # Handling for local data packets elif packet.packet_type == RNS.Packet.DATA: if packet.destination_type == RNS.Destination.LINK: for link in Transport.active_links: if link.link_id == packet.destination_hash: packet.link = link link.receive(packet) else: for destination in Transport.destinations: if destination.hash == packet.destination_hash and destination.type == packet.destination_type: packet.destination = destination destination.receive(packet) if destination.proof_strategy == RNS.Destination.PROVE_ALL: packet.prove() elif destination.proof_strategy == RNS.Destination.PROVE_APP: if destination.callbacks.proof_requested: try: if destination.callbacks.proof_requested(packet): packet.prove() except Exception as e: RNS.log("Error while executing proof request callback. The contained exception was: "+str(e), RNS.LOG_ERROR) # Handling for proofs and link-request proofs elif packet.packet_type == RNS.Packet.PROOF: if packet.context == RNS.Packet.LRPROOF: # This is a link request proof, check if it # needs to be transported if (RNS.Reticulum.transport_enabled() or for_local_client_link or from_local_client) and packet.destination_hash in Transport.link_table: link_entry = Transport.link_table[packet.destination_hash] if packet.hops == link_entry[3]: if packet.receiving_interface == link_entry[2]: try: if len(packet.data) == RNS.Identity.SIGLENGTH//8+RNS.Link.ECPUBSIZE//2: peer_pub_bytes = packet.data[RNS.Identity.SIGLENGTH//8:RNS.Identity.SIGLENGTH//8+RNS.Link.ECPUBSIZE//2] peer_identity = RNS.Identity.recall(link_entry[6]) peer_sig_pub_bytes = peer_identity.get_public_key()[RNS.Link.ECPUBSIZE//2:RNS.Link.ECPUBSIZE] signed_data = packet.destination_hash+peer_pub_bytes+peer_sig_pub_bytes signature = packet.data[:RNS.Identity.SIGLENGTH//8] if peer_identity.validate(signature, signed_data): RNS.log("Link request proof validated for transport via "+str(link_entry[4]), RNS.LOG_EXTREME) new_raw = packet.raw[0:1] new_raw += struct.pack("!B", packet.hops) new_raw += packet.raw[2:] Transport.link_table[packet.destination_hash][7] = True Transport.transmit(link_entry[4], new_raw) else: RNS.log("Invalid link request proof in transport for link "+RNS.prettyhexrep(packet.destination_hash)+", dropping proof.", RNS.LOG_DEBUG) except Exception as e: RNS.log("Error while transporting link request proof. The contained exception was: "+str(e), RNS.LOG_ERROR) else: RNS.log("Link request proof received on wrong interface, not transporting it.", RNS.LOG_DEBUG) else: RNS.log("Received link request proof with hop mismatch, not transporting it", RNS.LOG_DEBUG) else: # Check if we can deliver it to a local # pending link for link in Transport.pending_links: if link.link_id == packet.destination_hash: if packet.hops == link.expected_hops: # Add this packet to the filter hashlist if we # have determined that it's actually destined # for this system, and then validate the proof Transport.packet_hashlist.append(packet.packet_hash) link.validate_proof(packet) elif packet.context == RNS.Packet.RESOURCE_PRF: for link in Transport.active_links: if link.link_id == packet.destination_hash: link.receive(packet) else: if packet.destination_type == RNS.Destination.LINK: for link in Transport.active_links: if link.link_id == packet.destination_hash: packet.link = link if len(packet.data) == RNS.PacketReceipt.EXPL_LENGTH: proof_hash = packet.data[:RNS.Identity.HASHLENGTH//8] else: proof_hash = None # Check if this proof needs to be transported if (RNS.Reticulum.transport_enabled() or from_local_client or proof_for_local_client) and packet.destination_hash in Transport.reverse_table: reverse_entry = Transport.reverse_table.pop(packet.destination_hash) if packet.receiving_interface == reverse_entry[1]: RNS.log("Proof received on correct interface, transporting it via "+str(reverse_entry[0]), RNS.LOG_EXTREME) new_raw = packet.raw[0:1] new_raw += struct.pack("!B", packet.hops) new_raw += packet.raw[2:] Transport.transmit(reverse_entry[0], new_raw) else: RNS.log("Proof received on wrong interface, not transporting it.", RNS.LOG_DEBUG) for receipt in Transport.receipts: receipt_validated = False if proof_hash != None: # Only test validation if hash matches if receipt.hash == proof_hash: receipt_validated = receipt.validate_proof_packet(packet) else: # In case of an implicit proof, we have # to check every single outstanding receipt receipt_validated = receipt.validate_proof_packet(packet) if receipt_validated: if receipt in Transport.receipts: Transport.receipts.remove(receipt) Transport.jobs_locked = False @staticmethod def synthesize_tunnel(interface): interface_hash = interface.get_hash() public_key = RNS.Transport.identity.get_public_key() random_hash = RNS.Identity.get_random_hash() tunnel_id_data = public_key+interface_hash tunnel_id = RNS.Identity.full_hash(tunnel_id_data) signed_data = tunnel_id_data+random_hash signature = Transport.identity.sign(signed_data) data = signed_data+signature tnl_snth_dst = RNS.Destination(None, RNS.Destination.OUT, RNS.Destination.PLAIN, Transport.APP_NAME, "tunnel", "synthesize") packet = RNS.Packet(tnl_snth_dst, data, packet_type = RNS.Packet.DATA, transport_type = RNS.Transport.BROADCAST, header_type = RNS.Packet.HEADER_1, attached_interface = interface) packet.send() interface.wants_tunnel = False @staticmethod def tunnel_synthesize_handler(data, packet): try: expected_length = RNS.Identity.KEYSIZE//8+RNS.Identity.HASHLENGTH//8+RNS.Reticulum.TRUNCATED_HASHLENGTH//8+RNS.Identity.SIGLENGTH//8 if len(data) == expected_length: public_key = data[:RNS.Identity.KEYSIZE//8] interface_hash = data[RNS.Identity.KEYSIZE//8:RNS.Identity.KEYSIZE//8+RNS.Identity.HASHLENGTH//8] tunnel_id_data = public_key+interface_hash tunnel_id = RNS.Identity.full_hash(tunnel_id_data) random_hash = data[RNS.Identity.KEYSIZE//8+RNS.Identity.HASHLENGTH//8:RNS.Identity.KEYSIZE//8+RNS.Identity.HASHLENGTH//8+RNS.Reticulum.TRUNCATED_HASHLENGTH//8] signature = data[RNS.Identity.KEYSIZE//8+RNS.Identity.HASHLENGTH//8+RNS.Reticulum.TRUNCATED_HASHLENGTH//8:expected_length] signed_data = tunnel_id_data+random_hash remote_transport_identity = RNS.Identity(create_keys=False) remote_transport_identity.load_public_key(public_key) if remote_transport_identity.validate(signature, signed_data): Transport.handle_tunnel(tunnel_id, packet.receiving_interface) except Exception as e: RNS.log("An error occurred while validating tunnel establishment packet.", RNS.LOG_DEBUG) RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) @staticmethod def handle_tunnel(tunnel_id, interface): expires = time.time() + Transport.DESTINATION_TIMEOUT if not tunnel_id in Transport.tunnels: RNS.log("Tunnel endpoint "+RNS.prettyhexrep(tunnel_id)+" established.", RNS.LOG_DEBUG) paths = {} tunnel_entry = [tunnel_id, interface, paths, expires] interface.tunnel_id = tunnel_id Transport.tunnels[tunnel_id] = tunnel_entry else: RNS.log("Tunnel endpoint "+RNS.prettyhexrep(tunnel_id)+" reappeared. Restoring paths...", RNS.LOG_DEBUG) tunnel_entry = Transport.tunnels[tunnel_id] tunnel_entry[1] = interface tunnel_entry[3] = expires interface.tunnel_id = tunnel_id paths = tunnel_entry[2] deprecated_paths = [] for destination_hash, path_entry in paths.items(): received_from = path_entry[1] announce_hops = path_entry[2] expires = path_entry[3] random_blobs = path_entry[4] receiving_interface = interface packet = path_entry[6] new_entry = [time.time(), received_from, announce_hops, expires, random_blobs, receiving_interface, packet] should_add = False if destination_hash in Transport.destination_table: old_entry = Transport.destination_table[destination_hash] old_hops = old_entry[2] old_expires = old_entry[3] if announce_hops <= old_hops or time.time() > old_expires: should_add = True else: RNS.log("Did not restore path to "+RNS.prettyhexrep(packet.destination_hash)+" because a newer path with fewer hops exist", RNS.LOG_DEBUG) else: if time.time() < expires: should_add = True else: RNS.log("Did not restore path to "+RNS.prettyhexrep(packet.destination_hash)+" because it has expired", RNS.LOG_DEBUG) if should_add: Transport.destination_table[destination_hash] = new_entry RNS.log("Restored path to "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(receiving_interface), RNS.LOG_DEBUG) else: deprecated_paths.append(destination_hash) for deprecated_path in deprecated_paths: RNS.log("Removing path to "+RNS.prettyhexrep(deprecated_path)+" from tunnel "+RNS.prettyhexrep(tunnel_id), RNS.LOG_DEBUG) paths.pop(deprecated_path) @staticmethod def register_destination(destination): destination.MTU = RNS.Reticulum.MTU if destination.direction == RNS.Destination.IN: for registered_destination in Transport.destinations: if destination.hash == registered_destination.hash: raise KeyError("Attempt to register an already registered destination.") Transport.destinations.append(destination) if Transport.owner.is_connected_to_shared_instance: if destination.type == RNS.Destination.SINGLE: destination.announce(path_response=True) @staticmethod def deregister_destination(destination): if destination in Transport.destinations: Transport.destinations.remove(destination) @staticmethod def register_link(link): RNS.log("Registering link "+str(link), RNS.LOG_EXTREME) if link.initiator: Transport.pending_links.append(link) else: Transport.active_links.append(link) @staticmethod def activate_link(link): RNS.log("Activating link "+str(link), RNS.LOG_EXTREME) if link in Transport.pending_links: if link.status != RNS.Link.ACTIVE: raise IOError("Invalid link state for link activation: "+str(link.status)) Transport.pending_links.remove(link) Transport.active_links.append(link) link.status = RNS.Link.ACTIVE else: RNS.log("Attempted to activate a link that was not in the pending table", RNS.LOG_ERROR) @staticmethod def register_announce_handler(handler): """ Registers an announce handler. :param handler: Must be an object with an *aspect_filter* attribute and a *received_announce(destination_hash, announced_identity, app_data)* callable. See the :ref:`Announce Example` for more info. """ if hasattr(handler, "received_announce") and callable(handler.received_announce): if hasattr(handler, "aspect_filter"): Transport.announce_handlers.append(handler) @staticmethod def deregister_announce_handler(handler): """ Deregisters an announce handler. :param handler: The announce handler to be deregistered. """ while handler in Transport.announce_handlers: Transport.announce_handlers.remove(handler) @staticmethod def find_interface_from_hash(interface_hash): for interface in Transport.interfaces: if interface.get_hash() == interface_hash: return interface return None @staticmethod def should_cache(packet): # TODO: Rework the caching system. It's currently # not very useful to even cache Resource proofs, # disabling it for now, until redesigned. # if packet.context == RNS.Packet.RESOURCE_PRF: # return True return False # When caching packets to storage, they are written # exactly as they arrived over their interface. This # means that they have not had their hop count # increased yet! Take note of this when reading from # the packet cache. @staticmethod def cache(packet, force_cache=False): if RNS.Transport.should_cache(packet) or force_cache: try: packet_hash = RNS.hexrep(packet.get_hash(), delimit=False) interface_reference = None if packet.receiving_interface != None: interface_reference = str(packet.receiving_interface) file = open(RNS.Reticulum.cachepath+"/"+packet_hash, "wb") file.write(umsgpack.packb([packet.raw, interface_reference])) file.close() except Exception as e: RNS.log("Error writing packet to cache. The contained exception was: "+str(e), RNS.LOG_ERROR) @staticmethod def get_cached_packet(packet_hash): try: packet_hash = RNS.hexrep(packet_hash, delimit=False) path = RNS.Reticulum.cachepath+"/"+packet_hash if os.path.isfile(path): file = open(path, "rb") cached_data = umsgpack.unpackb(file.read()) file.close() packet = RNS.Packet(None, cached_data[0]) interface_reference = cached_data[1] for interface in Transport.interfaces: if str(interface) == interface_reference: packet.receiving_interface = interface return packet else: return None except Exception as e: RNS.log("Exception occurred while getting cached packet.", RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) @staticmethod def cache_request_packet(packet): if len(packet.data) == RNS.Identity.HASHLENGTH/8: packet = Transport.get_cached_packet(packet.data) if packet != None: # If the packet was retrieved from the local # cache, replay it to the Transport instance, # so that it can be directed towards it original # destination. Transport.inbound(packet.raw, packet.receiving_interface) return True else: return False else: return False @staticmethod def cache_request(packet_hash, destination): cached_packet = Transport.get_cached_packet(packet_hash) if cached_packet: # The packet was found in the local cache, # replay it to the Transport instance. Transport.inbound(packet.raw, packet.receiving_interface) else: # The packet is not in the local cache, # query the network. RNS.Packet(destination, packet_hash, context = RNS.Packet.CACHE_REQUEST).send() @staticmethod def has_path(destination_hash): """ :param destination_hash: A destination hash as *bytes*. :returns: *True* if a path to the destination is known, otherwise *False*. """ if destination_hash in Transport.destination_table: return True else: return False @staticmethod def hops_to(destination_hash): """ :param destination_hash: A destination hash as *bytes*. :returns: The number of hops to the specified destination, or ``RNS.Transport.PATHFINDER_M`` if the number of hops is unknown. """ if destination_hash in Transport.destination_table: return Transport.destination_table[destination_hash][2] else: return Transport.PATHFINDER_M @staticmethod def next_hop(destination_hash): """ :param destination_hash: A destination hash as *bytes*. :returns: The destination hash as *bytes* for the next hop to the specified destination, or *None* if the next hop is unknown. """ if destination_hash in Transport.destination_table: return Transport.destination_table[destination_hash][1] else: return None @staticmethod def next_hop_interface(destination_hash): """ :param destination_hash: A destination hash as *bytes*. :returns: The interface for the next hop to the specified destination, or *None* if the interface is unknown. """ if destination_hash in Transport.destination_table: return Transport.destination_table[destination_hash][5] else: return None @staticmethod def next_hop_interface_bitrate(destination_hash): next_hop_interface = Transport.next_hop_interface(destination_hash) if next_hop_interface != None: return next_hop_interface.bitrate else: return None @staticmethod def next_hop_per_bit_latency(destination_hash): next_hop_interface_bitrate = Transport.next_hop_interface_bitrate(destination_hash) if next_hop_interface_bitrate != None: return (1/next_hop_interface_bitrate) else: return None @staticmethod def next_hop_per_byte_latency(destination_hash): per_bit_latency = Transport.next_hop_per_bit_latency(destination_hash) if per_bit_latency != None: return per_bit_latency*8 else: return None @staticmethod def first_hop_timeout(destination_hash): latency = Transport.next_hop_per_byte_latency(destination_hash) if latency != None: return RNS.Reticulum.MTU * latency + RNS.Reticulum.DEFAULT_PER_HOP_TIMEOUT else: return RNS.Reticulum.DEFAULT_PER_HOP_TIMEOUT @staticmethod def extra_link_proof_timeout(interface): if interface != None: return ((1/interface.bitrate)*8)*RNS.Reticulum.MTU else: return 0 @staticmethod def expire_path(destination_hash): if destination_hash in Transport.destination_table: Transport.destination_table[destination_hash][0] = 0 Transport.tables_last_culled = 0 return True else: return False @staticmethod def mark_path_unresponsive(destination_hash): if destination_hash in Transport.destination_table: Transport.path_states[destination_hash] = Transport.STATE_UNRESPONSIVE return True else: return False @staticmethod def mark_path_responsive(destination_hash): if destination_hash in Transport.destination_table: Transport.path_states[destination_hash] = Transport.STATE_RESPONSIVE return True else: return False @staticmethod def mark_path_unknown_state(destination_hash): if destination_hash in Transport.destination_table: Transport.path_states[destination_hash] = Transport.STATE_UNKNOWN return True else: return False def path_is_unresponsive(destination_hash): if destination_hash in Transport.path_states: if Transport.path_states[destination_hash] == Transport.STATE_UNRESPONSIVE: return True return False @staticmethod def request_path(destination_hash, on_interface=None, tag=None, recursive=False): """ Requests a path to the destination from the network. If another reachable peer on the network knows a path, it will announce it. :param destination_hash: A destination hash as *bytes*. :param on_interface: If specified, the path request will only be sent on this interface. In normal use, Reticulum handles this automatically, and this parameter should not be used. """ if tag == None: request_tag = RNS.Identity.get_random_hash() else: request_tag = tag if RNS.Reticulum.transport_enabled(): path_request_data = destination_hash+Transport.identity.hash+request_tag else: path_request_data = destination_hash+request_tag path_request_dst = RNS.Destination(None, RNS.Destination.OUT, RNS.Destination.PLAIN, Transport.APP_NAME, "path", "request") packet = RNS.Packet(path_request_dst, path_request_data, packet_type = RNS.Packet.DATA, transport_type = RNS.Transport.BROADCAST, header_type = RNS.Packet.HEADER_1, attached_interface = on_interface) if on_interface != None and recursive: if not hasattr(on_interface, "announce_cap"): on_interface.announce_cap = RNS.Reticulum.ANNOUNCE_CAP if not hasattr(on_interface, "announce_allowed_at"): on_interface.announce_allowed_at = 0 if not hasattr(on_interface, "announce_queue"): on_interface.announce_queue = [] queued_announces = True if len(on_interface.announce_queue) > 0 else False if queued_announces: RNS.log("Blocking recursive path request on "+str(on_interface)+" due to queued announces", RNS.LOG_EXTREME) return else: now = time.time() if now < on_interface.announce_allowed_at: RNS.log("Blocking recursive path request on "+str(on_interface)+" due to active announce cap", RNS.LOG_EXTREME) return else: tx_time = ((len(path_request_data)+RNS.Reticulum.HEADER_MINSIZE)*8) / on_interface.bitrate wait_time = (tx_time / on_interface.announce_cap) on_interface.announce_allowed_at = now + wait_time packet.send() Transport.path_requests[destination_hash] = time.time() @staticmethod def path_request_handler(data, packet): try: # If there is at least bytes enough for a destination # hash in the packet, we assume those bytes are the # destination being requested. if len(data) >= RNS.Identity.TRUNCATED_HASHLENGTH//8: destination_hash = data[:RNS.Identity.TRUNCATED_HASHLENGTH//8] # If there is also enough bytes for a transport # instance ID and at least one tag byte, we # assume the next bytes to be the trasport ID # of the requesting transport instance. if len(data) > (RNS.Identity.TRUNCATED_HASHLENGTH//8)*2: requesting_transport_instance = data[RNS.Identity.TRUNCATED_HASHLENGTH//8:(RNS.Identity.TRUNCATED_HASHLENGTH//8)*2] else: requesting_transport_instance = None tag_bytes = None if len(data) > (RNS.Identity.TRUNCATED_HASHLENGTH//8)*2: tag_bytes = data[RNS.Identity.TRUNCATED_HASHLENGTH//8*2:] elif len(data) > (RNS.Identity.TRUNCATED_HASHLENGTH//8): tag_bytes = data[RNS.Identity.TRUNCATED_HASHLENGTH//8:] if tag_bytes != None: if len(tag_bytes) > RNS.Identity.TRUNCATED_HASHLENGTH//8: tag_bytes = tag_bytes[:RNS.Identity.TRUNCATED_HASHLENGTH//8] unique_tag = destination_hash+tag_bytes if not unique_tag in Transport.discovery_pr_tags: Transport.discovery_pr_tags.append(unique_tag) Transport.path_request( destination_hash, Transport.from_local_client(packet), packet.receiving_interface, requestor_transport_id = requesting_transport_instance, tag=tag_bytes ) else: RNS.log("Ignoring duplicate path request for "+RNS.prettyhexrep(destination_hash)+" with tag "+RNS.prettyhexrep(unique_tag), RNS.LOG_DEBUG) else: RNS.log("Ignoring tagless path request for "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG) except Exception as e: RNS.log("Error while handling path request. The contained exception was: "+str(e), RNS.LOG_ERROR) @staticmethod def path_request(destination_hash, is_from_local_client, attached_interface, requestor_transport_id=None, tag=None): should_search_for_unknown = False if attached_interface != None: if RNS.Reticulum.transport_enabled() and attached_interface.mode in RNS.Interfaces.Interface.Interface.DISCOVER_PATHS_FOR: should_search_for_unknown = True interface_str = " on "+str(attached_interface) else: interface_str = "" RNS.log("Path request for "+RNS.prettyhexrep(destination_hash)+interface_str, RNS.LOG_DEBUG) destination_exists_on_local_client = False if len(Transport.local_client_interfaces) > 0: if destination_hash in Transport.destination_table: destination_interface = Transport.destination_table[destination_hash][5] if Transport.is_local_client_interface(destination_interface): destination_exists_on_local_client = True Transport.pending_local_path_requests[destination_hash] = attached_interface local_destination = next((d for d in Transport.destinations if d.hash == destination_hash), None) if local_destination != None: local_destination.announce(path_response=True, tag=tag, attached_interface=attached_interface) RNS.log("Answering path request for "+RNS.prettyhexrep(destination_hash)+interface_str+", destination is local to this system", RNS.LOG_DEBUG) elif (RNS.Reticulum.transport_enabled() or is_from_local_client) and (destination_hash in Transport.destination_table): packet = Transport.destination_table[destination_hash][6] next_hop = Transport.destination_table[destination_hash][1] received_from = Transport.destination_table[destination_hash][5] if attached_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING and attached_interface == received_from: RNS.log("Not answering path request on roaming-mode interface, since next hop is on same roaming-mode interface", RNS.LOG_DEBUG) else: if requestor_transport_id != None and next_hop == requestor_transport_id: # TODO: Find a bandwidth efficient way to invalidate our # known path on this signal. The obvious way of signing # path requests with transport instance keys is quite # inefficient. There is probably a better way. Doing # path invalidation here would decrease the network # convergence time. Maybe just drop it? RNS.log("Not answering path request for "+RNS.prettyhexrep(destination_hash)+interface_str+", since next hop is the requestor", RNS.LOG_DEBUG) else: RNS.log("Answering path request for "+RNS.prettyhexrep(destination_hash)+interface_str+", path is known", RNS.LOG_DEBUG) now = time.time() retries = Transport.PATHFINDER_R local_rebroadcasts = 0 block_rebroadcasts = True announce_hops = packet.hops if is_from_local_client: retransmit_timeout = now else: # TODO: Consider this timing retransmit_timeout = now + Transport.PATH_REQUEST_GRACE # + (RNS.rand() * Transport.PATHFINDER_RW) # If we are answering on a roaming-mode interface, wait a # little longer, to allow potential more well-connected # peers to answer first. if attached_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING: retransmit_timeout += Transport.PATH_REQUEST_RG # This handles an edge case where a peer sends a past # request for a destination just after an announce for # said destination has arrived, but before it has been # rebroadcast locally. In such a case the actual announce # is temporarily held, and then reinserted when the path # request has been served to the peer. if packet.destination_hash in Transport.announce_table: held_entry = Transport.announce_table[packet.destination_hash] Transport.held_announces[packet.destination_hash] = held_entry Transport.announce_table[packet.destination_hash] = [now, retransmit_timeout, retries, received_from, announce_hops, packet, local_rebroadcasts, block_rebroadcasts, attached_interface] elif is_from_local_client: # Forward path request on all interfaces # except the local client RNS.log("Forwarding path request from local client for "+RNS.prettyhexrep(destination_hash)+interface_str+" to all other interfaces", RNS.LOG_DEBUG) request_tag = RNS.Identity.get_random_hash() for interface in Transport.interfaces: if not interface == attached_interface: Transport.request_path(destination_hash, interface, tag = request_tag) elif should_search_for_unknown: if destination_hash in Transport.discovery_path_requests: RNS.log("There is already a waiting path request for "+RNS.prettyhexrep(destination_hash)+" on behalf of path request"+interface_str, RNS.LOG_DEBUG) else: # Forward path request on all interfaces # except the requestor interface RNS.log("Attempting to discover unknown path to "+RNS.prettyhexrep(destination_hash)+" on behalf of path request"+interface_str, RNS.LOG_DEBUG) pr_entry = { "destination_hash": destination_hash, "timeout": time.time()+Transport.PATH_REQUEST_TIMEOUT, "requesting_interface": attached_interface } Transport.discovery_path_requests[destination_hash] = pr_entry for interface in Transport.interfaces: if not interface == attached_interface: # Use the previously extracted tag from this path request # on the new path requests as well, to avoid potential loops Transport.request_path(destination_hash, on_interface=interface, tag=tag, recursive=True) elif not is_from_local_client and len(Transport.local_client_interfaces) > 0: # Forward the path request on all local # client interfaces RNS.log("Forwarding path request for "+RNS.prettyhexrep(destination_hash)+interface_str+" to local clients", RNS.LOG_DEBUG) for interface in Transport.local_client_interfaces: Transport.request_path(destination_hash, on_interface=interface) else: RNS.log("Ignoring path request for "+RNS.prettyhexrep(destination_hash)+interface_str+", no path known", RNS.LOG_DEBUG) @staticmethod def from_local_client(packet): if hasattr(packet.receiving_interface, "parent_interface"): return Transport.is_local_client_interface(packet.receiving_interface) else: return False @staticmethod def is_local_client_interface(interface): if hasattr(interface, "parent_interface"): if hasattr(interface.parent_interface, "is_local_shared_instance"): return True else: return False else: return False @staticmethod def interface_to_shared_instance(interface): if hasattr(interface, "is_connected_to_shared_instance"): return True else: return False @staticmethod def detach_interfaces(): detachable_interfaces = [] for interface in Transport.interfaces: # Currently no rules are being applied # here, and all interfaces will be sent # the detach call on RNS teardown. if True: detachable_interfaces.append(interface) else: pass for interface in Transport.local_client_interfaces: # Currently no rules are being applied # here, and all interfaces will be sent # the detach call on RNS teardown. if True: detachable_interfaces.append(interface) else: pass for interface in detachable_interfaces: try: interface.detach() except Exception as e: RNS.log("An error occurred while detaching "+str(interface)+". The contained exception was: "+str(e), RNS.LOG_ERROR) @staticmethod def shared_connection_disappeared(): for link in Transport.active_links: link.teardown() for link in Transport.pending_links: link.teardown() Transport.announce_table = {} Transport.destination_table = {} Transport.reverse_table = {} Transport.link_table = {} Transport.held_announces = {} Transport.announce_handlers = [] Transport.tunnels = {} @staticmethod def shared_connection_reappeared(): if Transport.owner.is_connected_to_shared_instance: for registered_destination in Transport.destinations: if registered_destination.type == RNS.Destination.SINGLE: registered_destination.announce(path_response=True) @staticmethod def drop_announce_queues(): for interface in Transport.interfaces: if hasattr(interface, "announce_queue") and interface.announce_queue != None: na = len(interface.announce_queue) if na > 0: if na == 1: na_str = "1 announce" else: na_str = str(na)+" announces" interface.announce_queue = [] RNS.log("Dropped "+na_str+" on "+str(interface), RNS.LOG_VERBOSE) @staticmethod def announce_emitted(packet): random_blob = packet.data[RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8:RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8+10] announce_emitted = int.from_bytes(random_blob[5:10], "big") return announce_emitted @staticmethod def save_packet_hashlist(): if not Transport.owner.is_connected_to_shared_instance: if hasattr(Transport, "saving_packet_hashlist"): wait_interval = 0.2 wait_timeout = 5 wait_start = time.time() while Transport.saving_packet_hashlist: time.sleep(wait_interval) if time.time() > wait_start+wait_timeout: RNS.log("Could not save packet hashlist to storage, waiting for previous save operation timed out.", RNS.LOG_ERROR) return False try: Transport.saving_packet_hashlist = True save_start = time.time() if not RNS.Reticulum.transport_enabled(): Transport.packet_hashlist = [] else: RNS.log("Saving packet hashlist to storage...", RNS.LOG_DEBUG) packet_hashlist_path = RNS.Reticulum.storagepath+"/packet_hashlist" file = open(packet_hashlist_path, "wb") file.write(umsgpack.packb(Transport.packet_hashlist)) file.close() save_time = time.time() - save_start if save_time < 1: time_str = str(round(save_time*1000,2))+"ms" else: time_str = str(round(save_time,2))+"s" RNS.log("Saved packet hashlist in "+time_str, RNS.LOG_DEBUG) except Exception as e: RNS.log("Could not save packet hashlist to storage, the contained exception was: "+str(e), RNS.LOG_ERROR) Transport.saving_packet_hashlist = False @staticmethod def save_path_table(): if not Transport.owner.is_connected_to_shared_instance: if hasattr(Transport, "saving_path_table"): wait_interval = 0.2 wait_timeout = 5 wait_start = time.time() while Transport.saving_path_table: time.sleep(wait_interval) if time.time() > wait_start+wait_timeout: RNS.log("Could not save path table to storage, waiting for previous save operation timed out.", RNS.LOG_ERROR) return False try: Transport.saving_path_table = True save_start = time.time() RNS.log("Saving path table to storage...", RNS.LOG_DEBUG) serialised_destinations = [] for destination_hash in Transport.destination_table: # Get the destination entry from the destination table de = Transport.destination_table[destination_hash] interface_hash = de[5].get_hash() # Only store destination table entry if the associated # interface is still active interface = Transport.find_interface_from_hash(interface_hash) if interface != None: # Get the destination entry from the destination table de = Transport.destination_table[destination_hash] timestamp = de[0] received_from = de[1] hops = de[2] expires = de[3] random_blobs = de[4] packet_hash = de[6].get_hash() serialised_entry = [ destination_hash, timestamp, received_from, hops, expires, random_blobs, interface_hash, packet_hash ] serialised_destinations.append(serialised_entry) Transport.cache(de[6], force_cache=True) destination_table_path = RNS.Reticulum.storagepath+"/destination_table" file = open(destination_table_path, "wb") file.write(umsgpack.packb(serialised_destinations)) file.close() save_time = time.time() - save_start if save_time < 1: time_str = str(round(save_time*1000,2))+"ms" else: time_str = str(round(save_time,2))+"s" RNS.log("Saved "+str(len(serialised_destinations))+" path table entries in "+time_str, RNS.LOG_DEBUG) except Exception as e: RNS.log("Could not save path table to storage, the contained exception was: "+str(e), RNS.LOG_ERROR) Transport.saving_path_table = False @staticmethod def save_tunnel_table(): if not Transport.owner.is_connected_to_shared_instance: if hasattr(Transport, "saving_tunnel_table"): wait_interval = 0.2 wait_timeout = 5 wait_start = time.time() while Transport.saving_tunnel_table: time.sleep(wait_interval) if time.time() > wait_start+wait_timeout: RNS.log("Could not save tunnel table to storage, waiting for previous save operation timed out.", RNS.LOG_ERROR) return False try: Transport.saving_tunnel_table = True save_start = time.time() RNS.log("Saving tunnel table to storage...", RNS.LOG_DEBUG) serialised_tunnels = [] for tunnel_id in Transport.tunnels: te = Transport.tunnels[tunnel_id] interface = te[1] tunnel_paths = te[2] expires = te[3] if interface != None: interface_hash = interface.get_hash() else: interface_hash = None serialised_paths = [] for destination_hash in tunnel_paths: de = tunnel_paths[destination_hash] timestamp = de[0] received_from = de[1] hops = de[2] expires = de[3] random_blobs = de[4][-Transport.PERSIST_RANDOM_BLOBS:] packet_hash = de[6].get_hash() serialised_entry = [ destination_hash, timestamp, received_from, hops, expires, random_blobs, interface_hash, packet_hash ] serialised_paths.append(serialised_entry) Transport.cache(de[6], force_cache=True) serialised_tunnel = [tunnel_id, interface_hash, serialised_paths, expires] serialised_tunnels.append(serialised_tunnel) tunnels_path = RNS.Reticulum.storagepath+"/tunnels" file = open(tunnels_path, "wb") file.write(umsgpack.packb(serialised_tunnels)) file.close() save_time = time.time() - save_start if save_time < 1: time_str = str(round(save_time*1000,2))+"ms" else: time_str = str(round(save_time,2))+"s" RNS.log("Saved "+str(len(serialised_tunnels))+" tunnel table entries in "+time_str, RNS.LOG_DEBUG) except Exception as e: RNS.log("Could not save tunnel table to storage, the contained exception was: "+str(e), RNS.LOG_ERROR) Transport.saving_tunnel_table = False @staticmethod def persist_data(): Transport.save_packet_hashlist() Transport.save_path_table() Transport.save_tunnel_table() @staticmethod def exit_handler(): if not Transport.owner.is_connected_to_shared_instance: Transport.persist_data()