mirror of
https://github.com/markqvist/Reticulum.git
synced 2024-11-22 13:40:19 +00:00
2826 lines
150 KiB
Python
Executable File
2826 lines
150 KiB
Python
Executable File
# MIT License
|
|
#
|
|
# Copyright (c) 2016-2024 Mark Qvist / unsigned.io and contributors.
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
# of this software and associated documentation files (the "Software"), to deal
|
|
# in the Software without restriction, including without limitation the rights
|
|
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
# copies of the Software, and to permit persons to whom the Software is
|
|
# furnished to do so, subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be included in all
|
|
# copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
# SOFTWARE.
|
|
|
|
import os
|
|
import RNS
|
|
import time
|
|
import math
|
|
import struct
|
|
import threading
|
|
from time import sleep
|
|
from .vendor import umsgpack as umsgpack
|
|
|
|
class Transport:
|
|
"""
|
|
Through static methods of this class you can interact with the
|
|
Transport system of Reticulum.
|
|
"""
|
|
# Constants
|
|
BROADCAST = 0x00;
|
|
TRANSPORT = 0x01;
|
|
RELAY = 0x02;
|
|
TUNNEL = 0x03;
|
|
types = [BROADCAST, TRANSPORT, RELAY, TUNNEL]
|
|
|
|
REACHABILITY_UNREACHABLE = 0x00
|
|
REACHABILITY_DIRECT = 0x01
|
|
REACHABILITY_TRANSPORT = 0x02
|
|
|
|
APP_NAME = "rnstransport"
|
|
|
|
PATHFINDER_M = 128 # Max hops
|
|
"""
|
|
Maximum amount of hops that Reticulum will transport a packet.
|
|
"""
|
|
|
|
PATHFINDER_R = 1 # Retransmit retries
|
|
PATHFINDER_G = 5 # Retry grace period
|
|
PATHFINDER_RW = 0.5 # Random window for announce rebroadcast
|
|
PATHFINDER_E = 60*60*24*7 # Path expiration of one week
|
|
AP_PATH_TIME = 60*60*24 # Path expiration of one day for Access Point paths
|
|
ROAMING_PATH_TIME = 60*60*6 # Path expiration of 6 hours for Roaming paths
|
|
|
|
# TODO: Calculate an optimal number for this in
|
|
# various situations
|
|
LOCAL_REBROADCASTS_MAX = 2 # How many local rebroadcasts of an announce are allowed
|
|
|
|
PATH_REQUEST_TIMEOUT = 15 # Default timeout for client path requests in seconds
|
|
PATH_REQUEST_GRACE = 0.4 # Grace time before a path announcement is made, allows directly reachable peers to respond first
|
|
PATH_REQUEST_RG = 1.5 # Extra grace time for roaming-mode interfaces to allow more suitable peers to respond first
|
|
PATH_REQUEST_MI = 20 # Minimum interval in seconds for automated path requests
|
|
|
|
STATE_UNKNOWN = 0x00
|
|
STATE_UNRESPONSIVE = 0x01
|
|
STATE_RESPONSIVE = 0x02
|
|
|
|
LINK_TIMEOUT = RNS.Link.STALE_TIME * 1.25
|
|
REVERSE_TIMEOUT = 30*60 # Reverse table entries are removed after 30 minutes
|
|
DESTINATION_TIMEOUT = 60*60*24*7 # Destination table entries are removed if unused for one week
|
|
MAX_RECEIPTS = 1024 # Maximum number of receipts to keep track of
|
|
MAX_RATE_TIMESTAMPS = 16 # Maximum number of announce timestamps to keep per destination
|
|
PERSIST_RANDOM_BLOBS = 32 # Maximum number of random blobs per destination to persist to disk
|
|
MAX_RANDOM_BLOBS = 64 # Maximum number of random blobs per destination to keep in memory
|
|
|
|
interfaces = [] # All active interfaces
|
|
destinations = [] # All active destinations
|
|
pending_links = [] # Links that are being established
|
|
active_links = [] # Links that are active
|
|
packet_hashlist = [] # A list of packet hashes for duplicate detection
|
|
receipts = [] # Receipts of all outgoing packets for proof processing
|
|
|
|
# TODO: "destination_table" should really be renamed to "path_table"
|
|
# Notes on memory usage: 1 megabyte of memory can store approximately
|
|
# 55.100 path table entries or approximately 22.300 link table entries.
|
|
|
|
announce_table = {} # A table for storing announces currently waiting to be retransmitted
|
|
destination_table = {} # A lookup table containing the next hop to a given destination
|
|
reverse_table = {} # A lookup table for storing packet hashes used to return proofs and replies
|
|
link_table = {} # A lookup table containing hops for links
|
|
held_announces = {} # A table containing temporarily held announce-table entries
|
|
announce_handlers = [] # A table storing externally registered announce handlers
|
|
tunnels = {} # A table storing tunnels to other transport instances
|
|
announce_rate_table = {} # A table for keeping track of announce rates
|
|
path_requests = {} # A table for storing path request timestamps
|
|
path_states = {} # A table for keeping track of path states
|
|
|
|
discovery_path_requests = {} # A table for keeping track of path requests on behalf of other nodes
|
|
discovery_pr_tags = [] # A table for keeping track of tagged path requests
|
|
max_pr_tags = 32000 # Maximum amount of unique path request tags to remember
|
|
|
|
# Transport control destinations are used
|
|
# for control purposes like path requests
|
|
control_destinations = []
|
|
control_hashes = []
|
|
remote_management_allowed = []
|
|
|
|
# Interfaces for communicating with
|
|
# local clients connected to a shared
|
|
# Reticulum instance
|
|
local_client_interfaces = []
|
|
|
|
local_client_rssi_cache = []
|
|
local_client_snr_cache = []
|
|
local_client_q_cache = []
|
|
LOCAL_CLIENT_CACHE_MAXSIZE = 512
|
|
|
|
pending_local_path_requests = {}
|
|
|
|
start_time = None
|
|
jobs_locked = False
|
|
jobs_running = False
|
|
job_interval = 0.250
|
|
links_last_checked = 0.0
|
|
links_check_interval = 1.0
|
|
receipts_last_checked = 0.0
|
|
receipts_check_interval = 1.0
|
|
announces_last_checked = 0.0
|
|
announces_check_interval = 1.0
|
|
hashlist_maxsize = 1000000
|
|
tables_last_culled = 0.0
|
|
tables_cull_interval = 5.0
|
|
interface_last_jobs = 0.0
|
|
interface_jobs_interval = 5.0
|
|
|
|
identity = None
|
|
|
|
@staticmethod
|
|
def start(reticulum_instance):
    """
    Start the Transport system on behalf of a Reticulum instance.

    The method stores the owning instance, loads or creates the Transport
    identity, restores the persisted packet hashlist, creates the control
    destinations (path request, tunnel synthesis and, if enabled, remote
    management), launches the job loop thread and, when transport is
    enabled, restores the persisted path and tunnel tables and optionally
    sets up the probe destination. Finally interfaces are prioritised and
    tunnels are synthesised for interfaces that want one.

    :param reticulum_instance: The owning Reticulum instance. It is stored
        as ``Transport.owner`` and its ``is_connected_to_shared_instance``
        attribute is consulted before reading persisted state.
    """
    Transport.jobs_running = True
    Transport.owner = reticulum_instance

    if Transport.identity is None:
        transport_identity_path = f"{RNS.Reticulum.storagepath}/transport_identity"
        if os.path.isfile(transport_identity_path):
            Transport.identity = RNS.Identity.from_file(transport_identity_path)

        if Transport.identity is None:
            RNS.log("No valid Transport Identity in storage, creating...", RNS.LOG_VERBOSE)
            Transport.identity = RNS.Identity()
            Transport.identity.to_file(transport_identity_path)
        else:
            RNS.log("Loaded Transport Identity from storage", RNS.LOG_VERBOSE)

    packet_hashlist_path = f"{RNS.Reticulum.storagepath}/packet_hashlist"
    if not Transport.owner.is_connected_to_shared_instance:
        if os.path.isfile(packet_hashlist_path):
            try:
                # The context manager closes the file even if unpacking fails
                with open(packet_hashlist_path, "rb") as hashlist_file:
                    Transport.packet_hashlist = umsgpack.unpackb(hashlist_file.read())
            except Exception as e:
                RNS.log(f"Could not load packet hashlist from storage, the contained exception was: {e}", RNS.LOG_ERROR)

    # Create transport-specific destinations
    Transport.path_request_destination = RNS.Destination(None, RNS.Destination.IN, RNS.Destination.PLAIN, Transport.APP_NAME, "path", "request")
    Transport.path_request_destination.set_packet_callback(Transport.path_request_handler)
    Transport.control_destinations.append(Transport.path_request_destination)
    Transport.control_hashes.append(Transport.path_request_destination.hash)

    Transport.tunnel_synthesize_destination = RNS.Destination(None, RNS.Destination.IN, RNS.Destination.PLAIN, Transport.APP_NAME, "tunnel", "synthesize")
    Transport.tunnel_synthesize_destination.set_packet_callback(Transport.tunnel_synthesize_handler)
    # Register the destination itself (not its packet callback), in line
    # with the other control destinations registered in this method
    Transport.control_destinations.append(Transport.tunnel_synthesize_destination)
    Transport.control_hashes.append(Transport.tunnel_synthesize_destination.hash)

    if RNS.Reticulum.remote_management_enabled() and not Transport.owner.is_connected_to_shared_instance:
        Transport.remote_management_destination = RNS.Destination(Transport.identity, RNS.Destination.IN, RNS.Destination.SINGLE, Transport.APP_NAME, "remote", "management")
        Transport.remote_management_destination.register_request_handler("/status", response_generator = Transport.remote_status_handler, allow = RNS.Destination.ALLOW_LIST, allowed_list=Transport.remote_management_allowed)
        Transport.remote_management_destination.register_request_handler("/path", response_generator = Transport.remote_path_handler, allow = RNS.Destination.ALLOW_LIST, allowed_list=Transport.remote_management_allowed)
        Transport.control_destinations.append(Transport.remote_management_destination)
        Transport.control_hashes.append(Transport.remote_management_destination.hash)
        RNS.log(f"Enabled remote management on {Transport.remote_management_destination}", RNS.LOG_NOTICE)

    Transport.jobs_running = False
    thread = threading.Thread(target=Transport.jobloop, daemon=True)
    thread.start()

    if RNS.Reticulum.transport_enabled():
        destination_table_path = f"{RNS.Reticulum.storagepath}/destination_table"
        tunnel_table_path = f"{RNS.Reticulum.storagepath}/tunnels"

        if os.path.isfile(destination_table_path) and not Transport.owner.is_connected_to_shared_instance:
            serialised_destinations = []
            try:
                with open(destination_table_path, "rb") as table_file:
                    serialised_destinations = umsgpack.unpackb(table_file.read())

                for serialised_entry in serialised_destinations:
                    destination_hash = serialised_entry[0]

                    # Entries whose hash length does not match the current
                    # truncated hash length are skipped
                    if len(destination_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8:
                        timestamp = serialised_entry[1]
                        received_from = serialised_entry[2]
                        hops = serialised_entry[3]
                        expires = serialised_entry[4]
                        random_blobs = serialised_entry[5]
                        receiving_interface = Transport.find_interface_from_hash(serialised_entry[6])
                        announce_packet = Transport.get_cached_packet(serialised_entry[7])

                        if announce_packet is not None and receiving_interface is not None:
                            announce_packet.unpack()
                            # We increase the hops, since reading a packet
                            # from cache is equivalent to receiving it again
                            # over an interface. It is cached with its non-
                            # increased hop-count.
                            announce_packet.hops += 1
                            Transport.destination_table[destination_hash] = [timestamp, received_from, hops, expires, random_blobs, receiving_interface, announce_packet]
                            RNS.log(f"Loaded path table entry for {RNS.prettyhexrep(destination_hash)} from storage", RNS.LOG_DEBUG)
                        else:
                            RNS.log(f"Could not reconstruct path table entry from storage for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_DEBUG)
                            if announce_packet is None:
                                RNS.log("The announce packet could not be loaded from cache", RNS.LOG_DEBUG)
                            if receiving_interface is None:
                                RNS.log("The interface is no longer available", RNS.LOG_DEBUG)

                if len(Transport.destination_table) == 1:
                    specifier = "entry"
                else:
                    specifier = "entries"

                RNS.log(f"Loaded {len(Transport.destination_table)} path table {specifier} from storage", RNS.LOG_VERBOSE)

            except Exception as e:
                RNS.log(f"Could not load destination table from storage, the contained exception was: {e}", RNS.LOG_ERROR)

        if os.path.isfile(tunnel_table_path) and not Transport.owner.is_connected_to_shared_instance:
            serialised_tunnels = []
            try:
                with open(tunnel_table_path, "rb") as tunnel_file:
                    serialised_tunnels = umsgpack.unpackb(tunnel_file.read())

                for serialised_tunnel in serialised_tunnels:
                    # Index 1 holds the persisted interface hash. It is not
                    # restored; the tunnel entry gets None as its interface.
                    tunnel_id = serialised_tunnel[0]
                    serialised_paths = serialised_tunnel[2]
                    # Kept under its own name so the per-path expiry values
                    # read below cannot overwrite the tunnel's expiry
                    tunnel_expires = serialised_tunnel[3]

                    tunnel_paths = {}
                    for serialised_entry in serialised_paths:
                        destination_hash = serialised_entry[0]
                        timestamp = serialised_entry[1]
                        received_from = serialised_entry[2]
                        hops = serialised_entry[3]
                        expires = serialised_entry[4]
                        random_blobs = serialised_entry[5]
                        receiving_interface = Transport.find_interface_from_hash(serialised_entry[6])
                        announce_packet = Transport.get_cached_packet(serialised_entry[7])

                        if announce_packet is not None:
                            announce_packet.unpack()
                            # We increase the hops, since reading a packet
                            # from cache is equivalent to receiving it again
                            # over an interface. It is cached with its non-
                            # increased hop-count.
                            announce_packet.hops += 1

                            tunnel_path = [timestamp, received_from, hops, expires, random_blobs, receiving_interface, announce_packet]
                            tunnel_paths[destination_hash] = tunnel_path

                    tunnel = [tunnel_id, None, tunnel_paths, tunnel_expires]
                    Transport.tunnels[tunnel_id] = tunnel

                # The specifier must agree with the count that is logged
                if len(Transport.tunnels) == 1:
                    specifier = "entry"
                else:
                    specifier = "entries"

                RNS.log(f"Loaded {len(Transport.tunnels)} tunnel table {specifier} from storage", RNS.LOG_VERBOSE)

            except Exception as e:
                RNS.log(f"Could not load tunnel table from storage, the contained exception was: {e}", RNS.LOG_ERROR)

        if RNS.Reticulum.probe_destination_enabled():
            Transport.probe_destination = RNS.Destination(Transport.identity, RNS.Destination.IN, RNS.Destination.SINGLE, Transport.APP_NAME, "probe")
            Transport.probe_destination.accepts_links(False)
            Transport.probe_destination.set_proof_strategy(RNS.Destination.PROVE_ALL)
            Transport.probe_destination.announce()
            RNS.log(f"Transport Instance will respond to probe requests on {Transport.probe_destination}", RNS.LOG_NOTICE)
        else:
            Transport.probe_destination = None

        RNS.log(f"Transport instance {Transport.identity} started", RNS.LOG_VERBOSE)
        Transport.start_time = time.time()

    # Sort interfaces according to bitrate
    Transport.prioritize_interfaces()

    # Synthesize tunnels for any interfaces wanting it
    for interface in Transport.interfaces:
        interface.tunnel_id = None
        if hasattr(interface, "wants_tunnel") and interface.wants_tunnel:
            Transport.synthesize_tunnel(interface)
|
|
|
|
@staticmethod
|
|
def prioritize_interfaces():
    """
    Reorder ``Transport.interfaces`` in place, highest bitrate first.

    If sorting fails for any reason, the error is logged and the method
    returns without raising.
    """
    def bitrate_of(candidate):
        return candidate.bitrate

    try:
        Transport.interfaces.sort(key=bitrate_of, reverse=True)
    except Exception as e:
        RNS.log(f"Could not prioritize interfaces according to bitrate. The contained exception was: {e}", RNS.LOG_ERROR)
|
|
|
|
@staticmethod
|
|
def jobloop():
    """
    Run Transport jobs forever, pausing ``Transport.job_interval`` seconds
    after each pass. This method never returns.
    """
    while True:
        Transport.jobs()
        # The interval is re-read on every pass
        sleep(Transport.job_interval)
|
|
|
|
@staticmethod
|
|
def jobs():
    """
    Run one pass of Transport's periodic maintenance.

    Unless ``Transport.jobs_locked`` is set, this checks (each on its own
    interval) the pending and active link lists, the packet receipts, the
    announces waiting for retransmission, the packet hashlist and path
    request tag list, and the path state, reverse, link, path, discovery
    path request and tunnel tables. Interfaces are also re-prioritised and
    asked to process their held announces.

    Announce rebroadcasts and path requests are only collected during the
    pass. They are sent at the end, after ``Transport.jobs_running`` has
    been cleared again.

    Exceptions raised during the maintenance pass are logged and are not
    propagated to the caller.
    """
    outgoing = []
    # Maps destination hash -> interface to exclude from the path request,
    # or None to send the request without an interface restriction
    path_requests = {}
    blocked_if = None
    Transport.jobs_running = True

    try:
        if not Transport.jobs_locked:

            # Process active and pending link lists
            if time.time() > Transport.links_last_checked+Transport.links_check_interval:

                # Iterate over a copy, since closed links are removed from
                # the list inside the loop
                for link in list(Transport.pending_links):
                    if link.status == RNS.Link.CLOSED:
                        # If we are not a Transport Instance, finding a pending link
                        # that was never activated will trigger an expiry of the path
                        # to the destination, and an attempt to rediscover the path.
                        if not RNS.Reticulum.transport_enabled():
                            Transport.expire_path(link.destination.hash)

                            # If we are connected to a shared instance, it will take
                            # care of sending out a new path request. If not, we will
                            # send one directly.
                            if not Transport.owner.is_connected_to_shared_instance:
                                last_path_request = 0
                                if link.destination.hash in Transport.path_requests:
                                    last_path_request = Transport.path_requests[link.destination.hash]

                                if time.time() - last_path_request > Transport.PATH_REQUEST_MI:
                                    RNS.log(f"Trying to rediscover path for {RNS.prettyhexrep(link.destination.hash)} since an attempted link was never established", RNS.LOG_DEBUG)
                                    if link.destination.hash not in path_requests:
                                        blocked_if = None
                                        path_requests[link.destination.hash] = blocked_if

                        Transport.pending_links.remove(link)

                # Iterate over a copy for the same reason as above
                for link in list(Transport.active_links):
                    if link.status == RNS.Link.CLOSED:
                        Transport.active_links.remove(link)

                Transport.links_last_checked = time.time()

            # Process receipts list for timed-out packets
            if time.time() > Transport.receipts_last_checked+Transport.receipts_check_interval:
                while len(Transport.receipts) > Transport.MAX_RECEIPTS:
                    culled_receipt = Transport.receipts.pop(0)
                    culled_receipt.timeout = -1
                    culled_receipt.check_timeout()

                # Iterate over a copy, since concluded receipts are removed
                # from the list inside the loop
                for receipt in list(Transport.receipts):
                    receipt.check_timeout()
                    if receipt.status != RNS.PacketReceipt.SENT:
                        if receipt in Transport.receipts:
                            Transport.receipts.remove(receipt)

                Transport.receipts_last_checked = time.time()

            # Process announces needing retransmission
            if time.time() > Transport.announces_last_checked+Transport.announces_check_interval:
                completed_announces = []
                for destination_hash in Transport.announce_table:
                    announce_entry = Transport.announce_table[destination_hash]
                    if announce_entry[2] > Transport.PATHFINDER_R:
                        RNS.log(f"Completed announce processing for {RNS.prettyhexrep(destination_hash)}, retry limit reached", RNS.LOG_EXTREME)
                        completed_announces.append(destination_hash)
                    else:
                        if time.time() > announce_entry[1]:
                            announce_entry[1] = time.time() + Transport.PATHFINDER_G + Transport.PATHFINDER_RW
                            announce_entry[2] += 1
                            packet = announce_entry[5]
                            block_rebroadcasts = announce_entry[7]
                            attached_interface = announce_entry[8]
                            announce_context = RNS.Packet.NONE
                            if block_rebroadcasts:
                                announce_context = RNS.Packet.PATH_RESPONSE
                            announce_data = packet.data
                            announce_identity = RNS.Identity.recall(packet.destination_hash)
                            announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown")
                            # Overwrite the hash of the placeholder destination
                            # with the hash carried by the announce packet
                            announce_destination.hash = packet.destination_hash
                            announce_destination.hexhash = announce_destination.hash.hex()

                            new_packet = RNS.Packet(
                                announce_destination,
                                announce_data,
                                RNS.Packet.ANNOUNCE,
                                context = announce_context,
                                header_type = RNS.Packet.HEADER_2,
                                transport_type = Transport.TRANSPORT,
                                transport_id = Transport.identity.hash,
                                attached_interface = attached_interface,
                                context_flag = packet.context_flag,
                            )

                            new_packet.hops = announce_entry[4]
                            if block_rebroadcasts:
                                RNS.log(f"Rebroadcasting announce as path response for {RNS.prettyhexrep(announce_destination.hash)} with hop count {new_packet.hops}", RNS.LOG_DEBUG)
                            else:
                                RNS.log(f"Rebroadcasting announce for {RNS.prettyhexrep(announce_destination.hash)} with hop count {new_packet.hops}", RNS.LOG_DEBUG)

                            outgoing.append(new_packet)

                            # This handles an edge case where a peer sends a path
                            # request for a destination just after an announce for
                            # said destination has arrived, but before it has been
                            # rebroadcast locally. In such a case the actual announce
                            # is temporarily held, and then reinserted when the path
                            # request has been served to the peer.
                            if destination_hash in Transport.held_announces:
                                held_entry = Transport.held_announces.pop(destination_hash)
                                # The key already exists, so this replaces the value
                                # without changing the size of the table being iterated
                                Transport.announce_table[destination_hash] = held_entry
                                RNS.log("Reinserting held announce into table", RNS.LOG_DEBUG)

                for destination_hash in completed_announces:
                    if destination_hash in Transport.announce_table:
                        Transport.announce_table.pop(destination_hash)

                Transport.announces_last_checked = time.time()


            # Cull the packet hashlist if it has reached its max size,
            # keeping the most recent entries up to and including the last
            if len(Transport.packet_hashlist) > Transport.hashlist_maxsize:
                Transport.packet_hashlist = Transport.packet_hashlist[len(Transport.packet_hashlist)-Transport.hashlist_maxsize:]

            # Cull the path request tags list if it has reached its max size,
            # keeping the most recent entries up to and including the last
            if len(Transport.discovery_pr_tags) > Transport.max_pr_tags:
                Transport.discovery_pr_tags = Transport.discovery_pr_tags[len(Transport.discovery_pr_tags)-Transport.max_pr_tags:]

            if time.time() > Transport.tables_last_culled + Transport.tables_cull_interval:
                # Remove unneeded path state entries
                stale_path_states = []
                for destination_hash in Transport.path_states:
                    if destination_hash not in Transport.destination_table:
                        stale_path_states.append(destination_hash)

                # Cull the reverse table according to timeout
                stale_reverse_entries = []
                for truncated_packet_hash in Transport.reverse_table:
                    reverse_entry = Transport.reverse_table[truncated_packet_hash]
                    if time.time() > reverse_entry[2] + Transport.REVERSE_TIMEOUT:
                        stale_reverse_entries.append(truncated_packet_hash)

                # Cull the link table according to timeout
                stale_links = []
                for link_id in Transport.link_table:
                    link_entry = Transport.link_table[link_id]

                    if link_entry[7] == True:
                        if time.time() > link_entry[0] + Transport.LINK_TIMEOUT:
                            stale_links.append(link_id)
                    else:
                        if time.time() > link_entry[8]:
                            stale_links.append(link_id)

                            last_path_request = 0
                            if link_entry[6] in Transport.path_requests:
                                last_path_request = Transport.path_requests[link_entry[6]]

                            lr_taken_hops = link_entry[5]

                            path_request_throttle = time.time() - last_path_request < Transport.PATH_REQUEST_MI
                            path_request_conditions = False
                            # Start every entry without a blocked interface, so a
                            # value chosen for an earlier link table entry cannot
                            # leak into the path request recorded for this one
                            blocked_if = None

                            # If the path has been invalidated between the time of
                            # making the link request and now, try to rediscover it
                            if not Transport.has_path(link_entry[6]):
                                RNS.log(f"Trying to rediscover path for {RNS.prettyhexrep(link_entry[6])} since an attempted link was never established, and path is now missing", RNS.LOG_DEBUG)
                                path_request_conditions = True

                            # If this link request was originated from a local client
                            # attempt to rediscover a path to the destination, if this
                            # has not already happened recently.
                            elif not path_request_throttle and lr_taken_hops == 0:
                                RNS.log(f"Trying to rediscover path for {RNS.prettyhexrep(link_entry[6])} since an attempted local client link was never established", RNS.LOG_DEBUG)
                                path_request_conditions = True

                            # If the link destination was previously only 1 hop
                            # away, this likely means that it was local to one
                            # of our interfaces, and that it roamed somewhere else.
                            # In that case, try to discover a new path, and mark
                            # the old one as unresponsive.
                            elif not path_request_throttle and Transport.hops_to(link_entry[6]) == 1:
                                RNS.log(f"Trying to rediscover path for {RNS.prettyhexrep(link_entry[6])} since an attempted link was never established, and destination was previously local to an interface on this instance", RNS.LOG_DEBUG)
                                path_request_conditions = True
                                blocked_if = link_entry[4]

                                # TODO: This might result in the path re-resolution
                                # only being able to happen once, since new path found
                                # after allowing update from higher hop-count path, after
                                # marking old path unresponsive, might be more than 1 hop away,
                                # thus deadlocking us into waiting for a new announce all-together.
                                # Is this problematic, or does it actually not matter?
                                # Best would be to have full support for alternative paths,
                                # and score them according to number of unsuccessful tries or
                                # similar.
                                if RNS.Reticulum.transport_enabled():
                                    if hasattr(link_entry[4], "mode") and link_entry[4].mode != RNS.Interfaces.Interface.Interface.MODE_BOUNDARY:
                                        Transport.mark_path_unresponsive(link_entry[6])

                            # If the link initiator is only 1 hop away,
                            # this likely means that network topology has
                            # changed. In that case, we try to discover a new path,
                            # and mark the old one as potentially unresponsive.
                            elif not path_request_throttle and lr_taken_hops == 1:
                                RNS.log(f"Trying to rediscover path for {RNS.prettyhexrep(link_entry[6])} since an attempted link was never established, and link initiator is local to an interface on this instance", RNS.LOG_DEBUG)
                                path_request_conditions = True
                                blocked_if = link_entry[4]

                                if RNS.Reticulum.transport_enabled():
                                    if hasattr(link_entry[4], "mode") and link_entry[4].mode != RNS.Interfaces.Interface.Interface.MODE_BOUNDARY:
                                        Transport.mark_path_unresponsive(link_entry[6])

                            if path_request_conditions:
                                if link_entry[6] not in path_requests:
                                    path_requests[link_entry[6]] = blocked_if

                                if not RNS.Reticulum.transport_enabled():
                                    # Drop current path if we are not a transport instance, to
                                    # allow using higher-hop count paths or reused announces
                                    # from newly adjacent transport instances.
                                    Transport.expire_path(link_entry[6])

                # Cull the path table
                stale_paths = []
                for destination_hash in Transport.destination_table:
                    destination_entry = Transport.destination_table[destination_hash]
                    attached_interface = destination_entry[5]

                    # The expiry depends on the mode of the attached interface
                    if attached_interface is not None and hasattr(attached_interface, "mode") and attached_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT:
                        destination_expiry = destination_entry[0] + Transport.AP_PATH_TIME
                    elif attached_interface is not None and hasattr(attached_interface, "mode") and attached_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING:
                        destination_expiry = destination_entry[0] + Transport.ROAMING_PATH_TIME
                    else:
                        destination_expiry = destination_entry[0] + Transport.DESTINATION_TIMEOUT

                    if time.time() > destination_expiry:
                        stale_paths.append(destination_hash)
                        RNS.log(f"Path to {RNS.prettyhexrep(destination_hash)} timed out and was removed", RNS.LOG_DEBUG)
                    elif attached_interface not in Transport.interfaces:
                        stale_paths.append(destination_hash)
                        RNS.log(f"Path to {RNS.prettyhexrep(destination_hash)} was removed since the attached interface no longer exists", RNS.LOG_DEBUG)

                # Cull the pending discovery path requests table
                stale_discovery_path_requests = []
                for destination_hash in Transport.discovery_path_requests:
                    entry = Transport.discovery_path_requests[destination_hash]

                    if time.time() > entry["timeout"]:
                        stale_discovery_path_requests.append(destination_hash)
                        RNS.log(f"Waiting path request for {RNS.prettyhexrep(destination_hash)} timed out and was removed", RNS.LOG_DEBUG)

                # Cull the tunnel table
                stale_tunnels = []
                ti = 0
                for tunnel_id in Transport.tunnels:
                    tunnel_entry = Transport.tunnels[tunnel_id]

                    expires = tunnel_entry[3]
                    if time.time() > expires:
                        stale_tunnels.append(tunnel_id)
                        RNS.log(f"Tunnel {RNS.prettyhexrep(tunnel_id)} timed out and was removed", RNS.LOG_EXTREME)
                    else:
                        # The tunnel itself is still valid, so only its
                        # timed-out paths are removed
                        stale_tunnel_paths = []
                        tunnel_paths = tunnel_entry[2]
                        for tunnel_path in tunnel_paths:
                            tunnel_path_entry = tunnel_paths[tunnel_path]

                            if time.time() > tunnel_path_entry[0] + Transport.DESTINATION_TIMEOUT:
                                stale_tunnel_paths.append(tunnel_path)
                                RNS.log(f"Tunnel path to {RNS.prettyhexrep(tunnel_path)} timed out and was removed", RNS.LOG_EXTREME)

                        for tunnel_path in stale_tunnel_paths:
                            tunnel_paths.pop(tunnel_path)
                            ti += 1


                if ti > 0:
                    if ti == 1:
                        RNS.log(f"Removed {ti} tunnel path", RNS.LOG_EXTREME)
                    else:
                        RNS.log(f"Removed {ti} tunnel paths", RNS.LOG_EXTREME)

                # The stale keys were only collected above; the actual
                # removals happen here, after the table iterations finished
                i = 0
                for truncated_packet_hash in stale_reverse_entries:
                    Transport.reverse_table.pop(truncated_packet_hash)
                    i += 1

                if i > 0:
                    if i == 1:
                        RNS.log(f"Released {i} reverse table entry", RNS.LOG_EXTREME)
                    else:
                        RNS.log(f"Released {i} reverse table entries", RNS.LOG_EXTREME)

                i = 0
                for link_id in stale_links:
                    Transport.link_table.pop(link_id)
                    i += 1

                if i > 0:
                    if i == 1:
                        RNS.log(f"Released {i} link", RNS.LOG_EXTREME)
                    else:
                        RNS.log(f"Released {i} links", RNS.LOG_EXTREME)

                i = 0
                for destination_hash in stale_paths:
                    Transport.destination_table.pop(destination_hash)
                    i += 1

                if i > 0:
                    if i == 1:
                        RNS.log(f"Removed {i} path", RNS.LOG_EXTREME)
                    else:
                        RNS.log(f"Removed {i} paths", RNS.LOG_EXTREME)

                i = 0
                for destination_hash in stale_discovery_path_requests:
                    Transport.discovery_path_requests.pop(destination_hash)
                    i += 1

                if i > 0:
                    if i == 1:
                        RNS.log(f"Removed {i} waiting path request", RNS.LOG_EXTREME)
                    else:
                        RNS.log(f"Removed {i} waiting path requests", RNS.LOG_EXTREME)

                i = 0
                for tunnel_id in stale_tunnels:
                    Transport.tunnels.pop(tunnel_id)
                    i += 1

                if i > 0:
                    if i == 1:
                        RNS.log(f"Removed {i} tunnel", RNS.LOG_EXTREME)
                    else:
                        RNS.log(f"Removed {i} tunnels", RNS.LOG_EXTREME)

                i = 0
                for destination_hash in stale_path_states:
                    Transport.path_states.pop(destination_hash)
                    i += 1

                if i > 0:
                    if i == 1:
                        RNS.log(f"Removed {i} path state entry", RNS.LOG_EXTREME)
                    else:
                        RNS.log(f"Removed {i} path state entries", RNS.LOG_EXTREME)

                Transport.tables_last_culled = time.time()

            if time.time() > Transport.interface_last_jobs + Transport.interface_jobs_interval:
                Transport.prioritize_interfaces()
                for interface in Transport.interfaces:
                    interface.process_held_announces()
                Transport.interface_last_jobs = time.time()

        else:
            # Transport jobs were locked, do nothing
            pass

    except Exception as e:
        RNS.log("An exception occurred while running Transport jobs.", RNS.LOG_ERROR)
        RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)

    Transport.jobs_running = False

    # Sending happens only after jobs_running has been cleared
    for packet in outgoing:
        packet.send()

    for destination_hash in path_requests:
        blocked_if = path_requests[destination_hash]
        if blocked_if is None:
            Transport.request_path(destination_hash)
        else:
            # Request the path on every interface except the blocked one
            for interface in Transport.interfaces:
                if interface != blocked_if:
                    Transport.request_path(destination_hash, on_interface=interface)
|
|
|
|
@staticmethod
|
|
def transmit(interface, raw):
    """
    Pass a raw packet to an interface for sending.

    When the interface carries a non-None ``ifac_identity``, an interface
    access code (IFAC) is derived by signing the packet, inserted after the
    two header bytes, and the resulting frame is masked before it is handed
    to ``interface.processOutgoing``. Otherwise ``raw`` is handed over
    unchanged.

    Any exception raised along the way is logged and not re-raised.
    """
    try:
        ifac_identity = getattr(interface, "ifac_identity", None)

        if ifac_identity is None:
            # No access code is configured, send the packet as it is
            interface.processOutgoing(raw)

        else:
            # The access code is the tail of the signature over the packet
            signature = ifac_identity.sign(raw)
            ifac_size = interface.ifac_size
            ifac = signature[-ifac_size:]

            # Derive a mask that covers the packet plus the access code
            mask = RNS.Cryptography.hkdf(
                length=len(raw)+ifac_size,
                derive_from=ifac,
                salt=interface.ifac_key,
                context=None,
            )

            # Header with the IFAC flag set, followed by access code and payload
            flagged_header = bytes([raw[0] | 0x80, raw[1]])
            unmasked = flagged_header+ifac+raw[2:]

            masked = bytearray()
            for position, value in enumerate(unmasked):
                if position == 0:
                    # First header byte is masked, but the IFAC flag
                    # is forced back on afterwards
                    masked.append(value ^ mask[position] | 0x80)
                elif position == 1 or position > ifac_size+1:
                    # Second header byte and everything after the IFAC
                    masked.append(value ^ mask[position])
                else:
                    # The IFAC itself goes out unmasked
                    masked.append(value)

            interface.processOutgoing(bytes(masked))

    except Exception as e:
        RNS.log(f"Error while transmitting on {interface}. The contained exception was: {e}", RNS.LOG_ERROR)
|
|
|
|
@staticmethod
|
|
def outbound(packet):
    """
    Transmit a locally originated packet on the appropriate interface(s).

    The call first waits while ``Transport.jobs_running`` is set, then
    holds ``Transport.jobs_locked`` until it returns (the flag is also
    cleared if an exception is raised).

    Routing is decided as follows:

    * If the packet is not an announce, its destination is neither
      PLAIN nor GROUP, and the destination hash has an entry in
      ``Transport.destination_table``, the packet is sent only on the
      interface recorded in that entry. When the entry's hop count is
      greater than one, or is exactly one while this instance is
      connected to a shared instance, a HEADER_1 packet is rewritten
      with a transport header carrying the next-hop address from the
      table entry.
    * Otherwise the packet is offered to every interface with ``OUT``
      set, restricted by link state, attached interfaces and, for
      announces, by interface mode and the per-interface announce cap
      (announces that exceed the cap are queued on the interface).

    A ``RNS.PacketReceipt`` is created for qualifying DATA packets when
    the packet is marked as sent.

    :param packet: The packed packet to send.
    :returns: ``True`` if the packet was handed to ``Transport.transmit``
        for at least one interface, otherwise ``False``.
    """
    while (Transport.jobs_running):
        sleep(0.0005)

    Transport.jobs_locked = True

    # The flag is released in a finally block so that an exception in
    # the routing logic cannot leave it set indefinitely.
    try:
        sent = False
        outbound_time = time.time()

        generate_receipt = (
            packet.create_receipt == True and
            # Only generate receipts for DATA packets
            packet.packet_type == RNS.Packet.DATA and
            # Don't generate receipts for PLAIN destinations
            packet.destination.type != RNS.Destination.PLAIN and
            # Don't generate receipts for link-related packets
            not (packet.context >= RNS.Packet.KEEPALIVE and packet.context <= RNS.Packet.LRPROOF) and
            # Don't generate receipts for resource packets
            not (packet.context >= RNS.Packet.RESOURCE and packet.context <= RNS.Packet.RESOURCE_RCL)
        )

        def packet_sent(packet):
            # Mark the packet as sent and, if applicable, register a
            # receipt for it with Transport.
            packet.sent = True
            packet.sent_at = time.time()

            if generate_receipt:
                packet.receipt = RNS.PacketReceipt(packet)
                Transport.receipts.append(packet.receipt)

            # TODO: Enable when caching has been redesigned
            # Transport.cache(packet)

        # Check if we have a known path for the destination in the path table
        if (packet.packet_type != RNS.Packet.ANNOUNCE
                and packet.destination.type != RNS.Destination.PLAIN
                and packet.destination.type != RNS.Destination.GROUP
                and packet.destination_hash in Transport.destination_table):
            path_entry = Transport.destination_table[packet.destination_hash]
            outbound_interface = path_entry[5]
            path_hops = path_entry[2]

            # If there's more than one hop to the destination, and we know
            # a path, we insert the packet into transport by adding the next
            # transport nodes address to the header, and modifying the flags.
            # This rule applies both for "normal" transport, and when connected
            # to a local shared Reticulum instance.
            #
            # In the special case where we are connected to a local shared
            # Reticulum instance, and the destination is one hop away, we
            # also add transport headers to inject the packet into transport
            # via the shared instance. Normally a packet for a destination
            # one hop away would just be broadcast directly, but since we
            # are "behind" a shared instance, we need to get that instance
            # to transport it onto the network.
            if path_hops > 1 or (path_hops == 1 and Transport.owner.is_connected_to_shared_instance):
                if packet.header_type == RNS.Packet.HEADER_1:
                    # Insert packet into transport
                    new_flags = (RNS.Packet.HEADER_2) << 6 | (Transport.TRANSPORT) << 4 | (packet.flags & 0b00001111)
                    new_raw = struct.pack("!B", new_flags)
                    new_raw += packet.raw[1:2]
                    new_raw += path_entry[1]
                    new_raw += packet.raw[2:]
                    packet_sent(packet)
                    Transport.transmit(outbound_interface, new_raw)
                    Transport.destination_table[packet.destination_hash][0] = time.time()
                    sent = True

            # If none of the above applies, we know the destination is
            # directly reachable, and also on which interface, so we
            # simply transmit the packet directly on that one.
            else:
                packet_sent(packet)
                Transport.transmit(outbound_interface, packet.raw)
                sent = True

        # If we don't have a known path for the destination, we'll
        # broadcast the packet on all outgoing interfaces, or just
        # the relevant interface if the packet has an attached
        # interface, or belongs to a link.
        else:
            stored_hash = False
            for interface in Transport.interfaces:
                if not interface.OUT:
                    continue

                should_transmit = True

                if packet.destination.type == RNS.Destination.LINK:
                    if packet.destination.status == RNS.Link.CLOSED:
                        should_transmit = False
                    if interface != packet.destination.attached_interface:
                        should_transmit = False

                if packet.attached_interface is not None and interface != packet.attached_interface:
                    should_transmit = False

                if packet.packet_type == RNS.Packet.ANNOUNCE and packet.attached_interface is None:
                    if interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT:
                        RNS.log(f"Blocking announce broadcast on {interface} due to AP mode", RNS.LOG_EXTREME)
                        should_transmit = False

                    elif interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING:
                        # Announces for instance-local destinations are
                        # always allowed on roaming-mode interfaces.
                        is_local = any(d.hash == packet.destination_hash for d in Transport.destinations)
                        if not is_local:
                            from_interface = Transport.next_hop_interface(packet.destination_hash)
                            if from_interface is None:
                                should_transmit = False
                                RNS.log(f"Blocking announce broadcast on {interface} since next hop interface doesn't exist", RNS.LOG_EXTREME)
                            elif not hasattr(from_interface, "mode"):
                                should_transmit = False
                                RNS.log(f"Blocking announce broadcast on {interface} since next hop interface has no mode configured", RNS.LOG_EXTREME)
                            elif from_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING:
                                RNS.log(f"Blocking announce broadcast on {interface} due to roaming-mode next-hop interface", RNS.LOG_EXTREME)
                                should_transmit = False
                            elif from_interface.mode == RNS.Interfaces.Interface.Interface.MODE_BOUNDARY:
                                RNS.log(f"Blocking announce broadcast on {interface} due to boundary-mode next-hop interface", RNS.LOG_EXTREME)
                                should_transmit = False

                    elif interface.mode == RNS.Interfaces.Interface.Interface.MODE_BOUNDARY:
                        # Announces for instance-local destinations are
                        # always allowed on boundary-mode interfaces.
                        is_local = any(d.hash == packet.destination_hash for d in Transport.destinations)
                        if not is_local:
                            from_interface = Transport.next_hop_interface(packet.destination_hash)
                            if from_interface is None:
                                should_transmit = False
                                RNS.log(f"Blocking announce broadcast on {interface} since next hop interface doesn't exist", RNS.LOG_EXTREME)
                            elif not hasattr(from_interface, "mode"):
                                should_transmit = False
                                RNS.log(f"Blocking announce broadcast on {interface} since next hop interface has no mode configured", RNS.LOG_EXTREME)
                            elif from_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING:
                                RNS.log(f"Blocking announce broadcast on {interface} due to roaming-mode next-hop interface", RNS.LOG_EXTREME)
                                should_transmit = False

                    # Currently, announces originating locally are always
                    # allowed, and do not conform to bandwidth caps.
                    # TODO: Rethink whether this is actually optimal.
                    elif packet.hops > 0:
                        if not hasattr(interface, "announce_cap"):
                            interface.announce_cap = RNS.Reticulum.ANNOUNCE_CAP

                        if not hasattr(interface, "announce_allowed_at"):
                            interface.announce_allowed_at = 0

                        if not hasattr(interface, "announce_queue"):
                            interface.announce_queue = []

                        queued_announces = len(interface.announce_queue) > 0
                        if not queued_announces and outbound_time > interface.announce_allowed_at and interface.bitrate is not None and interface.bitrate != 0:
                            # Send now, and push the next allowed announce
                            # time forward according to the announce cap.
                            tx_time = (len(packet.raw)*8) / interface.bitrate
                            wait_time = (tx_time / interface.announce_cap)
                            interface.announce_allowed_at = outbound_time + wait_time

                        else:
                            should_transmit = False
                            if len(interface.announce_queue) < RNS.Reticulum.MAX_QUEUED_ANNOUNCES:
                                # Find an already queued announce for this
                                # destination (the last match, if several).
                                existing_entry = None
                                for queued in interface.announce_queue:
                                    if queued["destination"] == packet.destination_hash:
                                        existing_entry = queued

                                emission_timestamp = Transport.announce_emitted(packet)
                                if existing_entry is not None:
                                    # Refresh the queued entry itself if this
                                    # announce was emitted more recently. The
                                    # matched entry must be updated here, not
                                    # whatever element the search loop ended on.
                                    if emission_timestamp > existing_entry["emitted"]:
                                        existing_entry["time"] = outbound_time
                                        existing_entry["hops"] = packet.hops
                                        existing_entry["emitted"] = emission_timestamp
                                        existing_entry["raw"] = packet.raw

                                else:
                                    entry = {
                                        "destination": packet.destination_hash,
                                        "time": outbound_time,
                                        "hops": packet.hops,
                                        "emitted": emission_timestamp,
                                        "raw": packet.raw
                                    }

                                    queued_announces = len(interface.announce_queue) > 0
                                    interface.announce_queue.append(entry)

                                    wait_time = max(interface.announce_allowed_at - time.time(), 0)

                                    # Only start a processing timer when the
                                    # queue was empty before this entry.
                                    if not queued_announces:
                                        timer = threading.Timer(wait_time, interface.process_announce_queue)
                                        timer.start()

                                    if wait_time < 1:
                                        wait_time_str = f"{round(wait_time * 1000, 2)}ms"
                                    else:
                                        wait_time_str = f"{round(wait_time * 1, 2)}s"

                                    ql_str = str(len(interface.announce_queue))
                                    RNS.log(f"Added announce to queue (height {ql_str}) on {interface} for processing in {wait_time_str}", RNS.LOG_EXTREME)

                if should_transmit:
                    if not stored_hash:
                        Transport.packet_hashlist.append(packet.packet_hash)
                        stored_hash = True

                    # TODO: Re-evaluate potential for blocking
                    # def send_packet():
                    #     Transport.transmit(interface, packet.raw)
                    # thread = threading.Thread(target=send_packet)
                    # thread.daemon = True
                    # thread.start()

                    Transport.transmit(interface, packet.raw)
                    if packet.packet_type == RNS.Packet.ANNOUNCE:
                        interface.sent_announce()
                    packet_sent(packet)
                    sent = True

        return sent

    finally:
        Transport.jobs_locked = False
|
|
|
|
@staticmethod
|
|
def packet_filter(packet):
    """
    Decide whether an inbound packet should be processed.

    The checks are applied in this order:

    1. Non-announce packets carrying a transport ID other than
       ``Transport.identity.hash`` are rejected.
    2. Packets with a KEEPALIVE, RESOURCE_REQ, RESOURCE_PRF, RESOURCE,
       CACHE_REQUEST or CHANNEL context are accepted.
    3. Packets for PLAIN or GROUP destinations are accepted only if
       they are not announces and have at most one hop.
    4. Packets whose hash is not in ``Transport.packet_hashlist`` are
       accepted.
    5. Already seen packets are accepted only if they are announces
       for SINGLE destinations; everything else is rejected.

    :param packet: The unpacked inbound packet.
    :returns: ``True`` if the packet should be processed, ``False`` if
        it should be dropped.
    """
    # TODO: Think long and hard about this.
    # Is it even strictly necessary with the current
    # transport rules?

    # Filter packets intended for other transport instances
    if packet.transport_id is not None and packet.packet_type != RNS.Packet.ANNOUNCE:
        if packet.transport_id != Transport.identity.hash:
            RNS.log(f"Ignored packet {RNS.prettyhexrep(packet.packet_hash)} in transport for other transport instance", RNS.LOG_EXTREME)
            return False

    # These contexts bypass the remaining checks
    if packet.context in (
        RNS.Packet.KEEPALIVE,
        RNS.Packet.RESOURCE_REQ,
        RNS.Packet.RESOURCE_PRF,
        RNS.Packet.RESOURCE,
        RNS.Packet.CACHE_REQUEST,
        RNS.Packet.CHANNEL,
    ):
        return True

    if packet.destination_type == RNS.Destination.PLAIN:
        if packet.packet_type != RNS.Packet.ANNOUNCE:
            if packet.hops > 1:
                # packet_hash is the hash attribute used for packets
                # everywhere else in this function.
                RNS.log(f"Dropped PLAIN packet {RNS.prettyhexrep(packet.packet_hash)} with {packet.hops} hops", RNS.LOG_DEBUG)
                return False
            else:
                return True
        else:
            RNS.log("Dropped invalid PLAIN announce packet", RNS.LOG_DEBUG)
            return False

    if packet.destination_type == RNS.Destination.GROUP:
        if packet.packet_type != RNS.Packet.ANNOUNCE:
            if packet.hops > 1:
                RNS.log(f"Dropped GROUP packet {RNS.prettyhexrep(packet.packet_hash)} with {packet.hops} hops", RNS.LOG_DEBUG)
                return False
            else:
                return True
        else:
            RNS.log("Dropped invalid GROUP announce packet", RNS.LOG_DEBUG)
            return False

    if packet.packet_hash not in Transport.packet_hashlist:
        return True

    # The packet hash has been seen before. Only announces for
    # SINGLE destinations are let through again.
    if packet.packet_type == RNS.Packet.ANNOUNCE:
        if packet.destination_type == RNS.Destination.SINGLE:
            return True
        else:
            RNS.log("Dropped invalid announce packet", RNS.LOG_DEBUG)
            return False

    RNS.log(f"Filtered packet with hash {RNS.prettyhexrep(packet.packet_hash)}", RNS.LOG_EXTREME)
    return False
|
|
|
|
@staticmethod
|
|
def inbound(raw, interface=None):
|
|
# If interface access codes are enabled,
|
|
# we must authenticate each packet.
|
|
if len(raw) > 2:
|
|
if interface != None and hasattr(interface, "ifac_identity") and interface.ifac_identity != None:
|
|
# Check that IFAC flag is set
|
|
if raw[0] & 0x80 == 0x80:
|
|
if len(raw) > 2+interface.ifac_size:
|
|
# Extract IFAC
|
|
ifac = raw[2:2+interface.ifac_size]
|
|
|
|
# Generate mask
|
|
mask = RNS.Cryptography.hkdf(
|
|
length=len(raw),
|
|
derive_from=ifac,
|
|
salt=interface.ifac_key,
|
|
context=None,
|
|
)
|
|
|
|
# Unmask payload
|
|
i = 0; unmasked_raw = b""
|
|
for byte in raw:
|
|
if i <= 1 or i > interface.ifac_size+1:
|
|
# Unmask header bytes and payload
|
|
unmasked_raw += bytes([byte ^ mask[i]])
|
|
else:
|
|
# Don't unmask IFAC itself
|
|
unmasked_raw += bytes([byte])
|
|
i += 1
|
|
raw = unmasked_raw
|
|
|
|
# Unset IFAC flag
|
|
new_header = bytes([raw[0] & 0x7f, raw[1]])
|
|
|
|
# Re-assemble packet
|
|
new_raw = new_header+raw[2+interface.ifac_size:]
|
|
|
|
# Calculate expected IFAC
|
|
expected_ifac = interface.ifac_identity.sign(new_raw)[-interface.ifac_size:]
|
|
|
|
# Check it
|
|
if ifac == expected_ifac:
|
|
raw = new_raw
|
|
else:
|
|
return
|
|
|
|
else:
|
|
return
|
|
|
|
else:
|
|
# If the IFAC flag is not set, but should be,
|
|
# drop the packet.
|
|
return
|
|
|
|
else:
|
|
# If the interface does not have IFAC enabled,
|
|
# check the received packet IFAC flag.
|
|
if raw[0] & 0x80 == 0x80:
|
|
# If the flag is set, drop the packet
|
|
return
|
|
|
|
else:
|
|
return
|
|
|
|
while (Transport.jobs_running):
|
|
sleep(0.0005)
|
|
|
|
if Transport.identity == None:
|
|
return
|
|
|
|
Transport.jobs_locked = True
|
|
|
|
packet = RNS.Packet(None, raw)
|
|
if not packet.unpack():
|
|
Transport.jobs_locked = False
|
|
return
|
|
|
|
packet.receiving_interface = interface
|
|
packet.hops += 1
|
|
|
|
if interface != None:
|
|
if hasattr(interface, "r_stat_rssi"):
|
|
if interface.r_stat_rssi != None:
|
|
packet.rssi = interface.r_stat_rssi
|
|
if len(Transport.local_client_interfaces) > 0:
|
|
Transport.local_client_rssi_cache.append([packet.packet_hash, packet.rssi])
|
|
|
|
while len(Transport.local_client_rssi_cache) > Transport.LOCAL_CLIENT_CACHE_MAXSIZE:
|
|
Transport.local_client_rssi_cache.pop(0)
|
|
|
|
if hasattr(interface, "r_stat_snr"):
|
|
if interface.r_stat_rssi != None:
|
|
packet.snr = interface.r_stat_snr
|
|
if len(Transport.local_client_interfaces) > 0:
|
|
Transport.local_client_snr_cache.append([packet.packet_hash, packet.snr])
|
|
|
|
while len(Transport.local_client_snr_cache) > Transport.LOCAL_CLIENT_CACHE_MAXSIZE:
|
|
Transport.local_client_snr_cache.pop(0)
|
|
|
|
if hasattr(interface, "r_stat_q"):
|
|
if interface.r_stat_q != None:
|
|
packet.q = interface.r_stat_q
|
|
if len(Transport.local_client_interfaces) > 0:
|
|
Transport.local_client_q_cache.append([packet.packet_hash, packet.q])
|
|
|
|
while len(Transport.local_client_q_cache) > Transport.LOCAL_CLIENT_CACHE_MAXSIZE:
|
|
Transport.local_client_q_cache.pop(0)
|
|
|
|
if len(Transport.local_client_interfaces) > 0:
|
|
if Transport.is_local_client_interface(interface):
|
|
packet.hops -= 1
|
|
|
|
elif Transport.interface_to_shared_instance(interface):
|
|
packet.hops -= 1
|
|
|
|
if Transport.packet_filter(packet):
|
|
# By default, remember packet hashes to avoid routing
|
|
# loops in the network, using the packet filter.
|
|
remember_packet_hash = True
|
|
|
|
# If this packet belongs to a link in our link table,
|
|
# we'll have to defer adding it to the filter list.
|
|
# In some cases, we might see a packet over a shared-
|
|
# medium interface, belonging to a link that transports
|
|
# or terminates with this instance, but before it would
|
|
# normally reach us. If the packet is appended to the
|
|
# filter list at this point, link transport will break.
|
|
if packet.destination_hash in Transport.link_table:
|
|
remember_packet_hash = False
|
|
|
|
# If this is a link request proof, don't add it until
|
|
# we are sure it's not actually somewhere else in the
|
|
# routing chain.
|
|
if packet.packet_type == RNS.Packet.PROOF and packet.context == RNS.Packet.LRPROOF:
|
|
remember_packet_hash = False
|
|
|
|
if remember_packet_hash:
|
|
Transport.packet_hashlist.append(packet.packet_hash)
|
|
# TODO: Enable when caching has been redesigned
|
|
# Transport.cache(packet)
|
|
|
|
# Check special conditions for local clients connected
|
|
# through a shared Reticulum instance
|
|
from_local_client = (packet.receiving_interface in Transport.local_client_interfaces)
|
|
for_local_client = (packet.packet_type != RNS.Packet.ANNOUNCE) and (packet.destination_hash in Transport.destination_table and Transport.destination_table[packet.destination_hash][2] == 0)
|
|
for_local_client_link = (packet.packet_type != RNS.Packet.ANNOUNCE) and (packet.destination_hash in Transport.link_table and Transport.link_table[packet.destination_hash][4] in Transport.local_client_interfaces)
|
|
for_local_client_link |= (packet.packet_type != RNS.Packet.ANNOUNCE) and (packet.destination_hash in Transport.link_table and Transport.link_table[packet.destination_hash][2] in Transport.local_client_interfaces)
|
|
proof_for_local_client = (packet.destination_hash in Transport.reverse_table) and (Transport.reverse_table[packet.destination_hash][0] in Transport.local_client_interfaces)
|
|
|
|
# Plain broadcast packets from local clients are sent
|
|
# directly on all attached interfaces, since they are
|
|
# never injected into transport.
|
|
if not packet.destination_hash in Transport.control_hashes:
|
|
if packet.destination_type == RNS.Destination.PLAIN and packet.transport_type == Transport.BROADCAST:
|
|
# Send to all interfaces except the originator
|
|
if from_local_client:
|
|
for interface in Transport.interfaces:
|
|
if interface != packet.receiving_interface:
|
|
Transport.transmit(interface, packet.raw)
|
|
# If the packet was not from a local client, send
|
|
# it directly to all local clients
|
|
else:
|
|
for interface in Transport.local_client_interfaces:
|
|
Transport.transmit(interface, packet.raw)
|
|
|
|
|
|
# General transport handling. Takes care of directing
|
|
# packets according to transport tables and recording
|
|
# entries in reverse and link tables.
|
|
if RNS.Reticulum.transport_enabled() or from_local_client or for_local_client or for_local_client_link:
|
|
|
|
# If there is no transport id, but the packet is
|
|
# for a local client, we generate the transport
|
|
# id (it was stripped on the previous hop, since
|
|
# we "spoof" the hop count for clients behind a
|
|
# shared instance, so they look directly reach-
|
|
# able), and reinsert, so the normal transport
|
|
# implementation can handle the packet.
|
|
if packet.transport_id == None and for_local_client:
|
|
packet.transport_id = Transport.identity.hash
|
|
|
|
# If this is a cache request, and we can fulfill
|
|
# it, do so and stop processing. Otherwise resume
|
|
# normal processing.
|
|
if packet.context == RNS.Packet.CACHE_REQUEST:
|
|
if Transport.cache_request_packet(packet):
|
|
Transport.jobs_locked = False
|
|
return
|
|
|
|
# If the packet is in transport, check whether we
|
|
# are the designated next hop, and process it
|
|
# accordingly if we are.
|
|
if packet.transport_id != None and packet.packet_type != RNS.Packet.ANNOUNCE:
|
|
if packet.transport_id == Transport.identity.hash:
|
|
if packet.destination_hash in Transport.destination_table:
|
|
next_hop = Transport.destination_table[packet.destination_hash][1]
|
|
remaining_hops = Transport.destination_table[packet.destination_hash][2]
|
|
|
|
if remaining_hops > 1:
|
|
# Just increase hop count and transmit
|
|
new_raw = packet.raw[0:1]
|
|
new_raw += struct.pack("!B", packet.hops)
|
|
new_raw += next_hop
|
|
new_raw += packet.raw[(RNS.Identity.TRUNCATED_HASHLENGTH//8)+2:]
|
|
elif remaining_hops == 1:
|
|
# Strip transport headers and transmit
|
|
new_flags = (RNS.Packet.HEADER_1) << 6 | (Transport.BROADCAST) << 4 | (packet.flags & 0b00001111)
|
|
new_raw = struct.pack("!B", new_flags)
|
|
new_raw += struct.pack("!B", packet.hops)
|
|
new_raw += packet.raw[(RNS.Identity.TRUNCATED_HASHLENGTH//8)+2:]
|
|
elif remaining_hops == 0:
|
|
# Just increase hop count and transmit
|
|
new_raw = packet.raw[0:1]
|
|
new_raw += struct.pack("!B", packet.hops)
|
|
new_raw += packet.raw[2:]
|
|
|
|
outbound_interface = Transport.destination_table[packet.destination_hash][5]
|
|
|
|
if packet.packet_type == RNS.Packet.LINKREQUEST:
|
|
now = time.time()
|
|
proof_timeout = Transport.extra_link_proof_timeout(packet.receiving_interface)
|
|
proof_timeout += now + RNS.Link.ESTABLISHMENT_TIMEOUT_PER_HOP * max(1, remaining_hops)
|
|
|
|
# Entry format is
|
|
link_entry = [ now, # 0: Timestamp,
|
|
next_hop, # 1: Next-hop transport ID
|
|
outbound_interface, # 2: Next-hop interface
|
|
remaining_hops, # 3: Remaining hops
|
|
packet.receiving_interface, # 4: Received on interface
|
|
packet.hops, # 5: Taken hops
|
|
packet.destination_hash, # 6: Original destination hash
|
|
False, # 7: Validated
|
|
proof_timeout] # 8: Proof timeout timestamp
|
|
|
|
Transport.link_table[packet.getTruncatedHash()] = link_entry
|
|
|
|
else:
|
|
# Entry format is
|
|
reverse_entry = [ packet.receiving_interface, # 0: Received on interface
|
|
outbound_interface, # 1: Outbound interface
|
|
time.time()] # 2: Timestamp
|
|
|
|
Transport.reverse_table[packet.getTruncatedHash()] = reverse_entry
|
|
|
|
Transport.transmit(outbound_interface, new_raw)
|
|
Transport.destination_table[packet.destination_hash][0] = time.time()
|
|
|
|
else:
|
|
# TODO: There should probably be some kind of REJECT
|
|
# mechanism here, to signal to the source that their
|
|
# expected path failed.
|
|
RNS.log(f"Got packet in transport, but no known path to final destination {RNS.prettyhexrep(packet.destination_hash)}. Dropping packet.", RNS.LOG_EXTREME)
|
|
|
|
# Link transport handling. Directs packets according
|
|
# to entries in the link tables
|
|
if packet.packet_type != RNS.Packet.ANNOUNCE and packet.packet_type != RNS.Packet.LINKREQUEST and packet.context != RNS.Packet.LRPROOF:
|
|
if packet.destination_hash in Transport.link_table:
|
|
link_entry = Transport.link_table[packet.destination_hash]
|
|
# If receiving and outbound interface is
|
|
# the same for this link, direction doesn't
|
|
# matter, and we simply repeat the packet.
|
|
outbound_interface = None
|
|
if link_entry[2] == link_entry[4]:
|
|
# But check that taken hops matches one
|
|
# of the expected values.
|
|
if packet.hops == link_entry[3] or packet.hops == link_entry[5]:
|
|
outbound_interface = link_entry[2]
|
|
else:
|
|
# If interfaces differ, we transmit on
|
|
# the opposite interface of what the
|
|
# packet was received on.
|
|
if packet.receiving_interface == link_entry[2]:
|
|
# Also check that expected hop count matches
|
|
if packet.hops == link_entry[3]:
|
|
outbound_interface = link_entry[4]
|
|
elif packet.receiving_interface == link_entry[4]:
|
|
# Also check that expected hop count matches
|
|
if packet.hops == link_entry[5]:
|
|
outbound_interface = link_entry[2]
|
|
|
|
if outbound_interface != None:
|
|
# Add this packet to the filter hashlist if we
|
|
# have determined that it's actually our turn
|
|
# to process it.
|
|
Transport.packet_hashlist.append(packet.packet_hash)
|
|
|
|
new_raw = packet.raw[0:1]
|
|
new_raw += struct.pack("!B", packet.hops)
|
|
new_raw += packet.raw[2:]
|
|
Transport.transmit(outbound_interface, new_raw)
|
|
Transport.link_table[packet.destination_hash][0] = time.time()
|
|
|
|
# TODO: Test and possibly enable this at some point
|
|
# Transport.jobs_locked = False
|
|
# return
|
|
|
|
|
|
# Announce handling. Handles logic related to incoming
|
|
# announces, queueing rebroadcasts of these, and removal
|
|
# of queued announce rebroadcasts once handed to the next node.
|
|
if packet.packet_type == RNS.Packet.ANNOUNCE:
|
|
if interface != None and RNS.Identity.validate_announce(packet, only_validate_signature=True):
|
|
interface.received_announce()
|
|
|
|
if not packet.destination_hash in Transport.destination_table:
|
|
# This is an unknown destination, and we'll apply
|
|
# potential ingress limiting. Already known
|
|
# destinations will have re-announces controlled
|
|
# by normal announce rate limiting.
|
|
if interface.should_ingress_limit():
|
|
interface.hold_announce(packet)
|
|
Transport.jobs_locked = False
|
|
return
|
|
|
|
local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None)
|
|
if local_destination == None and RNS.Identity.validate_announce(packet):
|
|
if packet.transport_id != None:
|
|
received_from = packet.transport_id
|
|
|
|
# Check if this is a next retransmission from
|
|
# another node. If it is, we're removing the
|
|
# announce in question from our pending table
|
|
if RNS.Reticulum.transport_enabled() and packet.destination_hash in Transport.announce_table:
|
|
announce_entry = Transport.announce_table[packet.destination_hash]
|
|
|
|
if packet.hops-1 == announce_entry[4]:
|
|
RNS.log(f"Heard a local rebroadcast of announce for {RNS.prettyhexrep(packet.destination_hash)}", RNS.LOG_DEBUG)
|
|
announce_entry[6] += 1
|
|
if announce_entry[6] >= Transport.LOCAL_REBROADCASTS_MAX:
|
|
RNS.log(f"Max local rebroadcasts of announce for {RNS.prettyhexrep(packet.destination_hash)} reached, dropping announce from our table", RNS.LOG_DEBUG)
|
|
if packet.destination_hash in Transport.announce_table:
|
|
Transport.announce_table.pop(packet.destination_hash)
|
|
|
|
if packet.hops-1 == announce_entry[4]+1 and announce_entry[2] > 0:
|
|
now = time.time()
|
|
if now < announce_entry[1]:
|
|
RNS.log(f"Rebroadcasted announce for {RNS.prettyhexrep(packet.destination_hash)} has been passed on to another node, no further tries needed", RNS.LOG_DEBUG)
|
|
Transport.announce_table.pop(packet.destination_hash)
|
|
|
|
else:
|
|
received_from = packet.destination_hash
|
|
|
|
# Check if this announce should be inserted into
|
|
# announce and destination tables
|
|
should_add = False
|
|
|
|
# First, check that the announce is not for a destination
|
|
# local to this system, and that hops are less than the max
|
|
if (not any(packet.destination_hash == d.hash for d in Transport.destinations) and packet.hops < Transport.PATHFINDER_M+1):
|
|
announce_emitted = Transport.announce_emitted(packet)
|
|
|
|
random_blob = packet.data[RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8:RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8+10]
|
|
random_blobs = []
|
|
if packet.destination_hash in Transport.destination_table:
|
|
random_blobs = Transport.destination_table[packet.destination_hash][4]
|
|
|
|
# If we already have a path to the announced
|
|
# destination, but the hop count is equal or
|
|
# less, we'll update our tables.
|
|
if packet.hops <= Transport.destination_table[packet.destination_hash][2]:
|
|
# Make sure we haven't heard the random
|
|
# blob before, so announces can't be
|
|
# replayed to forge paths.
|
|
# TODO: Check whether this approach works
|
|
# under all circumstances
|
|
if not random_blob in random_blobs:
|
|
Transport.mark_path_unknown_state(packet.destination_hash)
|
|
should_add = True
|
|
else:
|
|
should_add = False
|
|
else:
|
|
# If an announce arrives with a larger hop
|
|
# count than we already have in the table,
|
|
# ignore it, unless the path is expired, or
|
|
# the emission timestamp is more recent.
|
|
now = time.time()
|
|
path_expires = Transport.destination_table[packet.destination_hash][3]
|
|
|
|
path_announce_emitted = 0
|
|
for path_random_blob in random_blobs:
|
|
path_announce_emitted = max(path_announce_emitted, int.from_bytes(path_random_blob[5:10], "big"))
|
|
if path_announce_emitted >= announce_emitted:
|
|
break
|
|
|
|
# If the path has expired, consider this
|
|
# announce for adding to the path table.
|
|
if (now >= path_expires):
|
|
# We check that the announce is
|
|
# different from ones we've already heard,
|
|
# to avoid loops in the network
|
|
if not random_blob in random_blobs:
|
|
# TODO: Check that this ^ approach actually
|
|
# works under all circumstances
|
|
RNS.log(f"Replacing destination table entry for {RNS.prettyhexrep(packet.destination_hash)} with new announce due to expired path", RNS.LOG_DEBUG)
|
|
Transport.mark_path_unknown_state(packet.destination_hash)
|
|
should_add = True
|
|
else:
|
|
should_add = False
|
|
else:
|
|
# If the path is not expired, but the emission
|
|
# is more recent, and we haven't already heard
|
|
# this announce before, update the path table.
|
|
if (announce_emitted > path_announce_emitted):
|
|
if not random_blob in random_blobs:
|
|
RNS.log(f"Replacing destination table entry for {RNS.prettyhexrep(packet.destination_hash)} with new announce, since it was more recently emitted", RNS.LOG_DEBUG)
|
|
Transport.mark_path_unknown_state(packet.destination_hash)
|
|
should_add = True
|
|
else:
|
|
should_add = False
|
|
|
|
# If we have already heard this announce before,
|
|
# but the path has been marked as unresponsive
|
|
# by a failed communications attempt or similar,
|
|
# allow updating the path table to this one.
|
|
elif announce_emitted == path_announce_emitted:
|
|
if Transport.path_is_unresponsive(packet.destination_hash):
|
|
RNS.log(f"Replacing destination table entry for {RNS.prettyhexrep(packet.destination_hash)} with new announce, since previously tried path was unresponsive", RNS.LOG_DEBUG)
|
|
should_add = True
|
|
else:
|
|
should_add = False
|
|
|
|
else:
|
|
# If this destination is unknown in our table
|
|
# we should add it
|
|
should_add = True
|
|
|
|
if should_add:
|
|
now = time.time()
|
|
|
|
rate_blocked = False
|
|
if packet.context != RNS.Packet.PATH_RESPONSE and packet.receiving_interface.announce_rate_target != None:
|
|
if not packet.destination_hash in Transport.announce_rate_table:
|
|
rate_entry = { "last": now, "rate_violations": 0, "blocked_until": 0, "timestamps": [now]}
|
|
Transport.announce_rate_table[packet.destination_hash] = rate_entry
|
|
|
|
else:
|
|
rate_entry = Transport.announce_rate_table[packet.destination_hash]
|
|
rate_entry["timestamps"].append(now)
|
|
|
|
while len(rate_entry["timestamps"]) > Transport.MAX_RATE_TIMESTAMPS:
|
|
rate_entry["timestamps"].pop(0)
|
|
|
|
current_rate = now - rate_entry["last"]
|
|
|
|
if now > rate_entry["blocked_until"]:
|
|
|
|
if current_rate < packet.receiving_interface.announce_rate_target:
|
|
rate_entry["rate_violations"] += 1
|
|
|
|
else:
|
|
rate_entry["rate_violations"] = max(0, rate_entry["rate_violations"]-1)
|
|
|
|
if rate_entry["rate_violations"] > packet.receiving_interface.announce_rate_grace:
|
|
rate_target = packet.receiving_interface.announce_rate_target
|
|
rate_penalty = packet.receiving_interface.announce_rate_penalty
|
|
rate_entry["blocked_until"] = rate_entry["last"] + rate_target + rate_penalty
|
|
rate_blocked = True
|
|
else:
|
|
rate_entry["last"] = now
|
|
|
|
else:
|
|
rate_blocked = True
|
|
|
|
|
|
retries = 0
|
|
announce_hops = packet.hops
|
|
local_rebroadcasts = 0
|
|
block_rebroadcasts = False
|
|
attached_interface = None
|
|
|
|
retransmit_timeout = now + (RNS.rand() * Transport.PATHFINDER_RW)
|
|
|
|
if hasattr(packet.receiving_interface, "mode") and packet.receiving_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT:
|
|
expires = now + Transport.AP_PATH_TIME
|
|
elif hasattr(packet.receiving_interface, "mode") and packet.receiving_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING:
|
|
expires = now + Transport.ROAMING_PATH_TIME
|
|
else:
|
|
expires = now + Transport.PATHFINDER_E
|
|
|
|
random_blobs.append(random_blob)
|
|
random_blobs = random_blobs[-Transport.MAX_RANDOM_BLOBS:]
|
|
|
|
if (RNS.Reticulum.transport_enabled() or Transport.from_local_client(packet)) and packet.context != RNS.Packet.PATH_RESPONSE:
|
|
# Insert announce into announce table for retransmission
|
|
|
|
if rate_blocked:
|
|
RNS.log(f"Blocking rebroadcast of announce from {RNS.prettyhexrep(packet.destination_hash)} due to excessive announce rate", RNS.LOG_DEBUG)
|
|
|
|
else:
|
|
if Transport.from_local_client(packet):
|
|
# If the announce is from a local client,
|
|
# it is announced immediately, but only one time.
|
|
retransmit_timeout = now
|
|
retries = Transport.PATHFINDER_R
|
|
|
|
Transport.announce_table[packet.destination_hash] = [
|
|
now,
|
|
retransmit_timeout,
|
|
retries,
|
|
received_from,
|
|
announce_hops,
|
|
packet,
|
|
local_rebroadcasts,
|
|
block_rebroadcasts,
|
|
attached_interface
|
|
]
|
|
|
|
# TODO: Check from_local_client once and store result
|
|
elif Transport.from_local_client(packet) and packet.context == RNS.Packet.PATH_RESPONSE:
|
|
# If this is a path response from a local client,
|
|
# check if any external interfaces have pending
|
|
# path requests.
|
|
if packet.destination_hash in Transport.pending_local_path_requests:
|
|
desiring_interface = Transport.pending_local_path_requests.pop(packet.destination_hash)
|
|
retransmit_timeout = now
|
|
retries = Transport.PATHFINDER_R
|
|
|
|
Transport.announce_table[packet.destination_hash] = [
|
|
now,
|
|
retransmit_timeout,
|
|
retries,
|
|
received_from,
|
|
announce_hops,
|
|
packet,
|
|
local_rebroadcasts,
|
|
block_rebroadcasts,
|
|
attached_interface
|
|
]
|
|
|
|
# If we have any local clients connected, we re-
|
|
# transmit the announce to them immediately
|
|
if (len(Transport.local_client_interfaces)):
|
|
announce_identity = RNS.Identity.recall(packet.destination_hash)
|
|
announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown");
|
|
announce_destination.hash = packet.destination_hash
|
|
announce_destination.hexhash = announce_destination.hash.hex()
|
|
announce_context = RNS.Packet.NONE
|
|
announce_data = packet.data
|
|
|
|
# TODO: Shouldn't the context be PATH_RESPONSE in the first case here?
|
|
if Transport.from_local_client(packet) and packet.context == RNS.Packet.PATH_RESPONSE:
|
|
for local_interface in Transport.local_client_interfaces:
|
|
if packet.receiving_interface != local_interface:
|
|
new_announce = RNS.Packet(
|
|
announce_destination,
|
|
announce_data,
|
|
RNS.Packet.ANNOUNCE,
|
|
context = announce_context,
|
|
header_type = RNS.Packet.HEADER_2,
|
|
transport_type = Transport.TRANSPORT,
|
|
transport_id = Transport.identity.hash,
|
|
attached_interface = local_interface,
|
|
context_flag = packet.context_flag,
|
|
)
|
|
|
|
new_announce.hops = packet.hops
|
|
new_announce.send()
|
|
|
|
else:
|
|
for local_interface in Transport.local_client_interfaces:
|
|
if packet.receiving_interface != local_interface:
|
|
new_announce = RNS.Packet(
|
|
announce_destination,
|
|
announce_data,
|
|
RNS.Packet.ANNOUNCE,
|
|
context = announce_context,
|
|
header_type = RNS.Packet.HEADER_2,
|
|
transport_type = Transport.TRANSPORT,
|
|
transport_id = Transport.identity.hash,
|
|
attached_interface = local_interface,
|
|
context_flag = packet.context_flag,
|
|
)
|
|
|
|
new_announce.hops = packet.hops
|
|
new_announce.send()
|
|
|
|
# If we have any waiting discovery path requests
|
|
# for this destination, we retransmit to that
|
|
# interface immediately
|
|
if packet.destination_hash in Transport.discovery_path_requests:
|
|
pr_entry = Transport.discovery_path_requests[packet.destination_hash]
|
|
attached_interface = pr_entry["requesting_interface"]
|
|
|
|
interface_str = f" on {attached_interface}"
|
|
|
|
RNS.log(f"Got matching announce, answering waiting discovery path request for {RNS.prettyhexrep(packet.destination_hash)}{interface_str}", RNS.LOG_DEBUG)
|
|
announce_identity = RNS.Identity.recall(packet.destination_hash)
|
|
announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown");
|
|
announce_destination.hash = packet.destination_hash
|
|
announce_destination.hexhash = announce_destination.hash.hex()
|
|
announce_context = RNS.Packet.NONE
|
|
announce_data = packet.data
|
|
|
|
new_announce = RNS.Packet(
|
|
announce_destination,
|
|
announce_data,
|
|
RNS.Packet.ANNOUNCE,
|
|
context = RNS.Packet.PATH_RESPONSE,
|
|
header_type = RNS.Packet.HEADER_2,
|
|
transport_type = Transport.TRANSPORT,
|
|
transport_id = Transport.identity.hash,
|
|
attached_interface = attached_interface,
|
|
context_flag = packet.context_flag,
|
|
)
|
|
|
|
new_announce.hops = packet.hops
|
|
new_announce.send()
|
|
|
|
destination_table_entry = [now, received_from, announce_hops, expires, random_blobs, packet.receiving_interface, packet]
|
|
Transport.destination_table[packet.destination_hash] = destination_table_entry
|
|
RNS.log(f"Destination {RNS.prettyhexrep(packet.destination_hash)} is now {announce_hops} hops away via {RNS.prettyhexrep(received_from)} on {packet.receiving_interface}", RNS.LOG_DEBUG)
|
|
|
|
# If the receiving interface is a tunnel, we add the
|
|
# announce to the tunnels table
|
|
if hasattr(packet.receiving_interface, "tunnel_id") and packet.receiving_interface.tunnel_id != None:
|
|
tunnel_entry = Transport.tunnels[packet.receiving_interface.tunnel_id]
|
|
paths = tunnel_entry[2]
|
|
paths[packet.destination_hash] = destination_table_entry
|
|
expires = time.time() + Transport.DESTINATION_TIMEOUT
|
|
tunnel_entry[3] = expires
|
|
RNS.log(f"Path to {RNS.prettyhexrep(packet.destination_hash)} associated with tunnel {RNS.prettyhexrep(packet.receiving_interface.tunnel_id)}", RNS.LOG_DEBUG)
|
|
|
|
# Call externally registered callbacks from apps
|
|
# wanting to know when an announce arrives
|
|
for handler in Transport.announce_handlers:
|
|
try:
|
|
# Check that the announced destination matches
|
|
# the handlers aspect filter
|
|
execute_callback = False
|
|
announce_identity = RNS.Identity.recall(packet.destination_hash)
|
|
if handler.aspect_filter == None:
|
|
# If the handlers aspect filter is set to
|
|
# None, we execute the callback in all cases
|
|
execute_callback = True
|
|
else:
|
|
handler_expected_hash = RNS.Destination.hash_from_name_and_identity(handler.aspect_filter, announce_identity)
|
|
if packet.destination_hash == handler_expected_hash:
|
|
execute_callback = True
|
|
|
|
# If this is a path response, check whether the
|
|
# handler wants to receive it.
|
|
if packet.context == RNS.Packet.PATH_RESPONSE:
|
|
if hasattr(handler, "receive_path_responses") and handler.receive_path_responses == True:
|
|
pass
|
|
else:
|
|
execute_callback = False
|
|
|
|
if execute_callback:
|
|
handler.received_announce(
|
|
destination_hash=packet.destination_hash,
|
|
announced_identity=announce_identity,
|
|
app_data=RNS.Identity.recall_app_data(packet.destination_hash)
|
|
)
|
|
except Exception as e:
|
|
RNS.log("Error while processing external announce callback.", RNS.LOG_ERROR)
|
|
RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
|
|
RNS.trace_exception(e)
|
|
|
|
# Handling for link requests to local destinations
|
|
elif packet.packet_type == RNS.Packet.LINKREQUEST:
|
|
if packet.transport_id == None or packet.transport_id == Transport.identity.hash:
|
|
for destination in Transport.destinations:
|
|
if destination.hash == packet.destination_hash and destination.type == packet.destination_type:
|
|
packet.destination = destination
|
|
destination.receive(packet)
|
|
|
|
# Handling for local data packets
|
|
elif packet.packet_type == RNS.Packet.DATA:
|
|
if packet.destination_type == RNS.Destination.LINK:
|
|
for link in Transport.active_links:
|
|
if link.link_id == packet.destination_hash:
|
|
if link.attached_interface == packet.receiving_interface:
|
|
packet.link = link
|
|
if packet.context == RNS.Packet.CACHE_REQUEST:
|
|
cached_packet = Transport.get_cached_packet(packet.data)
|
|
if cached_packet != None:
|
|
cached_packet.unpack()
|
|
RNS.Packet(destination=link, data=cached_packet.data,
|
|
packet_type=cached_packet.packet_type, context=cached_packet.context).send()
|
|
|
|
Transport.jobs_locked = False
|
|
else:
|
|
link.receive(packet)
|
|
else:
|
|
# In the strange and rare case that an interface
|
|
# is partly malfunctioning, and a link-associated
|
|
# packet is being received on an interface that
|
|
# has failed sending, and transport has failed over
|
|
# to another path, we remove this packet hash from
|
|
# the filter hashlist so the link can receive the
|
|
# packet when it finally arrives over another path.
|
|
while packet.packet_hash in Transport.packet_hashlist:
|
|
Transport.packet_hashlist.remove(packet.packet_hash)
|
|
else:
|
|
for destination in Transport.destinations:
|
|
if destination.hash == packet.destination_hash and destination.type == packet.destination_type:
|
|
packet.destination = destination
|
|
destination.receive(packet)
|
|
|
|
if destination.proof_strategy == RNS.Destination.PROVE_ALL:
|
|
packet.prove()
|
|
|
|
elif destination.proof_strategy == RNS.Destination.PROVE_APP:
|
|
if destination.callbacks.proof_requested:
|
|
try:
|
|
if destination.callbacks.proof_requested(packet):
|
|
packet.prove()
|
|
except Exception as e:
|
|
RNS.log(f"Error while executing proof request callback. The contained exception was: {e}", RNS.LOG_ERROR)
|
|
|
|
# Handling for proofs and link-request proofs
|
|
elif packet.packet_type == RNS.Packet.PROOF:
|
|
if packet.context == RNS.Packet.LRPROOF:
|
|
# This is a link request proof, check if it
|
|
# needs to be transported
|
|
if (RNS.Reticulum.transport_enabled() or for_local_client_link or from_local_client) and packet.destination_hash in Transport.link_table:
|
|
link_entry = Transport.link_table[packet.destination_hash]
|
|
if packet.hops == link_entry[3]:
|
|
if packet.receiving_interface == link_entry[2]:
|
|
try:
|
|
if len(packet.data) == RNS.Identity.SIGLENGTH//8+RNS.Link.ECPUBSIZE//2:
|
|
peer_pub_bytes = packet.data[RNS.Identity.SIGLENGTH//8:RNS.Identity.SIGLENGTH//8+RNS.Link.ECPUBSIZE//2]
|
|
peer_identity = RNS.Identity.recall(link_entry[6])
|
|
peer_sig_pub_bytes = peer_identity.get_public_key()[RNS.Link.ECPUBSIZE//2:RNS.Link.ECPUBSIZE]
|
|
|
|
signed_data = packet.destination_hash+peer_pub_bytes+peer_sig_pub_bytes
|
|
signature = packet.data[:RNS.Identity.SIGLENGTH//8]
|
|
|
|
if peer_identity.validate(signature, signed_data):
|
|
RNS.log(f"Link request proof validated for transport via {link_entry[4]}", RNS.LOG_EXTREME)
|
|
new_raw = packet.raw[0:1]
|
|
new_raw += struct.pack("!B", packet.hops)
|
|
new_raw += packet.raw[2:]
|
|
Transport.link_table[packet.destination_hash][7] = True
|
|
Transport.transmit(link_entry[4], new_raw)
|
|
|
|
else:
|
|
RNS.log(f"Invalid link request proof in transport for link {RNS.prettyhexrep(packet.destination_hash)}, dropping proof.", RNS.LOG_DEBUG)
|
|
|
|
except Exception as e:
|
|
RNS.log(f"Error while transporting link request proof. The contained exception was: {e}", RNS.LOG_ERROR)
|
|
|
|
else:
|
|
RNS.log("Link request proof received on wrong interface, not transporting it.", RNS.LOG_DEBUG)
|
|
else:
|
|
RNS.log("Received link request proof with hop mismatch, not transporting it", RNS.LOG_DEBUG)
|
|
else:
|
|
# Check if we can deliver it to a local
|
|
# pending link
|
|
for link in Transport.pending_links:
|
|
if link.link_id == packet.destination_hash:
|
|
# We need to also allow an expected hops value of
|
|
# PATHFINDER_M, since in some cases, the number of hops
|
|
# to the destination will be unknown at link creation
|
|
# time. The real chance of this occurring is likely to be
|
|
# extremely small, and this allowance could probably
|
|
# be discarded without major issues, but it is kept
|
|
# for now to ensure backwards compatibility.
|
|
|
|
# TODO: Probably reset check back to
|
|
# if packet.hops == link.expected_hops:
|
|
# within one of the next releases
|
|
|
|
if packet.hops == link.expected_hops or link.expected_hops == RNS.Transport.PATHFINDER_M:
|
|
# Add this packet to the filter hashlist if we
|
|
# have determined that it's actually destined
|
|
# for this system, and then validate the proof
|
|
Transport.packet_hashlist.append(packet.packet_hash)
|
|
link.validate_proof(packet)
|
|
|
|
elif packet.context == RNS.Packet.RESOURCE_PRF:
|
|
for link in Transport.active_links:
|
|
if link.link_id == packet.destination_hash:
|
|
link.receive(packet)
|
|
else:
|
|
if packet.destination_type == RNS.Destination.LINK:
|
|
for link in Transport.active_links:
|
|
if link.link_id == packet.destination_hash:
|
|
packet.link = link
|
|
|
|
if len(packet.data) == RNS.PacketReceipt.EXPL_LENGTH:
|
|
proof_hash = packet.data[:RNS.Identity.HASHLENGTH//8]
|
|
else:
|
|
proof_hash = None
|
|
|
|
# Check if this proof needs to be transported
|
|
if (RNS.Reticulum.transport_enabled() or from_local_client or proof_for_local_client) and packet.destination_hash in Transport.reverse_table:
|
|
reverse_entry = Transport.reverse_table.pop(packet.destination_hash)
|
|
if packet.receiving_interface == reverse_entry[1]:
|
|
RNS.log(f"Proof received on correct interface, transporting it via {reverse_entry[0]}", RNS.LOG_EXTREME)
|
|
new_raw = packet.raw[0:1]
|
|
new_raw += struct.pack("!B", packet.hops)
|
|
new_raw += packet.raw[2:]
|
|
Transport.transmit(reverse_entry[0], new_raw)
|
|
else:
|
|
RNS.log("Proof received on wrong interface, not transporting it.", RNS.LOG_DEBUG)
|
|
|
|
for receipt in Transport.receipts:
|
|
receipt_validated = False
|
|
if proof_hash != None:
|
|
# Only test validation if hash matches
|
|
if receipt.hash == proof_hash:
|
|
receipt_validated = receipt.validate_proof_packet(packet)
|
|
else:
|
|
# In case of an implicit proof, we have
|
|
# to check every single outstanding receipt
|
|
receipt_validated = receipt.validate_proof_packet(packet)
|
|
|
|
if receipt_validated:
|
|
if receipt in Transport.receipts:
|
|
Transport.receipts.remove(receipt)
|
|
|
|
Transport.jobs_locked = False
|
|
|
|
@staticmethod
|
|
def synthesize_tunnel(interface):
    """
    Builds a signed tunnel synthesis packet for an interface and
    broadcasts it on that interface.

    The payload consists of the Transport identity's public key, the
    interface hash and a random hash, followed by the Transport
    identity's signature over those three fields. Once the packet has
    been sent, ``interface.wants_tunnel`` is set to ``False``.

    :param interface: The interface to synthesize a tunnel on. It must provide a ``get_hash()`` method.
    """
    iface_hash = interface.get_hash()
    transport_pubkey = RNS.Transport.identity.get_public_key()
    nonce = RNS.Identity.get_random_hash()

    id_material = transport_pubkey+iface_hash
    # Full hash of public key and interface hash. It is calculated
    # here, but it is not itself included in the outgoing payload.
    tunnel_id = RNS.Identity.full_hash(id_material)

    to_sign = id_material+nonce
    payload = to_sign+Transport.identity.sign(to_sign)

    synthesize_destination = RNS.Destination(
        None,
        RNS.Destination.OUT,
        RNS.Destination.PLAIN,
        Transport.APP_NAME,
        "tunnel",
        "synthesize",
    )

    RNS.Packet(
        synthesize_destination,
        payload,
        packet_type = RNS.Packet.DATA,
        transport_type = RNS.Transport.BROADCAST,
        header_type = RNS.Packet.HEADER_1,
        attached_interface = interface,
    ).send()

    interface.wants_tunnel = False
|
|
|
|
@staticmethod
|
|
def tunnel_synthesize_handler(data, packet):
    """
    Validates a received tunnel synthesis payload and, if its
    signature checks out, hands the tunnel over to
    ``Transport.handle_tunnel``.

    The payload is expected to be exactly: public key, interface
    hash, random hash and signature, in that order. Payloads of any
    other length are ignored. Any exception raised while validating
    is logged at debug level and otherwise swallowed.

    :param data: The raw payload of the tunnel synthesis packet.
    :param packet: The received packet. Its ``receiving_interface`` is associated with the tunnel.
    """
    try:
        key_len = RNS.Identity.KEYSIZE//8
        hash_len = RNS.Identity.HASHLENGTH//8
        nonce_len = RNS.Reticulum.TRUNCATED_HASHLENGTH//8
        sig_len = RNS.Identity.SIGLENGTH//8

        # Field boundaries within the payload
        hash_end = key_len+hash_len
        nonce_end = hash_end+nonce_len
        expected_length = nonce_end+sig_len

        if len(data) != expected_length:
            return

        public_key = data[:key_len]
        interface_hash = data[key_len:hash_end]
        tunnel_id_data = public_key+interface_hash
        tunnel_id = RNS.Identity.full_hash(tunnel_id_data)
        random_hash = data[hash_end:nonce_end]

        signature = data[nonce_end:expected_length]
        signed_data = tunnel_id_data+random_hash

        # The signature is checked against the public key
        # carried in the payload itself.
        remote_transport_identity = RNS.Identity(create_keys=False)
        remote_transport_identity.load_public_key(public_key)

        if remote_transport_identity.validate(signature, signed_data):
            Transport.handle_tunnel(tunnel_id, packet.receiving_interface)

    except Exception as e:
        RNS.log("An error occurred while validating tunnel establishment packet.", RNS.LOG_DEBUG)
        RNS.log(f"The contained exception was: {e}", RNS.LOG_DEBUG)
|
|
|
|
@staticmethod
|
|
def handle_tunnel(tunnel_id, interface):
    """
    Registers a new tunnel endpoint, or re-attaches a known tunnel to
    the given interface and restores the paths stored for it.

    A tunnel entry is the list ``[tunnel_id, interface, paths, expires]``.
    For a tunnel that reappears, each stored path is copied into
    ``Transport.destination_table`` when either no path is currently
    known and the stored path has not expired, or the stored path has
    no more hops than the known one, or the known one has expired.
    Stored paths that are not restored are removed from the tunnel.

    :param tunnel_id: The ID of the tunnel.
    :param interface: The interface the tunnel is now reachable on.
    """
    tunnel_expiry = time.time() + Transport.DESTINATION_TIMEOUT

    if tunnel_id not in Transport.tunnels:
        # First time this tunnel is seen, start with no paths
        RNS.log(f"Tunnel endpoint {RNS.prettyhexrep(tunnel_id)} established.", RNS.LOG_DEBUG)
        interface.tunnel_id = tunnel_id
        Transport.tunnels[tunnel_id] = [tunnel_id, interface, {}, tunnel_expiry]
        return

    RNS.log(f"Tunnel endpoint {RNS.prettyhexrep(tunnel_id)} reappeared. Restoring paths...", RNS.LOG_DEBUG)
    tunnel_entry = Transport.tunnels[tunnel_id]
    tunnel_entry[1] = interface
    tunnel_entry[3] = tunnel_expiry
    interface.tunnel_id = tunnel_id
    tunnel_paths = tunnel_entry[2]

    # Paths that are not restored are collected first, and only
    # removed once iteration over the path dict has finished.
    stale_hashes = []
    for destination_hash, stored_entry in tunnel_paths.items():
        received_from = stored_entry[1]
        announce_hops = stored_entry[2]
        path_expires = stored_entry[3]
        random_blobs = stored_entry[4]
        announce_packet = stored_entry[6]

        # The restored entry gets a fresh timestamp and points
        # at the interface the tunnel is attached to now.
        restored_entry = [time.time(), received_from, announce_hops, path_expires, random_blobs, interface, announce_packet]

        if destination_hash in Transport.destination_table:
            known_entry = Transport.destination_table[destination_hash]
            known_hops = known_entry[2]
            known_expires = known_entry[3]
            restore = announce_hops <= known_hops or time.time() > known_expires
            if not restore:
                RNS.log(f"Did not restore path to {RNS.prettyhexrep(announce_packet.destination_hash)} because a newer path with fewer hops exist", RNS.LOG_DEBUG)
        else:
            restore = time.time() < path_expires
            if not restore:
                RNS.log(f"Did not restore path to {RNS.prettyhexrep(announce_packet.destination_hash)} because it has expired", RNS.LOG_DEBUG)

        if restore:
            Transport.destination_table[destination_hash] = restored_entry
            RNS.log(f"Restored path to {RNS.prettyhexrep(announce_packet.destination_hash)} is now {announce_hops} hops away via {RNS.prettyhexrep(received_from)} on {interface}", RNS.LOG_DEBUG)
        else:
            stale_hashes.append(destination_hash)

    for stale_hash in stale_hashes:
        RNS.log(f"Removing path to {RNS.prettyhexrep(stale_hash)} from tunnel {RNS.prettyhexrep(tunnel_id)}", RNS.LOG_DEBUG)
        tunnel_paths.pop(stale_hash)
|
|
|
|
@staticmethod
|
|
def register_destination(destination):
    """
    Registers a destination with Transport.

    The destination's ``MTU`` is always set to ``RNS.Reticulum.MTU``.
    Only inbound destinations are added to ``Transport.destinations``.
    If the owning instance is connected to a shared instance, newly
    registered ``SINGLE`` destinations are announced immediately as a
    path response.

    :param destination: The destination to register.
    :raises KeyError: If an inbound destination with the same hash is already registered.
    """
    destination.MTU = RNS.Reticulum.MTU

    if destination.direction != RNS.Destination.IN:
        return

    if any(destination.hash == known.hash for known in Transport.destinations):
        raise KeyError("Attempt to register an already registered destination.")

    Transport.destinations.append(destination)

    if Transport.owner.is_connected_to_shared_instance and destination.type == RNS.Destination.SINGLE:
        destination.announce(path_response=True)
|
|
|
|
@staticmethod
|
|
def deregister_destination(destination):
    """
    Removes a destination from ``Transport.destinations``. Does
    nothing if the destination is not currently registered.

    :param destination: The destination to deregister.
    """
    if destination not in Transport.destinations:
        return

    Transport.destinations.remove(destination)
|
|
|
|
@staticmethod
|
|
def register_link(link):
    """
    Registers a link with Transport. Links initiated by this side
    are added to ``Transport.pending_links``, all other links are
    added to ``Transport.active_links``.

    :param link: The link to register.
    """
    RNS.log(f"Registering link {link}", RNS.LOG_EXTREME)
    link_table = Transport.pending_links if link.initiator else Transport.active_links
    link_table.append(link)
|
|
|
|
@staticmethod
|
|
def activate_link(link):
    """
    Moves a link from ``Transport.pending_links`` to
    ``Transport.active_links``. If the link is not in the pending
    table, an error is logged and nothing else happens.

    :param link: The link to activate.
    :raises OSError: If the link is pending, but its status is not ``RNS.Link.ACTIVE``.
    """
    RNS.log(f"Activating link {link}", RNS.LOG_EXTREME)

    if link not in Transport.pending_links:
        RNS.log("Attempted to activate a link that was not in the pending table", RNS.LOG_ERROR)
        return

    if link.status != RNS.Link.ACTIVE:
        raise OSError(f"Invalid link state for link activation: {link.status}")

    Transport.pending_links.remove(link)
    Transport.active_links.append(link)
    link.status = RNS.Link.ACTIVE
|
|
|
|
@staticmethod
|
|
def register_announce_handler(handler):
    """
    Registers an announce handler.

    Objects that lack a callable *received_announce* attribute, or
    that lack an *aspect_filter* attribute, are silently ignored.

    :param handler: Must be an object with an *aspect_filter* attribute and a *received_announce(destination_hash, announced_identity, app_data)*
                    callable. Can optionally have a *receive_path_responses* attribute set to ``True``, to also receive all path responses, in addition to live
                    announces. See the :ref:`Announce Example<example-announce>` for more info.
    """
    has_callback = hasattr(handler, "received_announce") and callable(handler.received_announce)
    if not has_callback:
        return

    if not hasattr(handler, "aspect_filter"):
        return

    Transport.announce_handlers.append(handler)
|
|
|
|
@staticmethod
|
|
def deregister_announce_handler(handler):
    """
    Deregisters an announce handler. Every occurrence of the
    handler in the list of registered handlers is removed.

    :param handler: The announce handler to be deregistered.
    """
    still_registered = handler in Transport.announce_handlers
    while still_registered:
        Transport.announce_handlers.remove(handler)
        still_registered = handler in Transport.announce_handlers
|
|
|
|
@staticmethod
|
|
def find_interface_from_hash(interface_hash):
    """
    Looks up an interface in ``Transport.interfaces`` by its hash.

    :param interface_hash: The hash to compare against each interface's ``get_hash()``.
    :returns: The first interface with a matching hash, or *None* if there is no match.
    """
    matching_interfaces = (
        candidate for candidate in Transport.interfaces
        if candidate.get_hash() == interface_hash
    )

    return next(matching_interfaces, None)
|
|
|
|
@staticmethod
|
|
def should_cache(packet):
    """
    Decides whether a packet should be written to the packet cache.
    Caching is currently disabled, so this is always *False*.

    :param packet: The packet to evaluate. Currently not inspected.
    :returns: *False*.
    """
    # TODO: Rework the caching system. Caching resource proofs
    # (packets with context RNS.Packet.RESOURCE_PRF) was found
    # to not be very useful, so it stays disabled until the
    # caching system has been redesigned.
    return False
|
|
|
|
# When caching packets to storage, they are written
|
|
# exactly as they arrived over their interface. This
|
|
# means that they have not had their hop count
|
|
# increased yet! Take note of this when reading from
|
|
# the packet cache.
|
|
@staticmethod
|
|
def cache(packet, force_cache=False):
    """
    Writes a packet to the packet cache, if ``should_cache`` allows
    it or if caching is forced.

    The cache file is named after the hex representation of the packet
    hash and contains the msgpack-encoded list
    ``[packet.raw, interface_reference]``, where the interface reference
    is ``str(packet.receiving_interface)``, or *None* if the packet has
    no receiving interface. Packets are stored exactly as they arrived
    over their interface.

    Any error is logged and otherwise ignored.

    :param packet: The packet to cache.
    :param force_cache: If ``True``, the packet is cached regardless of what ``should_cache`` decides.
    """
    if not (RNS.Transport.should_cache(packet) or force_cache):
        return

    try:
        packet_hash = RNS.hexrep(packet.get_hash(), delimit=False)
        interface_reference = None
        if packet.receiving_interface is not None:
            interface_reference = str(packet.receiving_interface)

        # Serialise before opening the file, so a failure here
        # does not leave an empty cache file behind.
        cache_entry = umsgpack.packb([packet.raw, interface_reference])

        # The context manager closes the file even if writing fails
        with open(f"{RNS.Reticulum.cachepath}/{packet_hash}", "wb") as cache_file:
            cache_file.write(cache_entry)

    except Exception as e:
        RNS.log(f"Error writing packet to cache. The contained exception was: {e}", RNS.LOG_ERROR)
|
|
|
|
@staticmethod
|
|
def get_cached_packet(packet_hash):
    """
    Loads a packet from the packet cache.

    The cache file is expected to contain a msgpack-encoded list of the
    raw packet data and an interface reference. If the reference
    matches the string representation of an interface in
    ``Transport.interfaces``, that interface is set as the packet's
    receiving interface.

    :param packet_hash: The hash of the packet to look up.
    :returns: The cached packet, or *None* if it is not in the cache or could not be read.
    """
    try:
        packet_hash = RNS.hexrep(packet_hash, delimit=False)
        path = f"{RNS.Reticulum.cachepath}/{packet_hash}"

        if not os.path.isfile(path):
            return None

        # The context manager closes the file even if
        # reading or decoding the cache entry fails.
        with open(path, "rb") as cache_file:
            cached_data = umsgpack.unpackb(cache_file.read())

        packet = RNS.Packet(None, cached_data[0])
        interface_reference = cached_data[1]

        # No early exit here: if several interfaces share the
        # same string representation, the last one is used.
        for interface in Transport.interfaces:
            if str(interface) == interface_reference:
                packet.receiving_interface = interface

        return packet

    except Exception as e:
        RNS.log("Exception occurred while getting cached packet.", RNS.LOG_ERROR)
        RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
        return None
|
|
|
|
@staticmethod
|
|
def cache_request_packet(packet):
    """
    Serves a cache request from the local packet cache.

    The data of the request packet is interpreted as a packet hash.
    If a packet with that hash is found in the local cache, it is
    replayed to ``Transport.inbound``, so that it can be directed
    towards its original destination.

    :param packet: The cache request packet. Its ``data`` must be exactly one full hash in length.
    :returns: *True* if a cached packet was found and replayed, otherwise *False*.
    """
    if len(packet.data) != RNS.Identity.HASHLENGTH/8:
        return False

    cached = Transport.get_cached_packet(packet.data)
    if cached != None:
        Transport.inbound(cached.raw, cached.receiving_interface)
        return True

    return False
|
|
|
|
@staticmethod
|
|
def cache_request(packet_hash, destination):
    """
    Requests a cached packet.

    If the packet is found in the local cache, it is replayed to
    ``Transport.inbound``. Otherwise a packet with context
    ``CACHE_REQUEST``, carrying the packet hash, is sent to the
    given destination.

    :param packet_hash: The hash of the requested packet.
    :param destination: The destination the cache request packet is addressed to if the packet is not cached locally.
    """
    cached_packet = Transport.get_cached_packet(packet_hash)
    if cached_packet:
        # The packet was found in the local cache,
        # replay it to the Transport instance.
        Transport.inbound(cached_packet.raw, cached_packet.receiving_interface)
    else:
        # The packet is not in the local cache,
        # query the network.
        RNS.Packet(destination, packet_hash, context = RNS.Packet.CACHE_REQUEST).send()
|
|
|
|
@staticmethod
|
|
def has_path(destination_hash):
    """
    Checks whether the destination table holds a path to a destination.

    :param destination_hash: A destination hash as *bytes*.
    :returns: *True* if a path to the destination is known, otherwise *False*.
    """
    path_known = destination_hash in Transport.destination_table
    return True if path_known else False
|
|
|
|
@staticmethod
|
|
def hops_to(destination_hash):
    """
    Looks up the hop count stored in the destination table.

    :param destination_hash: A destination hash as *bytes*.
    :returns: The number of hops to the specified destination, or ``RNS.Transport.PATHFINDER_M`` if the number of hops is unknown.
    """
    if destination_hash not in Transport.destination_table:
        return Transport.PATHFINDER_M

    # Index 2 of a destination table entry holds the hop count
    return Transport.destination_table[destination_hash][2]
|
|
|
|
@staticmethod
|
|
def next_hop(destination_hash):
    """
    Looks up the next hop stored in the destination table.

    :param destination_hash: A destination hash as *bytes*.
    :returns: The destination hash as *bytes* for the next hop to the specified destination, or *None* if the next hop is unknown.
    """
    if destination_hash not in Transport.destination_table:
        return None

    # Index 1 of a destination table entry holds the hash
    # the path was received from.
    return Transport.destination_table[destination_hash][1]
|
|
|
|
@staticmethod
|
|
def next_hop_interface(destination_hash):
    """
    Looks up the interface stored in the destination table.

    :param destination_hash: A destination hash as *bytes*.
    :returns: The interface for the next hop to the specified destination, or *None* if the interface is unknown.
    """
    if destination_hash not in Transport.destination_table:
        return None

    # Index 5 of a destination table entry holds the
    # interface the path was received on.
    return Transport.destination_table[destination_hash][5]
|
|
|
|
@staticmethod
|
|
def next_hop_interface_bitrate(destination_hash):
    """
    Looks up the bitrate of the next hop interface for a destination.

    :param destination_hash: A destination hash as *bytes*.
    :returns: The ``bitrate`` of the next hop interface, or *None* if the interface is unknown.
    """
    outbound_interface = Transport.next_hop_interface(destination_hash)
    if outbound_interface != None:
        return outbound_interface.bitrate

    return None
|
|
|
|
@staticmethod
|
|
def next_hop_per_bit_latency(destination_hash):
    """
    Calculates the time per bit on the next hop interface for a
    destination, as the reciprocal of the interface bitrate.

    :param destination_hash: A destination hash as *bytes*.
    :returns: The per-bit latency, or *None* if the bitrate is unknown.
    """
    bitrate = Transport.next_hop_interface_bitrate(destination_hash)
    if bitrate != None:
        return (1/bitrate)

    return None
|
|
|
|
@staticmethod
|
|
def next_hop_per_byte_latency(destination_hash):
    """
    Calculates the time per byte on the next hop interface for a
    destination, as eight times the per-bit latency.

    :param destination_hash: A destination hash as *bytes*.
    :returns: The per-byte latency, or *None* if the per-bit latency is unknown.
    """
    bit_latency = Transport.next_hop_per_bit_latency(destination_hash)
    if bit_latency != None:
        return bit_latency*8

    return None
|
|
|
|
@staticmethod
|
|
def first_hop_timeout(destination_hash):
    """
    Calculates a timeout for the first hop towards a destination.

    If the per-byte latency of the next hop interface is known, the
    time needed to transfer one full MTU is added to the default
    per-hop timeout.

    :param destination_hash: A destination hash as *bytes*.
    :returns: The timeout as ``RNS.Reticulum.MTU * latency + RNS.Reticulum.DEFAULT_PER_HOP_TIMEOUT``, or just the default per-hop timeout if the latency is unknown.
    """
    byte_latency = Transport.next_hop_per_byte_latency(destination_hash)
    if byte_latency != None:
        return RNS.Reticulum.MTU * byte_latency + RNS.Reticulum.DEFAULT_PER_HOP_TIMEOUT

    return RNS.Reticulum.DEFAULT_PER_HOP_TIMEOUT
|
|
|
|
@staticmethod
|
|
def extra_link_proof_timeout(interface):
    """
    Calculates the time needed to transfer one full MTU over an
    interface, based on the interface's ``bitrate``.

    :param interface: The interface to calculate for, or *None*.
    :returns: The transfer time for ``RNS.Reticulum.MTU`` bytes, or ``0`` if no interface was given.
    """
    if interface == None:
        return 0

    return ((1/interface.bitrate)*8)*RNS.Reticulum.MTU
|
|
|
|
@staticmethod
|
|
def expire_path(destination_hash):
    """
    Marks the path to a destination as expired, by setting the
    timestamp of its destination table entry to ``0``, and resets
    ``Transport.tables_last_culled`` to ``0``.

    :param destination_hash: A destination hash as *bytes*.
    :returns: *True* if a path to the destination was known, otherwise *False*.
    """
    if destination_hash not in Transport.destination_table:
        return False

    # Index 0 of a destination table entry holds its timestamp
    Transport.destination_table[destination_hash][0] = 0
    # NOTE(review): presumably makes the next table cull run
    # right away - confirm against the jobs loop.
    Transport.tables_last_culled = 0
    return True
|
|
|
|
@staticmethod
|
|
def mark_path_unresponsive(destination_hash):
    """
    Sets the path state of a destination to ``Transport.STATE_UNRESPONSIVE``.
    The state is only recorded for destinations in the destination table.

    :param destination_hash: A destination hash as *bytes*.
    :returns: *True* if the state was recorded, *False* if no path to the destination is known.
    """
    if destination_hash not in Transport.destination_table:
        return False

    Transport.path_states[destination_hash] = Transport.STATE_UNRESPONSIVE
    return True
|
|
|
|
@staticmethod
|
|
def mark_path_responsive(destination_hash):
    """
    Sets the path state of a destination to ``Transport.STATE_RESPONSIVE``.
    The state is only recorded for destinations in the destination table.

    :param destination_hash: A destination hash as *bytes*.
    :returns: *True* if the state was recorded, *False* if no path to the destination is known.
    """
    if destination_hash not in Transport.destination_table:
        return False

    Transport.path_states[destination_hash] = Transport.STATE_RESPONSIVE
    return True
|
|
|
|
@staticmethod
|
|
def mark_path_unknown_state(destination_hash):
    """
    Sets the path state of a destination to ``Transport.STATE_UNKNOWN``.
    The state is only recorded for destinations in the destination table.

    :param destination_hash: A destination hash as *bytes*.
    :returns: *True* if the state was recorded, *False* if no path to the destination is known.
    """
    if destination_hash not in Transport.destination_table:
        return False

    Transport.path_states[destination_hash] = Transport.STATE_UNKNOWN
    return True
|
|
|
|
@staticmethod
|
|
def path_is_unresponsive(destination_hash):
    """
    Checks whether the recorded path state of a destination is
    ``Transport.STATE_UNRESPONSIVE``.

    :param destination_hash: A destination hash as *bytes*.
    :returns: *True* if the path is marked as unresponsive, *False* if it has any other state, or no recorded state at all.
    """
    state_recorded = destination_hash in Transport.path_states
    if state_recorded and Transport.path_states[destination_hash] == Transport.STATE_UNRESPONSIVE:
        return True

    return False
|
|
|
|
@staticmethod
|
|
def request_path(destination_hash, on_interface=None, tag=None, recursive=False):
    """
    Requests a path to the destination from the network. If
    another reachable peer on the network knows a path, it
    will announce it.

    When a recursive request is made on a specific interface, the
    request is dropped if that interface has queued announces, or if
    its announce cap does not allow sending yet.

    :param destination_hash: A destination hash as *bytes*.
    :param on_interface: If specified, the path request will only be sent on this interface. In normal use, Reticulum handles this automatically, and this parameter should not be used.
    :param tag: The tag to include in the path request. If *None*, a random hash is used.
    :param recursive: If ``True`` and an interface is specified, the announce cap and announce queue of that interface are honoured before sending.
    """
    request_tag = RNS.Identity.get_random_hash() if tag == None else tag

    # Transport-enabled instances include their own identity
    # hash between the destination hash and the request tag.
    if RNS.Reticulum.transport_enabled():
        path_request_data = destination_hash+Transport.identity.hash+request_tag
    else:
        path_request_data = destination_hash+request_tag

    path_request_dst = RNS.Destination(
        None,
        RNS.Destination.OUT,
        RNS.Destination.PLAIN,
        Transport.APP_NAME,
        "path",
        "request",
    )

    request_packet = RNS.Packet(
        path_request_dst,
        path_request_data,
        packet_type = RNS.Packet.DATA,
        transport_type = RNS.Transport.BROADCAST,
        header_type = RNS.Packet.HEADER_1,
        attached_interface = on_interface,
    )

    if on_interface != None and recursive:
        # Make sure the interface carries the attributes
        # used for announce rate control below.
        if not hasattr(on_interface, "announce_cap"):
            on_interface.announce_cap = RNS.Reticulum.ANNOUNCE_CAP

        if not hasattr(on_interface, "announce_allowed_at"):
            on_interface.announce_allowed_at = 0

        if not hasattr(on_interface, "announce_queue"):
            on_interface.announce_queue = []

        if len(on_interface.announce_queue) > 0:
            RNS.log(f"Blocking recursive path request on {on_interface} due to queued announces", RNS.LOG_EXTREME)
            return

        now = time.time()
        if now < on_interface.announce_allowed_at:
            RNS.log(f"Blocking recursive path request on {on_interface} due to active announce cap", RNS.LOG_EXTREME)
            return

        # Push back the next allowed announce time, according to the
        # airtime of this request and the interface's announce cap.
        tx_time = ((len(path_request_data)+RNS.Reticulum.HEADER_MINSIZE)*8) / on_interface.bitrate
        wait_time = (tx_time / on_interface.announce_cap)
        on_interface.announce_allowed_at = now + wait_time

    request_packet.send()
    Transport.path_requests[destination_hash] = time.time()
|
|
|
|
@staticmethod
|
|
def remote_status_handler(path, data, request_id, link_id, remote_identity, requested_at):
    """
    Request handler returning interface statistics to an identified
    remote peer.

    The request data must be a non-empty list. If its first element
    is ``True``, the link count is appended to the response. Errors
    are logged, and result in a *None* response.

    :param path: The request path. Not used by this handler.
    :param data: The request data.
    :param request_id: The request ID. Not used by this handler.
    :param link_id: The link ID. Not used by this handler.
    :param remote_identity: The identity of the requesting peer. Requests without an identity are not answered.
    :param requested_at: The request time. Not used by this handler.
    :returns: A list containing the interface statistics, optionally followed by the link count, or *None*.
    """
    if remote_identity == None:
        return None

    try:
        if not isinstance(data, list) or not len(data) > 0:
            return None

        status = [Transport.owner.get_interface_stats()]
        if data[0] == True:
            status.append(Transport.owner.get_link_count())

        return status

    except Exception as e:
        RNS.log(f"An error occurred while processing remote status request from {remote_identity}", RNS.LOG_ERROR)
        RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)

    return None
|
|
|
|
@staticmethod
|
|
def remote_path_handler(path, data, request_id, link_id, remote_identity, requested_at):
    """
    Request handler returning path table or announce rate table
    entries to an identified remote peer.

    The request data must be a non-empty list of the form
    ``[command, destination_hash, max_hops]``, where the last two
    elements are optional. Supported commands are ``"table"`` (path
    table, limited by ``max_hops``) and ``"rates"`` (rate table). If a
    destination hash is given, only entries whose ``"hash"`` matches
    it are returned. Errors are logged, and result in a *None* response.

    :param path: The request path. Not used by this handler.
    :param data: The request data.
    :param request_id: The request ID. Not used by this handler.
    :param link_id: The link ID. Not used by this handler.
    :param remote_identity: The identity of the requesting peer. Requests without an identity are not answered.
    :param requested_at: The request time. Not used by this handler.
    :returns: A list of matching table entries, or *None* for unidentified peers, malformed requests, unknown commands and errors.
    """
    if remote_identity is None:
        return None

    try:
        if not isinstance(data, list) or len(data) == 0:
            return None

        command = data[0]
        destination_hash = data[1] if len(data) > 1 else None
        max_hops = data[2] if len(data) > 2 else None

        if command == "table":
            table = Transport.owner.get_path_table(max_hops=max_hops)
        elif command == "rates":
            table = Transport.owner.get_rate_table()
        else:
            return None

        # Without a destination hash, all entries are returned
        return [
            entry for entry in table
            if destination_hash is None or destination_hash == entry["hash"]
        ]

    except Exception as e:
        RNS.log(f"An error occurred while processing remote path request from {remote_identity}", RNS.LOG_ERROR)
        RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)

    return None
|
|
|
|
@staticmethod
def path_request_handler(data, packet):
    """
    Parse an incoming path request and pass it on to
    ``Transport.path_request``.

    The payload is interpreted as a destination hash, optionally
    followed by a transport instance ID and tag bytes. Requests that
    carry no tag, or whose destination/tag combination is already in
    ``Transport.discovery_pr_tags``, are logged and ignored. Payloads
    shorter than a destination hash are silently dropped.

    :param data: Raw payload of the path request.
    :param packet: The packet the request arrived in. Its
        ``receiving_interface`` is passed on as the attached interface.
    """
    try:
        hash_len = RNS.Identity.TRUNCATED_HASHLENGTH//8

        # If there is at least bytes enough for a destination
        # hash in the packet, we assume those bytes are the
        # destination being requested.
        if len(data) < hash_len:
            return

        destination_hash = data[:hash_len]

        if len(data) > hash_len*2:
            # There is also enough bytes for a transport instance
            # ID and at least one tag byte, so we assume the next
            # bytes to be the transport ID of the requesting
            # transport instance, and the remainder to be the tag.
            requesting_transport_instance = data[hash_len:hash_len*2]
            tag_bytes = data[hash_len*2:]

        elif len(data) > hash_len:
            # Everything after the destination hash is the tag
            requesting_transport_instance = None
            tag_bytes = data[hash_len:]

        else:
            requesting_transport_instance = None
            tag_bytes = None

        if tag_bytes is None:
            RNS.log(f"Ignoring tagless path request for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_DEBUG)
            return

        # Tags are capped at the length of a truncated hash,
        # slicing leaves shorter tags untouched
        tag_bytes = tag_bytes[:hash_len]
        unique_tag = destination_hash+tag_bytes

        if unique_tag in Transport.discovery_pr_tags:
            RNS.log(f"Ignoring duplicate path request for {RNS.prettyhexrep(destination_hash)} with tag {RNS.prettyhexrep(unique_tag)}", RNS.LOG_DEBUG)
            return

        Transport.discovery_pr_tags.append(unique_tag)

        Transport.path_request(
            destination_hash,
            Transport.from_local_client(packet),
            packet.receiving_interface,
            requestor_transport_id = requesting_transport_instance,
            tag=tag_bytes
        )

    except Exception as e:
        RNS.log(f"Error while handling path request. The contained exception was: {e}", RNS.LOG_ERROR)
|
|
|
|
@staticmethod
def path_request(destination_hash, is_from_local_client, attached_interface, requestor_transport_id=None, tag=None):
    """
    Act on a path request for ``destination_hash``.

    The request is handled by the first case that matches:

    1. The destination is registered on this instance: it is announced
       as a path response.
    2. Transport is enabled or the request came from a local client,
       and the destination is in the destination table: the stored
       announce packet is placed in the announce table as a path
       response. This is skipped when the next hop is the requestor,
       or when the path was learned on the same roaming-mode interface
       the request arrived on.
    3. The request came from a local client: it is forwarded on all
       other interfaces using a fresh random tag.
    4. Transport is enabled and the mode of the attached interface is
       in ``DISCOVER_PATHS_FOR``: a recursive path request is sent on
       all other interfaces, unless one is already waiting.
    5. Local client interfaces exist: the request is forwarded on them.
    6. Otherwise the request is ignored.

    :param destination_hash: Hash of the requested destination.
    :param is_from_local_client: Whether the request came from a local
        client.
    :param attached_interface: Interface the request is associated
        with, may be ``None``. A ``None`` interface is treated as not
        being in roaming mode.
    :param requestor_transport_id: Transport ID of the requesting
        transport instance, or ``None``.
    :param tag: Tag of the path request, passed on to the announce or
        to recursive path requests.
    """
    should_search_for_unknown = False

    if attached_interface is not None:
        if RNS.Reticulum.transport_enabled() and attached_interface.mode in RNS.Interfaces.Interface.Interface.DISCOVER_PATHS_FOR:
            should_search_for_unknown = True

        interface_str = f" on {attached_interface}"
    else:
        interface_str = ""

    RNS.log(f"Path request for {RNS.prettyhexrep(destination_hash)}{interface_str}", RNS.LOG_DEBUG)

    # If the path to the destination leads through a local client
    # interface, record which interface asked for it.
    if len(Transport.local_client_interfaces) > 0:
        if destination_hash in Transport.destination_table:
            destination_interface = Transport.destination_table[destination_hash][5]

            if Transport.is_local_client_interface(destination_interface):
                Transport.pending_local_path_requests[destination_hash] = attached_interface

    local_destination = next((d for d in Transport.destinations if d.hash == destination_hash), None)
    if local_destination is not None:
        local_destination.announce(path_response=True, tag=tag, attached_interface=attached_interface)
        RNS.log(f"Answering path request for {RNS.prettyhexrep(destination_hash)}{interface_str}, destination is local to this system", RNS.LOG_DEBUG)

    elif (RNS.Reticulum.transport_enabled() or is_from_local_client) and (destination_hash in Transport.destination_table):
        # Read the table entry once, so all fields come from the same entry
        path_entry = Transport.destination_table[destination_hash]
        packet = path_entry[6]
        next_hop = path_entry[1]
        received_from = path_entry[5]

        # The interface may be None (see above), in which case
        # the request is treated as not being on a roaming-mode
        # interface instead of raising an AttributeError.
        on_roaming_interface = (
            attached_interface is not None
            and attached_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING
        )

        if on_roaming_interface and attached_interface == received_from:
            RNS.log("Not answering path request on roaming-mode interface, since next hop is on same roaming-mode interface", RNS.LOG_DEBUG)

        elif requestor_transport_id is not None and next_hop == requestor_transport_id:
            # TODO: Find a bandwidth efficient way to invalidate our
            # known path on this signal. The obvious way of signing
            # path requests with transport instance keys is quite
            # inefficient. There is probably a better way. Doing
            # path invalidation here would decrease the network
            # convergence time. Maybe just drop it?
            RNS.log(f"Not answering path request for {RNS.prettyhexrep(destination_hash)}{interface_str}, since next hop is the requestor", RNS.LOG_DEBUG)

        else:
            RNS.log(f"Answering path request for {RNS.prettyhexrep(destination_hash)}{interface_str}, path is known", RNS.LOG_DEBUG)

            now = time.time()
            retries = Transport.PATHFINDER_R
            local_rebroadcasts = 0
            block_rebroadcasts = True
            announce_hops = packet.hops

            if is_from_local_client:
                retransmit_timeout = now

            elif Transport.is_local_client_interface(Transport.next_hop_interface(destination_hash)):
                RNS.log(f"Path request destination {RNS.prettyhexrep(destination_hash)} is on a local client interface, rebroadcasting immediately", RNS.LOG_EXTREME)
                retransmit_timeout = now

            else:
                retransmit_timeout = now + Transport.PATH_REQUEST_GRACE

                # If we are answering on a roaming-mode interface, wait a
                # little longer, to allow potential more well-connected
                # peers to answer first.
                if on_roaming_interface:
                    retransmit_timeout += Transport.PATH_REQUEST_RG

            # This handles an edge case where a peer sends a path
            # request for a destination just after an announce for
            # said destination has arrived, but before it has been
            # rebroadcast locally. In such a case the actual announce
            # is temporarily held, and then reinserted when the path
            # request has been served to the peer.
            if packet.destination_hash in Transport.announce_table:
                held_entry = Transport.announce_table[packet.destination_hash]
                Transport.held_announces[packet.destination_hash] = held_entry

            Transport.announce_table[packet.destination_hash] = [now, retransmit_timeout, retries, received_from, announce_hops, packet, local_rebroadcasts, block_rebroadcasts, attached_interface]

    elif is_from_local_client:
        # Forward path request on all interfaces
        # except the local client
        RNS.log(f"Forwarding path request from local client for {RNS.prettyhexrep(destination_hash)}{interface_str} to all other interfaces", RNS.LOG_DEBUG)
        request_tag = RNS.Identity.get_random_hash()
        for interface in Transport.interfaces:
            if not interface == attached_interface:
                Transport.request_path(destination_hash, interface, tag = request_tag)

    elif should_search_for_unknown:
        if destination_hash in Transport.discovery_path_requests:
            RNS.log(f"There is already a waiting path request for {RNS.prettyhexrep(destination_hash)} on behalf of path request{interface_str}", RNS.LOG_DEBUG)
        else:
            # Forward path request on all interfaces
            # except the requestor interface
            RNS.log(f"Attempting to discover unknown path to {RNS.prettyhexrep(destination_hash)} on behalf of path request{interface_str}", RNS.LOG_DEBUG)
            pr_entry = { "destination_hash": destination_hash, "timeout": time.time()+Transport.PATH_REQUEST_TIMEOUT, "requesting_interface": attached_interface }
            Transport.discovery_path_requests[destination_hash] = pr_entry

            for interface in Transport.interfaces:
                if not interface == attached_interface:
                    # Use the previously extracted tag from this path request
                    # on the new path requests as well, to avoid potential loops
                    Transport.request_path(destination_hash, on_interface=interface, tag=tag, recursive=True)

    elif not is_from_local_client and len(Transport.local_client_interfaces) > 0:
        # Forward the path request on all local
        # client interfaces
        RNS.log(f"Forwarding path request for {RNS.prettyhexrep(destination_hash)}{interface_str} to local clients", RNS.LOG_DEBUG)
        for interface in Transport.local_client_interfaces:
            Transport.request_path(destination_hash, on_interface=interface)

    else:
        RNS.log(f"Ignoring path request for {RNS.prettyhexrep(destination_hash)}{interface_str}, no path known", RNS.LOG_DEBUG)
|
|
|
|
@staticmethod
def from_local_client(packet):
    """
    Check whether a packet was received on a local client interface.

    :param packet: Object with a ``receiving_interface`` attribute.
    :returns: The result of ``Transport.is_local_client_interface`` for
        the receiving interface, or ``False`` if that interface has no
        ``parent_interface`` attribute.
    """
    receiving_interface = packet.receiving_interface

    # Interfaces without a parent can never be local client interfaces
    if not hasattr(receiving_interface, "parent_interface"):
        return False

    return Transport.is_local_client_interface(receiving_interface)
|
|
|
|
@staticmethod
def is_local_client_interface(interface):
    """
    Check whether an interface is a local client interface.

    Only the presence of the attributes is tested, not their values.

    :param interface: The object to inspect.
    :returns: ``True`` if ``interface`` has a ``parent_interface``
        attribute, and that parent has an ``is_local_shared_instance``
        attribute, otherwise ``False``.
    """
    return (
        hasattr(interface, "parent_interface")
        and hasattr(interface.parent_interface, "is_local_shared_instance")
    )
|
|
|
|
@staticmethod
def interface_to_shared_instance(interface):
    """
    Check whether an interface carries the
    ``is_connected_to_shared_instance`` attribute.

    Only the presence of the attribute is tested, not its value.

    :param interface: The object to inspect.
    :returns: ``True`` if the attribute exists, otherwise ``False``.
    """
    return hasattr(interface, "is_connected_to_shared_instance")
|
|
|
|
@staticmethod
def detach_interfaces():
    """
    Call ``detach()`` on every interface in ``Transport.interfaces``
    and ``Transport.local_client_interfaces``.

    The interfaces are collected into a separate list before any of
    them is detached. Errors raised by a single interface are logged
    and do not stop the remaining interfaces from being detached.
    """
    # Currently no rules are being applied here, and all
    # interfaces will be sent the detach call on RNS teardown.
    detachable_interfaces = list(Transport.interfaces)
    detachable_interfaces.extend(Transport.local_client_interfaces)

    for interface in detachable_interfaces:
        try:
            interface.detach()
        except Exception as e:
            RNS.log(f"An error occurred while detaching {interface}. The contained exception was: {e}", RNS.LOG_ERROR)
|
|
|
|
@staticmethod
def shared_connection_disappeared():
    """
    Tear down all active and pending links, and replace the announce,
    destination, reverse, link, held announce and tunnel tables, as
    well as the list of announce handlers, with empty containers.
    """
    for active_link in Transport.active_links:
        active_link.teardown()

    for pending_link in Transport.pending_links:
        pending_link.teardown()

    # Start over with fresh, empty transport state
    Transport.announce_table = {}
    Transport.destination_table = {}
    Transport.reverse_table = {}
    Transport.link_table = {}
    Transport.held_announces = {}
    Transport.announce_handlers = []
    Transport.tunnels = {}
|
|
|
|
|
|
@staticmethod
def shared_connection_reappeared():
    """
    Announce every registered destination of type
    ``RNS.Destination.SINGLE`` as a path response.

    Nothing is done unless ``Transport.owner`` reports that it is
    connected to a shared instance.
    """
    if not Transport.owner.is_connected_to_shared_instance:
        return

    for registered_destination in Transport.destinations:
        if registered_destination.type == RNS.Destination.SINGLE:
            registered_destination.announce(path_response=True)
|
|
|
|
|
|
@staticmethod
def drop_announce_queues():
    """
    Empty the announce queue of every interface in
    ``Transport.interfaces`` that has a non-empty one, and log how many
    announces were dropped on each interface.
    """
    for interface in Transport.interfaces:
        # Interfaces without an announce queue are left alone
        queued = getattr(interface, "announce_queue", None)
        if queued is None:
            continue

        na = len(queued)
        if na == 0:
            continue

        na_str = "1 announce" if na == 1 else f"{na} announces"

        interface.announce_queue = []
        RNS.log(f"Dropped {na_str} on {interface}", RNS.LOG_VERBOSE)
|
|
|
|
|
|
@staticmethod
def announce_emitted(packet):
    """
    Read the integer stored in the random blob of an announce packet.

    The random blob is the 10 bytes of ``packet.data`` that follow the
    key and name hash fields. Its last 5 bytes are decoded as a
    big-endian integer.

    NOTE(review): Judging by the name, this value is presumably the
    time the announce was emitted - confirm against the announce format.

    :param packet: Object with the announce payload in ``data``.
    :returns: The decoded integer.
    """
    blob_start = RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8
    random_blob = packet.data[blob_start:blob_start+10]

    return int.from_bytes(random_blob[5:10], "big")
|
|
|
|
|
|
@staticmethod
def save_packet_hashlist():
    """
    Write ``Transport.packet_hashlist`` to the ``packet_hashlist`` file
    in the Reticulum storage path.

    Nothing is done when ``Transport.owner`` is connected to a shared
    instance. If another save operation is in progress, this call
    waits up to 5 seconds for it to finish. When transport is not
    enabled, the hashlist is emptied before it is written.

    Errors during saving are logged, not raised.

    :returns: ``False`` if waiting for a previous save operation timed
        out, otherwise ``None``.
    """
    if Transport.owner.is_connected_to_shared_instance:
        return None

    if hasattr(Transport, "saving_packet_hashlist"):
        wait_interval = 0.2
        wait_timeout = 5
        wait_start = time.time()
        while Transport.saving_packet_hashlist:
            time.sleep(wait_interval)
            if time.time() > wait_start+wait_timeout:
                RNS.log("Could not save packet hashlist to storage, waiting for previous save operation timed out.", RNS.LOG_ERROR)
                return False

    try:
        Transport.saving_packet_hashlist = True
        save_start = time.time()

        if not RNS.Reticulum.transport_enabled():
            Transport.packet_hashlist = []
        else:
            RNS.log("Saving packet hashlist to storage...", RNS.LOG_DEBUG)

        packet_hashlist_path = f"{RNS.Reticulum.storagepath}/packet_hashlist"

        # Serialise before opening the file, so that a failure here
        # does not leave a truncated file behind.
        packed_hashlist = umsgpack.packb(Transport.packet_hashlist)

        # The context manager closes the file even if the write fails
        with open(packet_hashlist_path, "wb") as file:
            file.write(packed_hashlist)

        save_time = time.time() - save_start
        if save_time < 1:
            time_str = f"{round(save_time * 1000, 2)}ms"
        else:
            time_str = f"{round(save_time, 2)}s"
        RNS.log(f"Saved packet hashlist in {time_str}", RNS.LOG_DEBUG)

    except Exception as e:
        RNS.log(f"Could not save packet hashlist to storage, the contained exception was: {e}", RNS.LOG_ERROR)

    finally:
        # Always release the flag, or later save
        # operations would wait and time out
        Transport.saving_packet_hashlist = False
|
|
|
|
|
|
@staticmethod
def save_path_table():
    """
    Write the entries of ``Transport.destination_table`` to the
    ``destination_table`` file in the Reticulum storage path.

    Nothing is done when ``Transport.owner`` is connected to a shared
    instance. If another save operation is in progress, this call
    waits up to 5 seconds for it to finish. Only entries whose
    interface can still be found with
    ``Transport.find_interface_from_hash`` are stored. The announce
    packet of every stored entry is passed to ``Transport.cache`` with
    ``force_cache=True``, and only its hash is written to the table.

    Errors during saving are logged, not raised.

    :returns: ``False`` if waiting for a previous save operation timed
        out, otherwise ``None``.
    """
    if Transport.owner.is_connected_to_shared_instance:
        return None

    if hasattr(Transport, "saving_path_table"):
        wait_interval = 0.2
        wait_timeout = 5
        wait_start = time.time()
        while Transport.saving_path_table:
            time.sleep(wait_interval)
            if time.time() > wait_start+wait_timeout:
                RNS.log("Could not save path table to storage, waiting for previous save operation timed out.", RNS.LOG_ERROR)
                return False

    try:
        Transport.saving_path_table = True
        save_start = time.time()
        RNS.log("Saving path table to storage...", RNS.LOG_DEBUG)

        serialised_destinations = []
        for destination_hash, de in Transport.destination_table.items():
            interface_hash = de[5].get_hash()

            # Only store destination table entry if the associated
            # interface is still active
            if Transport.find_interface_from_hash(interface_hash) is None:
                continue

            serialised_entry = [
                destination_hash,
                de[0],              # timestamp
                de[1],              # received_from
                de[2],              # hops
                de[3],              # expires
                de[4],              # random_blobs
                interface_hash,
                de[6].get_hash(),   # packet_hash
            ]

            serialised_destinations.append(serialised_entry)

            Transport.cache(de[6], force_cache=True)

        destination_table_path = f"{RNS.Reticulum.storagepath}/destination_table"

        # Serialise before opening the file, so that a failure here
        # does not leave a truncated file behind.
        packed_destinations = umsgpack.packb(serialised_destinations)

        # The context manager closes the file even if the write fails
        with open(destination_table_path, "wb") as file:
            file.write(packed_destinations)

        save_time = time.time() - save_start
        if save_time < 1:
            time_str = f"{round(save_time * 1000, 2)}ms"
        else:
            time_str = f"{round(save_time, 2)}s"
        RNS.log(f"Saved {len(serialised_destinations)} path table entries in {time_str}", RNS.LOG_DEBUG)

    except Exception as e:
        RNS.log(f"Could not save path table to storage, the contained exception was: {e}", RNS.LOG_ERROR)

    finally:
        # Always release the flag, or later save
        # operations would wait and time out
        Transport.saving_path_table = False
|
|
|
|
|
|
@staticmethod
def save_tunnel_table():
    """
    Write the entries of ``Transport.tunnels`` to the ``tunnels`` file
    in the Reticulum storage path.

    Nothing is done when ``Transport.owner`` is connected to a shared
    instance. If another save operation is in progress, this call
    waits up to 5 seconds for it to finish. Every tunnel is stored as
    ``[tunnel_id, interface_hash, serialised_paths, expires]``, where
    ``expires`` is the expiry of the tunnel itself. The announce packet
    of every stored path is passed to ``Transport.cache`` with
    ``force_cache=True``, and only its hash is written to the table.

    Errors during saving are logged, not raised.

    :returns: ``False`` if waiting for a previous save operation timed
        out, otherwise ``None``.
    """
    if Transport.owner.is_connected_to_shared_instance:
        return None

    if hasattr(Transport, "saving_tunnel_table"):
        wait_interval = 0.2
        wait_timeout = 5
        wait_start = time.time()
        while Transport.saving_tunnel_table:
            time.sleep(wait_interval)
            if time.time() > wait_start+wait_timeout:
                RNS.log("Could not save tunnel table to storage, waiting for previous save operation timed out.", RNS.LOG_ERROR)
                return False

    try:
        Transport.saving_tunnel_table = True
        save_start = time.time()
        RNS.log("Saving tunnel table to storage...", RNS.LOG_DEBUG)

        serialised_tunnels = []
        for tunnel_id in Transport.tunnels:
            te = Transport.tunnels[tunnel_id]
            interface = te[1]
            tunnel_paths = te[2]

            # Kept under its own name, so that the expiry of the
            # individual paths below can not overwrite it.
            tunnel_expires = te[3]

            if interface is not None:
                interface_hash = interface.get_hash()
            else:
                interface_hash = None

            serialised_paths = []
            for destination_hash in tunnel_paths:
                de = tunnel_paths[destination_hash]

                timestamp = de[0]
                received_from = de[1]
                hops = de[2]
                path_expires = de[3]
                # Only the most recent random blobs are persisted
                random_blobs = de[4][-Transport.PERSIST_RANDOM_BLOBS:]
                packet_hash = de[6].get_hash()

                serialised_entry = [
                    destination_hash,
                    timestamp,
                    received_from,
                    hops,
                    path_expires,
                    random_blobs,
                    interface_hash,
                    packet_hash
                ]

                serialised_paths.append(serialised_entry)

                Transport.cache(de[6], force_cache=True)

            serialised_tunnel = [tunnel_id, interface_hash, serialised_paths, tunnel_expires]
            serialised_tunnels.append(serialised_tunnel)

        tunnels_path = f"{RNS.Reticulum.storagepath}/tunnels"

        # Serialise before opening the file, so that a failure here
        # does not leave a truncated file behind.
        packed_tunnels = umsgpack.packb(serialised_tunnels)

        # The context manager closes the file even if the write fails
        with open(tunnels_path, "wb") as file:
            file.write(packed_tunnels)

        save_time = time.time() - save_start
        if save_time < 1:
            time_str = f"{round(save_time * 1000, 2)}ms"
        else:
            time_str = f"{round(save_time, 2)}s"
        RNS.log(f"Saved {len(serialised_tunnels)} tunnel table entries in {time_str}", RNS.LOG_DEBUG)

    except Exception as e:
        RNS.log(f"Could not save tunnel table to storage, the contained exception was: {e}", RNS.LOG_ERROR)

    finally:
        # Always release the flag, or later save
        # operations would wait and time out
        Transport.saving_tunnel_table = False
|
|
|
|
@staticmethod
def persist_data():
    """
    Save the packet hashlist, the path table and the tunnel table to
    storage, in that order.
    """
    save_operations = (
        Transport.save_packet_hashlist,
        Transport.save_path_table,
        Transport.save_tunnel_table,
    )

    for save_operation in save_operations:
        save_operation()
|
|
|
|
@staticmethod
def exit_handler():
    """
    Persist transport data through ``Transport.persist_data``, unless
    ``Transport.owner`` is connected to a shared instance.
    """
    if Transport.owner.is_connected_to_shared_instance:
        return

    Transport.persist_data()
|