Improved I2P client tunnel error handling

This commit is contained in:
Mark Qvist 2022-05-25 20:18:06 +02:00
parent 24d6de8490
commit fb5172ff10
2 changed files with 57 additions and 18 deletions

View File

@ -107,28 +107,52 @@ class I2PController:
def client_tunnel(self, owner, i2p_destination): def client_tunnel(self, owner, i2p_destination):
self.client_tunnels[i2p_destination] = False self.client_tunnels[i2p_destination] = False
self.refs = {}
while True: while True:
if not self.client_tunnels[i2p_destination]: if not self.client_tunnels[i2p_destination]:
try: try:
async def tunnel_up(): async def tunnel_up():
RNS.log("Bringing up I2P tunnel to "+str(owner)+", this may take a while...", RNS.LOG_INFO) RNS.log("Bringing up I2P tunnel to "+str(owner)+", this may take a while...", RNS.LOG_INFO)
tunnel = self.i2plib.ClientTunnel(i2p_destination, owner.local_addr, sam_address=self.sam_address, loop=self.loop) tunnel = self.i2plib.ClientTunnel(i2p_destination, owner.local_addr, sam_address=self.sam_address, loop=self.loop)
self.refs[i2p_destination] = tunnel
await tunnel.run() await tunnel.run()
owner.awaiting_i2p_tunnel = False
RNS.log(str(owner)+ " tunnel setup complete", RNS.LOG_VERBOSE)
try: self.loop.ext_owner = self
self.loop.ext_owner = self result = asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result()
future = asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result()
self.client_tunnels[i2p_destination] = True
except Exception as e: if not i2p_destination in self.refs:
RNS.log("Error while setting up I2P tunnel: "+str(e)) raise IOError("No tunnel control instance was created")
raise e
else:
tn = self.refs[i2p_destination]
if tn != None and hasattr(tn, "status"):
RNS.log("Waiting for status from I2P control process", RNS.LOG_EXTREME)
while not tn.status["setup_ran"]:
time.sleep(0.1)
RNS.log("Got status from I2P control process", RNS.LOG_EXTREME)
if tn.status["setup_failed"]:
raise tn.status["exception"]
else:
self.client_tunnels[i2p_destination] = True
owner.awaiting_i2p_tunnel = False
RNS.log(str(owner)+ " tunnel setup complete", RNS.LOG_VERBOSE)
else:
raise IOError("Got no status response from SAM API")
except ConnectionRefusedError as e:
raise e
except ConnectionAbortedError as e:
raise e
except Exception as e: except Exception as e:
raise IOError("Could not connect to I2P SAM API while configuring to "+str(owner)+". Check that I2P is running and SAM is enabled.") RNS.log("Unexpected error type from I2P SAM: "+str(e), RNS.LOG_ERROR)
raise e
time.sleep(5) time.sleep(5)
@ -245,7 +269,14 @@ class I2PInterfacePeer(Interface):
self.awaiting_i2p_tunnel = True self.awaiting_i2p_tunnel = True
def tunnel_job(): def tunnel_job():
self.parent_interface.i2p.client_tunnel(self, target_i2p_dest) while self.awaiting_i2p_tunnel:
try:
self.parent_interface.i2p.client_tunnel(self, target_i2p_dest)
except Exception as e:
RNS.log("Error while while configuring "+str(self)+": "+str(e), RNS.LOG_ERROR)
RNS.log("Check that I2P is installed and running, and that SAM is enabled. Retrying tunnel setup later.", RNS.LOG_ERROR)
time.sleep(15)
thread = threading.Thread(target=tunnel_job) thread = threading.Thread(target=tunnel_job)
thread.setDaemon(True) thread.setDaemon(True)

View File

@ -85,17 +85,25 @@ class ClientTunnel(I2PTunnel):
"""A coroutine used to run the tunnel""" """A coroutine used to run the tunnel"""
await self._pre_run() await self._pre_run()
self.status = { "setup_ran": False, "setup_failed": False, "exception": None }
async def handle_client(client_reader, client_writer): async def handle_client(client_reader, client_writer):
"""Handle local client connection""" """Handle local client connection"""
remote_reader, remote_writer = await aiosam.stream_connect( try:
self.session_name, self.remote_destination, remote_reader, remote_writer = await aiosam.stream_connect(
sam_address=self.sam_address, loop=self.loop) self.session_name, self.remote_destination,
asyncio.ensure_future(proxy_data(remote_reader, client_writer), sam_address=self.sam_address, loop=self.loop)
loop=self.loop) asyncio.ensure_future(proxy_data(remote_reader, client_writer),
asyncio.ensure_future(proxy_data(client_reader, remote_writer), loop=self.loop)
loop=self.loop) asyncio.ensure_future(proxy_data(client_reader, remote_writer),
loop=self.loop)
except Exception as e:
self.status["setup_ran"] = True
self.status["setup_failed"] = True
self.status["exception"] = e
self.server = await asyncio.start_server(handle_client, *self.local_address, loop=self.loop) self.server = await asyncio.start_server(handle_client, *self.local_address, loop=self.loop)
self.status["setup_ran"] = True
def stop(self): def stop(self):
super().stop() super().stop()