From fb5172ff10f61a8121e4245bf9edd280c0e8272d Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Wed, 25 May 2022 20:18:06 +0200 Subject: [PATCH] Improved I2P client tunnel error handling --- RNS/Interfaces/I2PInterface.py | 53 +++++++++++++++++++++++++++------- RNS/vendor/i2plib/tunnel.py | 22 +++++++++----- 2 files changed, 57 insertions(+), 18 deletions(-) diff --git a/RNS/Interfaces/I2PInterface.py b/RNS/Interfaces/I2PInterface.py index 78079b1..5b70fff 100644 --- a/RNS/Interfaces/I2PInterface.py +++ b/RNS/Interfaces/I2PInterface.py @@ -107,28 +107,52 @@ class I2PController: def client_tunnel(self, owner, i2p_destination): self.client_tunnels[i2p_destination] = False + self.refs = {} + while True: if not self.client_tunnels[i2p_destination]: try: async def tunnel_up(): RNS.log("Bringing up I2P tunnel to "+str(owner)+", this may take a while...", RNS.LOG_INFO) tunnel = self.i2plib.ClientTunnel(i2p_destination, owner.local_addr, sam_address=self.sam_address, loop=self.loop) + self.refs[i2p_destination] = tunnel await tunnel.run() - owner.awaiting_i2p_tunnel = False - RNS.log(str(owner)+ " tunnel setup complete", RNS.LOG_VERBOSE) - try: - self.loop.ext_owner = self - future = asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result() - self.client_tunnels[i2p_destination] = True + self.loop.ext_owner = self + result = asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result() + + if not i2p_destination in self.refs: + raise IOError("No tunnel control instance was created") - except Exception as e: - RNS.log("Error while setting up I2P tunnel: "+str(e)) - raise e + else: + tn = self.refs[i2p_destination] + if tn != None and hasattr(tn, "status"): + RNS.log("Waiting for status from I2P control process", RNS.LOG_EXTREME) + while not tn.status["setup_ran"]: + time.sleep(0.1) + RNS.log("Got status from I2P control process", RNS.LOG_EXTREME) + + if tn.status["setup_failed"]: + raise tn.status["exception"] + + else: + self.client_tunnels[i2p_destination] = True + owner.awaiting_i2p_tunnel = False + RNS.log(str(owner)+ " tunnel setup complete", RNS.LOG_VERBOSE) + + else: + raise IOError("Got no status response from SAM API") + + except ConnectionRefusedError as e: + raise e + + except ConnectionAbortedError as e: + raise e except Exception as e: - raise IOError("Could not connect to I2P SAM API while configuring to "+str(owner)+". Check that I2P is running and SAM is enabled.") + RNS.log("Unexpected error type from I2P SAM: "+str(e), RNS.LOG_ERROR) + raise e time.sleep(5) @@ -245,7 +269,14 @@ class I2PInterfacePeer(Interface): self.awaiting_i2p_tunnel = True def tunnel_job(): - self.parent_interface.i2p.client_tunnel(self, target_i2p_dest) + while self.awaiting_i2p_tunnel: + try: + self.parent_interface.i2p.client_tunnel(self, target_i2p_dest) + except Exception as e: + RNS.log("Error while while configuring "+str(self)+": "+str(e), RNS.LOG_ERROR) + RNS.log("Check that I2P is installed and running, and that SAM is enabled. Retrying tunnel setup later.", RNS.LOG_ERROR) + + time.sleep(15) thread = threading.Thread(target=tunnel_job) thread.setDaemon(True) diff --git a/RNS/vendor/i2plib/tunnel.py b/RNS/vendor/i2plib/tunnel.py index 96eb4c1..27af081 100644 --- a/RNS/vendor/i2plib/tunnel.py +++ b/RNS/vendor/i2plib/tunnel.py @@ -85,17 +85,25 @@ class ClientTunnel(I2PTunnel): """A coroutine used to run the tunnel""" await self._pre_run() + self.status = { "setup_ran": False, "setup_failed": False, "exception": None } async def handle_client(client_reader, client_writer): """Handle local client connection""" - remote_reader, remote_writer = await aiosam.stream_connect( - self.session_name, self.remote_destination, - sam_address=self.sam_address, loop=self.loop) - asyncio.ensure_future(proxy_data(remote_reader, client_writer), - loop=self.loop) - asyncio.ensure_future(proxy_data(client_reader, remote_writer), - loop=self.loop) + try: + remote_reader, remote_writer = await aiosam.stream_connect( + self.session_name, self.remote_destination, + sam_address=self.sam_address, loop=self.loop) + asyncio.ensure_future(proxy_data(remote_reader, client_writer), + loop=self.loop) + asyncio.ensure_future(proxy_data(client_reader, remote_writer), + loop=self.loop) + + except Exception as e: + self.status["setup_ran"] = True + self.status["setup_failed"] = True + self.status["exception"] = e self.server = await asyncio.start_server(handle_client, *self.local_address, loop=self.loop) + self.status["setup_ran"] = True def stop(self): super().stop()