From 1d2a0fe4c80bc1c6644a10f74461266312aa0c92 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Thu, 3 Nov 2022 15:22:34 +0100 Subject: [PATCH] Improved I2P tunnel state detection. Fixed missing IFAC init on spawned I2P interfaces. --- RNS/Interfaces/I2PInterface.py | 114 +++++++++++++++++++++++++++------ RNS/Reticulum.py | 10 +-- 2 files changed, 99 insertions(+), 25 deletions(-) diff --git a/RNS/Interfaces/I2PInterface.py b/RNS/Interfaces/I2PInterface.py index 564ea9f..92a6c57 100644 --- a/RNS/Interfaces/I2PInterface.py +++ b/RNS/Interfaces/I2PInterface.py @@ -383,6 +383,7 @@ class I2PInterfacePeer(Interface): I2P_PROBE_AFTER = 10 I2P_PROBE_INTERVAL = 9 I2P_PROBES = 5 + I2P_READ_TIMEOUT = I2P_PROBE_INTERVAL * I2P_PROBES + I2P_PROBE_AFTER def __init__(self, parent_interface, owner, name, target_i2p_dest=None, connected_socket=None, max_reconnect_tries=None): self.rxb = 0 @@ -409,6 +410,29 @@ class I2PInterfacePeer(Interface): self.i2p_tunnel_ready = False self.mode = RNS.Interfaces.Interface.Interface.MODE_FULL self.bitrate = I2PInterface.BITRATE_GUESS + self.last_read = 0 + self.last_write = 0 + self.wd_reset = False + + self.ifac_size = self.parent_interface.ifac_size + self.ifac_netname = self.parent_interface.ifac_netname + self.ifac_netkey = self.parent_interface.ifac_netkey + if self.ifac_netname != None or self.ifac_netkey != None: + ifac_origin = b"" + if self.ifac_netname != None: + ifac_origin += RNS.Identity.full_hash(self.ifac_netname.encode("utf-8")) + if self.ifac_netkey != None: + ifac_origin += RNS.Identity.full_hash(self.ifac_netkey.encode("utf-8")) + + ifac_origin_hash = RNS.Identity.full_hash(ifac_origin) + self.ifac_key = RNS.Cryptography.hkdf( + length=64, + derive_from=ifac_origin_hash, + salt=RNS.Reticulum.IFAC_SALT, + context=None + ) + self.ifac_identity = RNS.Identity.from_bytes(self.ifac_key) + self.ifac_signature = self.ifac_identity.sign(RNS.Identity.full_hash(self.ifac_key)) self.announce_rate_target = None self.announce_rate_grace = None @@ -482,18 +506,11 @@ class I2PInterfacePeer(Interface): def set_timeouts_linux(self): - if not self.i2p_tunneled: - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, int(I2PInterfacePeer.TCP_USER_TIMEOUT * 1000)) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, int(I2PInterfacePeer.TCP_PROBE_AFTER)) - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, int(I2PInterfacePeer.TCP_PROBE_INTERVAL)) - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, int(I2PInterfacePeer.TCP_PROBES)) - else: - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, int(I2PInterfacePeer.I2P_USER_TIMEOUT * 1000)) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, int(I2PInterfacePeer.I2P_PROBE_AFTER)) - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, int(I2PInterfacePeer.I2P_PROBE_INTERVAL)) - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, int(I2PInterfacePeer.I2P_PROBES)) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, int(I2PInterfacePeer.I2P_USER_TIMEOUT * 1000)) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, int(I2PInterfacePeer.I2P_PROBE_AFTER)) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, int(I2PInterfacePeer.I2P_PROBE_INTERVAL)) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, int(I2PInterfacePeer.I2P_PROBES)) def set_timeouts_osx(self): if hasattr(socket, "TCP_KEEPALIVE"): @@ -502,11 +519,7 @@ class I2PInterfacePeer(Interface): TCP_KEEPIDLE = 0x10 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - - if not self.i2p_tunneled: - self.socket.setsockopt(socket.IPPROTO_TCP, TCP_KEEPIDLE, int(I2PInterfacePeer.TCP_PROBE_AFTER)) - else: - self.socket.setsockopt(socket.IPPROTO_TCP, TCP_KEEPIDLE, int(I2PInterfacePeer.I2P_PROBE_AFTER)) + self.socket.setsockopt(socket.IPPROTO_TCP, TCP_KEEPIDLE, int(I2PInterfacePeer.I2P_PROBE_AFTER)) def shutdown_socket(self, socket): if callable(socket.close): @@ -571,7 +584,6 @@ class I2PInterfacePeer(Interface): return True - def reconnect(self): if self.initiator: if not self.reconnecting: @@ -632,6 +644,7 @@ class I2PInterfacePeer(Interface): self.socket.sendall(data) self.writing = False self.txb += len(data) + self.last_write = time.time() if hasattr(self, "parent_interface") and self.parent_interface != None and self.parent_count: self.parent_interface.txb += len(data) @@ -642,8 +655,42 @@ class I2PInterfacePeer(Interface): self.teardown() + def read_watchdog(self): + while self.wd_reset: + time.sleep(0.25) + + should_run = True + try: + while should_run and not self.wd_reset: + if (time.time()-self.last_write > I2PInterfacePeer.I2P_PROBE_AFTER*0.66): + self.processOutgoing(bytes([0x00])) + + if (time.time()-self.last_read > I2PInterfacePeer.I2P_READ_TIMEOUT): + RNS.log("I2P socket seems dead, restarting...", RNS.LOG_DEBUG) + if self.socket != None: + try: + self.socket.shutdown(socket.SHUT_RDWR) + except Exception as e: + RNS.log("Error while shutting down socket for "+str(self)+": "+str(e)) + + try: + self.socket.close() + except Exception as e: + RNS.log("Error while closing socket for "+str(self)+": "+str(e)) + + should_run = False + + time.sleep(1) + finally: + self.wd_reset = False + def read_loop(self): try: + self.last_read = time.time() + self.last_write = time.time() + + wd_thread = threading.Thread(target=self.read_watchdog, daemon=True).start() + in_frame = False escape = False data_buffer = b"" @@ -653,6 +700,7 @@ class I2PInterfacePeer(Interface): data_in = self.socket.recv(4096) if len(data_in) > 0: pointer = 0 + last_read = time.time() while pointer < len(data_in): byte = data_in[pointer] pointer += 1 @@ -661,7 +709,8 @@ class I2PInterfacePeer(Interface): # Read loop for KISS framing if (in_frame and byte == KISS.FEND and command == KISS.CMD_DATA): in_frame = False - self.processIncoming(data_buffer) + if len(data_buffer > RNS.Reticulum.HEADER_MINSIZE+1): + self.processIncoming(data_buffer) elif (byte == KISS.FEND): in_frame = True command = KISS.CMD_UNKNOWN @@ -688,7 +737,8 @@ class I2PInterfacePeer(Interface): # Read loop for HDLC framing if (in_frame and byte == HDLC.FLAG): in_frame = False - self.processIncoming(data_buffer) + if len(data_buffer > RNS.Reticulum.HEADER_MINSIZE+1): + self.processIncoming(data_buffer) elif (byte == HDLC.FLAG): in_frame = True data_buffer = b"" @@ -704,6 +754,7 @@ class I2PInterfacePeer(Interface): escape = False data_buffer = data_buffer+bytes([byte]) else: + self.wd_reset = True self.online = False if self.initiator and not self.detached: RNS.log("Socket for "+str(self)+" was closed, attempting to reconnect...", RNS.LOG_WARNING) @@ -754,7 +805,7 @@ class I2PInterfacePeer(Interface): class I2PInterface(Interface): BITRATE_GUESS = 256*1000 - def __init__(self, owner, name, rns_storagepath, peers, connectable = False): + def __init__(self, owner, name, rns_storagepath, peers, connectable = False, ifac_size = 16, ifac_netname = None, ifac_netkey = None): self.rxb = 0 self.txb = 0 @@ -780,6 +831,9 @@ class I2PInterface(Interface): self.bind_port = self.i2p.get_free_port() self.address = (self.bind_ip, self.bind_port) self.bitrate = I2PInterface.BITRATE_GUESS + self.ifac_size = ifac_size + self.ifac_netname = ifac_netname + self.ifac_netkey = ifac_netkey self.online = False @@ -850,9 +904,27 @@ class I2PInterface(Interface): spawned_interface.parent_interface = self spawned_interface.online = True spawned_interface.bitrate = self.bitrate + spawned_interface.ifac_size = self.ifac_size spawned_interface.ifac_netname = self.ifac_netname spawned_interface.ifac_netkey = self.ifac_netkey + if spawned_interface.ifac_netname != None or spawned_interface.ifac_netkey != None: + ifac_origin = b"" + if spawned_interface.ifac_netname != None: + ifac_origin += RNS.Identity.full_hash(spawned_interface.ifac_netname.encode("utf-8")) + if spawned_interface.ifac_netkey != None: + ifac_origin += RNS.Identity.full_hash(spawned_interface.ifac_netkey.encode("utf-8")) + + ifac_origin_hash = RNS.Identity.full_hash(ifac_origin) + spawned_interface.ifac_key = RNS.Cryptography.hkdf( + length=64, + derive_from=ifac_origin_hash, + salt=RNS.Reticulum.IFAC_SALT, + context=None + ) + spawned_interface.ifac_identity = RNS.Identity.from_bytes(spawned_interface.ifac_key) + spawned_interface.ifac_signature = spawned_interface.ifac_identity.sign(RNS.Identity.full_hash(spawned_interface.ifac_key)) + spawned_interface.announce_rate_target = self.announce_rate_target spawned_interface.announce_rate_grace = self.announce_rate_grace spawned_interface.announce_rate_penalty = self.announce_rate_penalty diff --git a/RNS/Reticulum.py b/RNS/Reticulum.py index 37000d7..f804431 100755 --- a/RNS/Reticulum.py +++ b/RNS/Reticulum.py @@ -631,12 +631,18 @@ class Reticulum: i2p_peers = c.as_list("peers") if "peers" in c else None connectable = c.as_bool("connectable") if "connectable" in c else False + if ifac_size == None: + ifac_size = 16 + interface = I2PInterface.I2PInterface( RNS.Transport, name, Reticulum.storagepath, i2p_peers, connectable = connectable, + ifac_size = ifac_size, + ifac_netname = ifac_netname, + ifac_netkey = ifac_netkey, ) if "outgoing" in c and c.as_bool("outgoing") == False: @@ -653,10 +659,6 @@ class Reticulum: interface.announce_cap = announce_cap if configured_bitrate: interface.bitrate = configured_bitrate - if ifac_size != None: - interface.ifac_size = ifac_size - else: - interface.ifac_size = 16 if c["type"] == "SerialInterface": port = c["port"] if "port" in c else None