Implemented link establishment on ultra low bandwidth links

This commit is contained in:
Mark Qvist 2023-10-27 18:16:52 +02:00
parent 0e12442a28
commit f01074e5b8
5 changed files with 95 additions and 6 deletions

View File

@ -112,9 +112,10 @@ class Link:
link = Link(owner = owner, peer_pub_bytes=data[:Link.ECPUBSIZE//2], peer_sig_pub_bytes=data[Link.ECPUBSIZE//2:Link.ECPUBSIZE]) link = Link(owner = owner, peer_pub_bytes=data[:Link.ECPUBSIZE//2], peer_sig_pub_bytes=data[Link.ECPUBSIZE//2:Link.ECPUBSIZE])
link.set_link_id(packet) link.set_link_id(packet)
link.destination = packet.destination link.destination = packet.destination
link.establishment_timeout = Link.ESTABLISHMENT_TIMEOUT_PER_HOP * max(1, packet.hops) link.establishment_timeout = Link.ESTABLISHMENT_TIMEOUT_PER_HOP * max(1, packet.hops) + Link.KEEPALIVE
link.establishment_cost += len(packet.raw) link.establishment_cost += len(packet.raw)
RNS.log("Validating link request "+RNS.prettyhexrep(link.link_id), RNS.LOG_VERBOSE) RNS.log("Validating link request "+RNS.prettyhexrep(link.link_id), RNS.LOG_VERBOSE)
RNS.log(f"Establishment timeout is {RNS.prettytime(link.establishment_timeout)} for incoming link request "+RNS.prettyhexrep(link.link_id), RNS.LOG_EXTREME)
link.handshake() link.handshake()
link.attached_interface = packet.receiving_interface link.attached_interface = packet.receiving_interface
link.prove() link.prove()
@ -175,7 +176,8 @@ class Link:
self.sig_prv = self.owner.identity.sig_prv self.sig_prv = self.owner.identity.sig_prv
else: else:
self.initiator = True self.initiator = True
self.establishment_timeout = Link.ESTABLISHMENT_TIMEOUT_PER_HOP * max(1, RNS.Transport.hops_to(destination.hash)) self.establishment_timeout = RNS.Reticulum.get_instance().get_first_hop_timeout(destination.hash)
self.establishment_timeout += Link.ESTABLISHMENT_TIMEOUT_PER_HOP * max(1, RNS.Transport.hops_to(destination.hash))
self.prv = X25519PrivateKey.generate() self.prv = X25519PrivateKey.generate()
self.sig_prv = Ed25519PrivateKey.generate() self.sig_prv = Ed25519PrivateKey.generate()
@ -211,6 +213,7 @@ class Link:
self.packet.send() self.packet.send()
self.had_outbound() self.had_outbound()
RNS.log("Link request "+RNS.prettyhexrep(self.link_id)+" sent to "+str(self.destination), RNS.LOG_DEBUG) RNS.log("Link request "+RNS.prettyhexrep(self.link_id)+" sent to "+str(self.destination), RNS.LOG_DEBUG)
RNS.log(f"Establishment timeout is {RNS.prettytime(self.establishment_timeout)} for link request "+RNS.prettyhexrep(self.link_id), RNS.LOG_EXTREME)
def load_peer(self, peer_pub_bytes, peer_sig_pub_bytes): def load_peer(self, peer_pub_bytes, peer_sig_pub_bytes):

View File

@ -371,8 +371,8 @@ class PacketReceipt:
if packet.destination.type == RNS.Destination.LINK: if packet.destination.type == RNS.Destination.LINK:
self.timeout = packet.destination.rtt * packet.destination.traffic_timeout_factor self.timeout = packet.destination.rtt * packet.destination.traffic_timeout_factor
else: else:
self.timeout = Packet.TIMEOUT_PER_HOP * RNS.Transport.hops_to(self.destination.hash) self.timeout = RNS.Reticulum.get_instance().get_first_hop_timeout(destination.hash)
self.timeout += Packet.TIMEOUT_PER_HOP * RNS.Transport.hops_to(self.destination.hash)
def get_status(self): def get_status(self):
""" """

View File

@ -117,7 +117,7 @@ class Reticulum:
# from interface speed, but a better general approach would most # from interface speed, but a better general approach would most
# probably be to let Reticulum somehow continously build a map of # probably be to let Reticulum somehow continously build a map of
# per-hop latencies and use this map for the timeout calculation. # per-hop latencies and use this map for the timeout calculation.
DEFAULT_PER_HOP_TIMEOUT = 6 DEFAULT_PER_HOP_TIMEOUT = 4
# Length of truncated hashes in bits. # Length of truncated hashes in bits.
TRUNCATED_HASHLENGTH = 128 TRUNCATED_HASHLENGTH = 128
@ -145,6 +145,8 @@ class Reticulum:
configpath = "" configpath = ""
storagepath = "" storagepath = ""
cachepath = "" cachepath = ""
__instance = None
@staticmethod @staticmethod
def exit_handler(): def exit_handler():
@ -168,6 +170,13 @@ class Reticulum:
RNS.exit() RNS.exit()
@staticmethod
def get_instance():
"""
Return the currently running Reticulum instance
"""
return Reticulum.__instance
def __init__(self,configdir=None, loglevel=None, logdest=None, verbosity=None): def __init__(self,configdir=None, loglevel=None, logdest=None, verbosity=None):
""" """
Initialises and starts a Reticulum instance. This must be Initialises and starts a Reticulum instance. This must be
@ -177,6 +186,11 @@ class Reticulum:
:param configdir: Full path to a Reticulum configuration directory. :param configdir: Full path to a Reticulum configuration directory.
""" """
if Reticulum.__instance != None:
raise OSError("Attempt to reinitialise Reticulum, when it was already running")
else:
Reticulum.__instance = self
RNS.vendor.platformutils.platform_checks() RNS.vendor.platformutils.platform_checks()
if configdir != None: if configdir != None:
@ -303,6 +317,11 @@ class Reticulum:
self.local_interface_port self.local_interface_port
) )
interface.OUT = True interface.OUT = True
if hasattr(Reticulum, "_force_shared_instance_bitrate"):
interface.bitrate = Reticulum._force_shared_instance_bitrate
interface._force_bitrate = True
RNS.log(f"Forcing shared instance bitrate of {RNS.prettyspeed(interface.bitrate)}ps", RNS.LOG_WARNING)
interface._force_bitrate = Reticulum._force_shared_instance_bitrate
RNS.Transport.interfaces.append(interface) RNS.Transport.interfaces.append(interface)
self.is_shared_instance = True self.is_shared_instance = True
@ -317,6 +336,10 @@ class Reticulum:
self.local_interface_port) self.local_interface_port)
interface.target_port = self.local_interface_port interface.target_port = self.local_interface_port
interface.OUT = True interface.OUT = True
if hasattr(Reticulum, "_force_shared_instance_bitrate"):
interface.bitrate = Reticulum._force_shared_instance_bitrate
interface._force_bitrate = True
RNS.log(f"Forcing shared instance bitrate of {RNS.prettyspeed(interface.bitrate)}ps", RNS.LOG_WARNING)
RNS.Transport.interfaces.append(interface) RNS.Transport.interfaces.append(interface)
self.is_shared_instance = False self.is_shared_instance = False
self.is_standalone_instance = False self.is_standalone_instance = False
@ -376,6 +399,9 @@ class Reticulum:
v = self.config["reticulum"].as_bool(option) v = self.config["reticulum"].as_bool(option)
if v == True: if v == True:
Reticulum.__allow_probes = True Reticulum.__allow_probes = True
if option == "force_shared_instance_bitrate":
v = self.config["reticulum"].as_int(option)
Reticulum._force_shared_instance_bitrate = v
if option == "panic_on_interface_error": if option == "panic_on_interface_error":
v = self.config["reticulum"].as_bool(option) v = self.config["reticulum"].as_bool(option)
if v == True: if v == True:
@ -1074,6 +1100,9 @@ class Reticulum:
if path == "next_hop": if path == "next_hop":
rpc_connection.send(self.get_next_hop(call["destination_hash"])) rpc_connection.send(self.get_next_hop(call["destination_hash"]))
if path == "first_hop_timeout":
rpc_connection.send(self.get_first_hop_timeout(call["destination_hash"]))
if path == "packet_rssi": if path == "packet_rssi":
rpc_connection.send(self.get_packet_rssi(call["packet_hash"])) rpc_connection.send(self.get_packet_rssi(call["packet_hash"]))
@ -1289,6 +1318,20 @@ class Reticulum:
else: else:
return str(RNS.Transport.next_hop_interface(destination)) return str(RNS.Transport.next_hop_interface(destination))
def get_first_hop_timeout(self, destination):
if self.is_connected_to_shared_instance:
try:
rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)
rpc_connection.send({"get": "first_hop_timeout", "destination_hash": destination})
response = rpc_connection.recv()
return response
except Exception as e:
RNS.log("An error occurred while getting first hop timeout from shared instance: "+str(e), RNS.LOG_ERROR)
return RNS.Reticulum.DEFAULT_PER_HOP_TIMEOUT
else:
return RNS.Transport.first_hop_timeout(destination)
def get_next_hop(self, destination): def get_next_hop(self, destination):
if self.is_connected_to_shared_instance: if self.is_connected_to_shared_instance:
rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)

View File

@ -1197,7 +1197,8 @@ class Transport:
if packet.packet_type == RNS.Packet.LINKREQUEST: if packet.packet_type == RNS.Packet.LINKREQUEST:
now = time.time() now = time.time()
proof_timeout = now + RNS.Link.ESTABLISHMENT_TIMEOUT_PER_HOP * max(1, remaining_hops) proof_timeout = Transport.extra_link_proof_timeout(packet.receiving_interface)
proof_timeout += now + RNS.Link.ESTABLISHMENT_TIMEOUT_PER_HOP * max(1, remaining_hops)
# Entry format is # Entry format is
link_entry = [ now, # 0: Timestamp, link_entry = [ now, # 0: Timestamp,
@ -2026,6 +2027,45 @@ class Transport:
else: else:
return None return None
@staticmethod
def next_hop_interface_bitrate(destination_hash):
next_hop_interface = Transport.next_hop_interface(destination_hash)
if next_hop_interface != None:
return next_hop_interface.bitrate
else:
return None
@staticmethod
def next_hop_per_bit_latency(destination_hash):
next_hop_interface_bitrate = Transport.next_hop_interface_bitrate(destination_hash)
if next_hop_interface_bitrate != None:
return (1/next_hop_interface_bitrate)
else:
return None
@staticmethod
def next_hop_per_byte_latency(destination_hash):
per_byte_latency = Transport.next_hop_per_bit_latency(destination_hash)
if per_byte_latency != None:
return per_byte_latency*8
else:
return None
@staticmethod
def first_hop_timeout(destination_hash):
latency = Transport.next_hop_per_byte_latency(destination_hash)
if latency != None:
return RNS.Reticulum.MTU * latency
else:
return RNS.Reticulum.DEFAULT_PER_HOP_TIMEOUT
@staticmethod
def extra_link_proof_timeout(interface):
if interface != None:
return ((1/interface.bitrate)*8)*RNS.Reticulum.MTU
else:
return 0
@staticmethod @staticmethod
def expire_path(destination_hash): def expire_path(destination_hash):
if destination_hash in Transport.destination_table: if destination_hash in Transport.destination_table:

View File

@ -161,6 +161,9 @@ def prettyhexrep(data):
hexrep = "<"+delimiter.join("{:02x}".format(c) for c in data)+">" hexrep = "<"+delimiter.join("{:02x}".format(c) for c in data)+">"
return hexrep return hexrep
def prettyspeed(num, suffix="b"):
return prettysize(num/8, suffix=suffix)+"ps"
def prettysize(num, suffix='B'): def prettysize(num, suffix='B'):
units = ['','K','M','G','T','P','E','Z'] units = ['','K','M','G','T','P','E','Z']
last_unit = 'Y' last_unit = 'Y'