From 9e20ba2dacb28907655b135f391ad1250fa3667a Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sat, 28 May 2022 02:24:01 +0200 Subject: [PATCH] Implemented I2PInterface recovery on I2P router restart --- RNS/Interfaces/I2PInterface.py | 122 +++++++++++++++++++++++++-------- RNS/vendor/i2plib/tunnel.py | 45 +++++++++--- 2 files changed, 129 insertions(+), 38 deletions(-) diff --git a/RNS/Interfaces/I2PInterface.py b/RNS/Interfaces/I2PInterface.py index f01ecd5..f72f848 100644 --- a/RNS/Interfaces/I2PInterface.py +++ b/RNS/Interfaces/I2PInterface.py @@ -72,6 +72,7 @@ class I2PController: self.client_tunnels = {} self.server_tunnels = {} + self.i2plib_tunnels = {} self.loop = None self.i2plib = i2plib self.utils = i2plib.utils @@ -118,8 +119,7 @@ class I2PController: def client_tunnel(self, owner, i2p_destination): self.client_tunnels[i2p_destination] = False - - self.refs = {} + self.i2plib_tunnels[i2p_destination] = None while True: if not self.client_tunnels[i2p_destination]: @@ -127,17 +127,17 @@ class I2PController: async def tunnel_up(): RNS.log("Bringing up I2P tunnel to "+str(owner)+", this may take a while...", RNS.LOG_INFO) tunnel = self.i2plib.ClientTunnel(i2p_destination, owner.local_addr, sam_address=self.sam_address, loop=self.loop) - self.refs[i2p_destination] = tunnel + self.i2plib_tunnels[i2p_destination] = tunnel await tunnel.run() self.loop.ext_owner = self result = asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result() - if not i2p_destination in self.refs: + if not i2p_destination in self.i2plib_tunnels: raise IOError("No tunnel control instance was created") else: - tn = self.refs[i2p_destination] + tn = self.i2plib_tunnels[i2p_destination] if tn != None and hasattr(tn, "status"): RNS.log("Waiting for status from I2P control process", RNS.LOG_EXTREME) @@ -151,7 +151,20 @@ class I2PController: else: self.client_tunnels[i2p_destination] = True owner.awaiting_i2p_tunnel = False - RNS.log(str(owner)+ " tunnel setup complete", RNS.LOG_VERBOSE) + if owner.socket != None: + if hasattr(owner.socket, "close"): + if callable(owner.socket.close): + try: + owner.socket.shutdown(socket.SHUT_RDWR) + except Exception as e: + RNS.log("Error while shutting down socket for "+str(owner)+": "+str(e)) + + try: + owner.socket.close() + except Exception as e: + RNS.log("Error while closing socket for "+str(owner)+": "+str(e)) + + RNS.log(str(owner)+" tunnel setup complete", RNS.LOG_VERBOSE) else: raise IOError("Got no status response from SAM API") @@ -166,6 +179,30 @@ class I2PController: RNS.log("Unexpected error type from I2P SAM: "+str(e), RNS.LOG_ERROR) raise e + else: + i2ptunnel = self.i2plib_tunnels[i2p_destination] + if hasattr(i2ptunnel, "status"): + # TODO: Remove + # RNS.log(str(i2ptunnel.status)) + i2p_exception = i2ptunnel.status["exception"] + + if i2ptunnel.status["setup_ran"] == False: + RNS.log(str(self)+" I2P tunnel setup did not complete", RNS.LOG_ERROR) + return False + + elif i2p_exception != None: + RNS.log(str(self)+" An error ocurred while setting up I2P tunnel. The contained exception was: "+str(i2p_exception), RNS.LOG_ERROR) + RNS.log("Resetting I2P tunnel", RNS.LOG_ERROR) + return False + + elif i2ptunnel.status["setup_failed"] == True: + RNS.log(str(self)+" Unspecified I2P tunnel setup error, resetting I2P tunnel", RNS.LOG_ERROR) + return False + + else: + RNS.log(str(self)+" Got no status from SAM API, resetting I2P tunnel", RNS.LOG_ERROR) + return False + time.sleep(5) @@ -204,21 +241,48 @@ class I2PController: owner.b32 = i2p_b32 self.server_tunnels[i2p_b32] = False + self.i2plib_tunnels[i2p_b32] = None - while self.server_tunnels[i2p_b32] == False: - try: - async def tunnel_up(): - RNS.log(str(owner)+" Bringing up I2P endpoint, this may take a while...", RNS.LOG_INFO) - tunnel = self.i2plib.ServerTunnel((owner.bind_ip, owner.bind_port), loop=self.loop, destination=i2p_dest, sam_address=self.sam_address) - await tunnel.run() - RNS.log(str(owner)+ " endpoint setup complete. Now reachable at: "+str(i2p_dest.base32)+".b32.i2p", RNS.LOG_VERBOSE) + while True: + if self.server_tunnels[i2p_b32] == False: + try: + async def tunnel_up(): + RNS.log(str(owner)+" Bringing up I2P endpoint, this may take a while...", RNS.LOG_INFO) + tunnel = self.i2plib.ServerTunnel((owner.bind_ip, owner.bind_port), loop=self.loop, destination=i2p_dest, sam_address=self.sam_address) + self.i2plib_tunnels[i2p_b32] = tunnel + await tunnel.run() + RNS.log(str(owner)+ " endpoint setup complete. Now reachable at: "+str(i2p_dest.base32)+".b32.i2p", RNS.LOG_VERBOSE) - asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result() - self.server_tunnels[i2p_b32] = True - return True + asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result() + self.server_tunnels[i2p_b32] = True - except Exception as e: - raise e + except Exception as e: + raise e + + else: + + i2ptunnel = self.i2plib_tunnels[i2p_b32] + if hasattr(i2ptunnel, "status"): + # TODO: Remove + # RNS.log(str(i2ptunnel.status)) + i2p_exception = i2ptunnel.status["exception"] + + if i2ptunnel.status["setup_ran"] == False: + RNS.log(str(self)+" I2P tunnel setup did not complete", RNS.LOG_ERROR) + return False + + elif i2p_exception != None: + RNS.log(str(self)+" An error ocurred while setting up I2P tunnel. The contained exception was: "+str(i2p_exception), RNS.LOG_ERROR) + RNS.log("Resetting I2P tunnel", RNS.LOG_ERROR) + return False + + elif i2ptunnel.status["setup_failed"] == True: + RNS.log(str(self)+" Unspecified I2P tunnel setup error, resetting I2P tunnel", RNS.LOG_ERROR) + return False + + else: + RNS.log(str(self)+" Got no status from SAM API, resetting I2P tunnel", RNS.LOG_ERROR) + return False time.sleep(5) @@ -288,17 +352,21 @@ class I2PInterfacePeer(Interface): self.initiator = True self.bind_ip = "127.0.0.1" - self.bind_port = self.parent_interface.i2p.get_free_port() - self.local_addr = (self.bind_ip, self.bind_port) - self.target_ip = self.bind_ip - self.target_port = self.bind_port self.awaiting_i2p_tunnel = True def tunnel_job(): while self.awaiting_i2p_tunnel: try: - self.parent_interface.i2p.client_tunnel(self, target_i2p_dest) + self.bind_port = self.parent_interface.i2p.get_free_port() + self.local_addr = (self.bind_ip, self.bind_port) + self.target_ip = self.bind_ip + self.target_port = self.bind_port + + if not self.parent_interface.i2p.client_tunnel(self, target_i2p_dest): + RNS.log(str(self)+" I2P control process experienced an error, requesting new tunnel...", RNS.LOG_ERROR) + self.awaiting_i2p_tunnel = True + except Exception as e: RNS.log("Error while while configuring "+str(self)+": "+str(e), RNS.LOG_ERROR) RNS.log("Check that I2P is installed and running, and that SAM is enabled. Retrying tunnel setup later.", RNS.LOG_ERROR) @@ -646,11 +714,11 @@ class I2PInterface(Interface): if self.connectable: def tunnel_job(): - tunnel_ready = False - while not tunnel_ready: + while True: try: - tunnel_ready = self.i2p.server_tunnel(self) - self.online = True + if not self.i2p.server_tunnel(self): + RNS.log(str(self)+" I2P control process experienced an error, requesting new tunnel...", RNS.LOG_ERROR) + self.online = False except Exception as e: RNS.log("Error while while configuring "+str(self)+": "+str(e), RNS.LOG_ERROR) diff --git a/RNS/vendor/i2plib/tunnel.py b/RNS/vendor/i2plib/tunnel.py index b42b99e..68da9a8 100644 --- a/RNS/vendor/i2plib/tunnel.py +++ b/RNS/vendor/i2plib/tunnel.py @@ -85,25 +85,35 @@ class ClientTunnel(I2PTunnel): """A coroutine used to run the tunnel""" await self._pre_run() - self.status = { "setup_ran": False, "setup_failed": False, "exception": None } + self.status = { "setup_ran": False, "setup_failed": False, "exception": None, "connect_tasks": [] } async def handle_client(client_reader, client_writer): """Handle local client connection""" try: - remote_reader, remote_writer = await aiosam.stream_connect( + sc_task = aiosam.stream_connect( self.session_name, self.remote_destination, sam_address=self.sam_address, loop=self.loop) + self.status["connect_tasks"].append(sc_task) + + remote_reader, remote_writer = await sc_task asyncio.ensure_future(proxy_data(remote_reader, client_writer), loop=self.loop) asyncio.ensure_future(proxy_data(client_reader, remote_writer), loop=self.loop) - + except Exception as e: self.status["setup_ran"] = True self.status["setup_failed"] = True self.status["exception"] = e - self.server = await asyncio.start_server(handle_client, *self.local_address) - self.status["setup_ran"] = True + + try: + self.server = await asyncio.start_server(handle_client, *self.local_address) + self.status["setup_ran"] = True + + except Exception as e: + self.status["setup_ran"] = True + self.status["setup_failed"] = True + self.status["exception"] = e def stop(self): super().stop() @@ -125,26 +135,38 @@ class ServerTunnel(I2PTunnel): """A coroutine used to run the tunnel""" await self._pre_run() + self.status = { "setup_ran": False, "setup_failed": False, "exception": None, "connect_tasks": [] } async def handle_client(incoming, client_reader, client_writer): - # data and dest may come in one chunk - dest, data = incoming.split(b"\n", 1) - remote_destination = sam.Destination(dest.decode()) - logger.debug("{} client connected: {}.b32.i2p".format( - self.session_name, remote_destination.base32)) + try: + # data and dest may come in one chunk + dest, data = incoming.split(b"\n", 1) + remote_destination = sam.Destination(dest.decode()) + logger.debug("{} client connected: {}.b32.i2p".format( + self.session_name, remote_destination.base32)) + + except Exception as e: + self.status["exception"] = e + self.status["setup_failed"] = True try: - remote_reader, remote_writer = await asyncio.wait_for( + sc_task = asyncio.wait_for( asyncio.open_connection( host=self.local_address[0], port=self.local_address[1]), timeout=5) + self.status["connect_tasks"].append(sc_task) + + remote_reader, remote_writer = await sc_task if data: remote_writer.write(data) asyncio.ensure_future(proxy_data(remote_reader, client_writer), loop=self.loop) asyncio.ensure_future(proxy_data(client_reader, remote_writer), loop=self.loop) + except ConnectionRefusedError: client_writer.close() + self.status["exception"] = e + self.status["setup_failed"] = True async def server_loop(): try: @@ -159,6 +181,7 @@ class ServerTunnel(I2PTunnel): pass self.server_loop = asyncio.ensure_future(server_loop(), loop=self.loop) + self.status["setup_ran"] = True def stop(self): super().stop()