mirror of
https://github.com/markqvist/Reticulum.git
synced 2024-11-26 15:30:18 +00:00
Implemented I2PInterface recovery on I2P router restart
This commit is contained in:
parent
49ed335e19
commit
9e20ba2dac
@ -72,6 +72,7 @@ class I2PController:
|
|||||||
|
|
||||||
self.client_tunnels = {}
|
self.client_tunnels = {}
|
||||||
self.server_tunnels = {}
|
self.server_tunnels = {}
|
||||||
|
self.i2plib_tunnels = {}
|
||||||
self.loop = None
|
self.loop = None
|
||||||
self.i2plib = i2plib
|
self.i2plib = i2plib
|
||||||
self.utils = i2plib.utils
|
self.utils = i2plib.utils
|
||||||
@ -118,8 +119,7 @@ class I2PController:
|
|||||||
|
|
||||||
def client_tunnel(self, owner, i2p_destination):
|
def client_tunnel(self, owner, i2p_destination):
|
||||||
self.client_tunnels[i2p_destination] = False
|
self.client_tunnels[i2p_destination] = False
|
||||||
|
self.i2plib_tunnels[i2p_destination] = None
|
||||||
self.refs = {}
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
if not self.client_tunnels[i2p_destination]:
|
if not self.client_tunnels[i2p_destination]:
|
||||||
@ -127,17 +127,17 @@ class I2PController:
|
|||||||
async def tunnel_up():
|
async def tunnel_up():
|
||||||
RNS.log("Bringing up I2P tunnel to "+str(owner)+", this may take a while...", RNS.LOG_INFO)
|
RNS.log("Bringing up I2P tunnel to "+str(owner)+", this may take a while...", RNS.LOG_INFO)
|
||||||
tunnel = self.i2plib.ClientTunnel(i2p_destination, owner.local_addr, sam_address=self.sam_address, loop=self.loop)
|
tunnel = self.i2plib.ClientTunnel(i2p_destination, owner.local_addr, sam_address=self.sam_address, loop=self.loop)
|
||||||
self.refs[i2p_destination] = tunnel
|
self.i2plib_tunnels[i2p_destination] = tunnel
|
||||||
await tunnel.run()
|
await tunnel.run()
|
||||||
|
|
||||||
self.loop.ext_owner = self
|
self.loop.ext_owner = self
|
||||||
result = asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result()
|
result = asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result()
|
||||||
|
|
||||||
if not i2p_destination in self.refs:
|
if not i2p_destination in self.i2plib_tunnels:
|
||||||
raise IOError("No tunnel control instance was created")
|
raise IOError("No tunnel control instance was created")
|
||||||
|
|
||||||
else:
|
else:
|
||||||
tn = self.refs[i2p_destination]
|
tn = self.i2plib_tunnels[i2p_destination]
|
||||||
if tn != None and hasattr(tn, "status"):
|
if tn != None and hasattr(tn, "status"):
|
||||||
|
|
||||||
RNS.log("Waiting for status from I2P control process", RNS.LOG_EXTREME)
|
RNS.log("Waiting for status from I2P control process", RNS.LOG_EXTREME)
|
||||||
@ -151,6 +151,19 @@ class I2PController:
|
|||||||
else:
|
else:
|
||||||
self.client_tunnels[i2p_destination] = True
|
self.client_tunnels[i2p_destination] = True
|
||||||
owner.awaiting_i2p_tunnel = False
|
owner.awaiting_i2p_tunnel = False
|
||||||
|
if owner.socket != None:
|
||||||
|
if hasattr(owner.socket, "close"):
|
||||||
|
if callable(owner.socket.close):
|
||||||
|
try:
|
||||||
|
owner.socket.shutdown(socket.SHUT_RDWR)
|
||||||
|
except Exception as e:
|
||||||
|
RNS.log("Error while shutting down socket for "+str(owner)+": "+str(e))
|
||||||
|
|
||||||
|
try:
|
||||||
|
owner.socket.close()
|
||||||
|
except Exception as e:
|
||||||
|
RNS.log("Error while closing socket for "+str(owner)+": "+str(e))
|
||||||
|
|
||||||
RNS.log(str(owner)+" tunnel setup complete", RNS.LOG_VERBOSE)
|
RNS.log(str(owner)+" tunnel setup complete", RNS.LOG_VERBOSE)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
@ -166,6 +179,30 @@ class I2PController:
|
|||||||
RNS.log("Unexpected error type from I2P SAM: "+str(e), RNS.LOG_ERROR)
|
RNS.log("Unexpected error type from I2P SAM: "+str(e), RNS.LOG_ERROR)
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
|
else:
|
||||||
|
i2ptunnel = self.i2plib_tunnels[i2p_destination]
|
||||||
|
if hasattr(i2ptunnel, "status"):
|
||||||
|
# TODO: Remove
|
||||||
|
# RNS.log(str(i2ptunnel.status))
|
||||||
|
i2p_exception = i2ptunnel.status["exception"]
|
||||||
|
|
||||||
|
if i2ptunnel.status["setup_ran"] == False:
|
||||||
|
RNS.log(str(self)+" I2P tunnel setup did not complete", RNS.LOG_ERROR)
|
||||||
|
return False
|
||||||
|
|
||||||
|
elif i2p_exception != None:
|
||||||
|
RNS.log(str(self)+" An error ocurred while setting up I2P tunnel. The contained exception was: "+str(i2p_exception), RNS.LOG_ERROR)
|
||||||
|
RNS.log("Resetting I2P tunnel", RNS.LOG_ERROR)
|
||||||
|
return False
|
||||||
|
|
||||||
|
elif i2ptunnel.status["setup_failed"] == True:
|
||||||
|
RNS.log(str(self)+" Unspecified I2P tunnel setup error, resetting I2P tunnel", RNS.LOG_ERROR)
|
||||||
|
return False
|
||||||
|
|
||||||
|
else:
|
||||||
|
RNS.log(str(self)+" Got no status from SAM API, resetting I2P tunnel", RNS.LOG_ERROR)
|
||||||
|
return False
|
||||||
|
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
|
|
||||||
|
|
||||||
@ -204,22 +241,49 @@ class I2PController:
|
|||||||
owner.b32 = i2p_b32
|
owner.b32 = i2p_b32
|
||||||
|
|
||||||
self.server_tunnels[i2p_b32] = False
|
self.server_tunnels[i2p_b32] = False
|
||||||
|
self.i2plib_tunnels[i2p_b32] = None
|
||||||
|
|
||||||
while self.server_tunnels[i2p_b32] == False:
|
while True:
|
||||||
|
if self.server_tunnels[i2p_b32] == False:
|
||||||
try:
|
try:
|
||||||
async def tunnel_up():
|
async def tunnel_up():
|
||||||
RNS.log(str(owner)+" Bringing up I2P endpoint, this may take a while...", RNS.LOG_INFO)
|
RNS.log(str(owner)+" Bringing up I2P endpoint, this may take a while...", RNS.LOG_INFO)
|
||||||
tunnel = self.i2plib.ServerTunnel((owner.bind_ip, owner.bind_port), loop=self.loop, destination=i2p_dest, sam_address=self.sam_address)
|
tunnel = self.i2plib.ServerTunnel((owner.bind_ip, owner.bind_port), loop=self.loop, destination=i2p_dest, sam_address=self.sam_address)
|
||||||
|
self.i2plib_tunnels[i2p_b32] = tunnel
|
||||||
await tunnel.run()
|
await tunnel.run()
|
||||||
RNS.log(str(owner)+ " endpoint setup complete. Now reachable at: "+str(i2p_dest.base32)+".b32.i2p", RNS.LOG_VERBOSE)
|
RNS.log(str(owner)+ " endpoint setup complete. Now reachable at: "+str(i2p_dest.base32)+".b32.i2p", RNS.LOG_VERBOSE)
|
||||||
|
|
||||||
asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result()
|
asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result()
|
||||||
self.server_tunnels[i2p_b32] = True
|
self.server_tunnels[i2p_b32] = True
|
||||||
return True
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
|
else:
|
||||||
|
|
||||||
|
i2ptunnel = self.i2plib_tunnels[i2p_b32]
|
||||||
|
if hasattr(i2ptunnel, "status"):
|
||||||
|
# TODO: Remove
|
||||||
|
# RNS.log(str(i2ptunnel.status))
|
||||||
|
i2p_exception = i2ptunnel.status["exception"]
|
||||||
|
|
||||||
|
if i2ptunnel.status["setup_ran"] == False:
|
||||||
|
RNS.log(str(self)+" I2P tunnel setup did not complete", RNS.LOG_ERROR)
|
||||||
|
return False
|
||||||
|
|
||||||
|
elif i2p_exception != None:
|
||||||
|
RNS.log(str(self)+" An error ocurred while setting up I2P tunnel. The contained exception was: "+str(i2p_exception), RNS.LOG_ERROR)
|
||||||
|
RNS.log("Resetting I2P tunnel", RNS.LOG_ERROR)
|
||||||
|
return False
|
||||||
|
|
||||||
|
elif i2ptunnel.status["setup_failed"] == True:
|
||||||
|
RNS.log(str(self)+" Unspecified I2P tunnel setup error, resetting I2P tunnel", RNS.LOG_ERROR)
|
||||||
|
return False
|
||||||
|
|
||||||
|
else:
|
||||||
|
RNS.log(str(self)+" Got no status from SAM API, resetting I2P tunnel", RNS.LOG_ERROR)
|
||||||
|
return False
|
||||||
|
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
|
|
||||||
def get_loop(self):
|
def get_loop(self):
|
||||||
@ -288,17 +352,21 @@ class I2PInterfacePeer(Interface):
|
|||||||
self.initiator = True
|
self.initiator = True
|
||||||
|
|
||||||
self.bind_ip = "127.0.0.1"
|
self.bind_ip = "127.0.0.1"
|
||||||
self.bind_port = self.parent_interface.i2p.get_free_port()
|
|
||||||
self.local_addr = (self.bind_ip, self.bind_port)
|
|
||||||
self.target_ip = self.bind_ip
|
|
||||||
self.target_port = self.bind_port
|
|
||||||
|
|
||||||
self.awaiting_i2p_tunnel = True
|
self.awaiting_i2p_tunnel = True
|
||||||
|
|
||||||
def tunnel_job():
|
def tunnel_job():
|
||||||
while self.awaiting_i2p_tunnel:
|
while self.awaiting_i2p_tunnel:
|
||||||
try:
|
try:
|
||||||
self.parent_interface.i2p.client_tunnel(self, target_i2p_dest)
|
self.bind_port = self.parent_interface.i2p.get_free_port()
|
||||||
|
self.local_addr = (self.bind_ip, self.bind_port)
|
||||||
|
self.target_ip = self.bind_ip
|
||||||
|
self.target_port = self.bind_port
|
||||||
|
|
||||||
|
if not self.parent_interface.i2p.client_tunnel(self, target_i2p_dest):
|
||||||
|
RNS.log(str(self)+" I2P control process experienced an error, requesting new tunnel...", RNS.LOG_ERROR)
|
||||||
|
self.awaiting_i2p_tunnel = True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
RNS.log("Error while while configuring "+str(self)+": "+str(e), RNS.LOG_ERROR)
|
RNS.log("Error while while configuring "+str(self)+": "+str(e), RNS.LOG_ERROR)
|
||||||
RNS.log("Check that I2P is installed and running, and that SAM is enabled. Retrying tunnel setup later.", RNS.LOG_ERROR)
|
RNS.log("Check that I2P is installed and running, and that SAM is enabled. Retrying tunnel setup later.", RNS.LOG_ERROR)
|
||||||
@ -646,11 +714,11 @@ class I2PInterface(Interface):
|
|||||||
|
|
||||||
if self.connectable:
|
if self.connectable:
|
||||||
def tunnel_job():
|
def tunnel_job():
|
||||||
tunnel_ready = False
|
while True:
|
||||||
while not tunnel_ready:
|
|
||||||
try:
|
try:
|
||||||
tunnel_ready = self.i2p.server_tunnel(self)
|
if not self.i2p.server_tunnel(self):
|
||||||
self.online = True
|
RNS.log(str(self)+" I2P control process experienced an error, requesting new tunnel...", RNS.LOG_ERROR)
|
||||||
|
self.online = False
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
RNS.log("Error while while configuring "+str(self)+": "+str(e), RNS.LOG_ERROR)
|
RNS.log("Error while while configuring "+str(self)+": "+str(e), RNS.LOG_ERROR)
|
||||||
|
29
RNS/vendor/i2plib/tunnel.py
vendored
29
RNS/vendor/i2plib/tunnel.py
vendored
@ -85,13 +85,16 @@ class ClientTunnel(I2PTunnel):
|
|||||||
"""A coroutine used to run the tunnel"""
|
"""A coroutine used to run the tunnel"""
|
||||||
await self._pre_run()
|
await self._pre_run()
|
||||||
|
|
||||||
self.status = { "setup_ran": False, "setup_failed": False, "exception": None }
|
self.status = { "setup_ran": False, "setup_failed": False, "exception": None, "connect_tasks": [] }
|
||||||
async def handle_client(client_reader, client_writer):
|
async def handle_client(client_reader, client_writer):
|
||||||
"""Handle local client connection"""
|
"""Handle local client connection"""
|
||||||
try:
|
try:
|
||||||
remote_reader, remote_writer = await aiosam.stream_connect(
|
sc_task = aiosam.stream_connect(
|
||||||
self.session_name, self.remote_destination,
|
self.session_name, self.remote_destination,
|
||||||
sam_address=self.sam_address, loop=self.loop)
|
sam_address=self.sam_address, loop=self.loop)
|
||||||
|
self.status["connect_tasks"].append(sc_task)
|
||||||
|
|
||||||
|
remote_reader, remote_writer = await sc_task
|
||||||
asyncio.ensure_future(proxy_data(remote_reader, client_writer),
|
asyncio.ensure_future(proxy_data(remote_reader, client_writer),
|
||||||
loop=self.loop)
|
loop=self.loop)
|
||||||
asyncio.ensure_future(proxy_data(client_reader, remote_writer),
|
asyncio.ensure_future(proxy_data(client_reader, remote_writer),
|
||||||
@ -102,9 +105,16 @@ class ClientTunnel(I2PTunnel):
|
|||||||
self.status["setup_failed"] = True
|
self.status["setup_failed"] = True
|
||||||
self.status["exception"] = e
|
self.status["exception"] = e
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
self.server = await asyncio.start_server(handle_client, *self.local_address)
|
self.server = await asyncio.start_server(handle_client, *self.local_address)
|
||||||
self.status["setup_ran"] = True
|
self.status["setup_ran"] = True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.status["setup_ran"] = True
|
||||||
|
self.status["setup_failed"] = True
|
||||||
|
self.status["exception"] = e
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
super().stop()
|
super().stop()
|
||||||
self.server.close()
|
self.server.close()
|
||||||
@ -125,26 +135,38 @@ class ServerTunnel(I2PTunnel):
|
|||||||
"""A coroutine used to run the tunnel"""
|
"""A coroutine used to run the tunnel"""
|
||||||
await self._pre_run()
|
await self._pre_run()
|
||||||
|
|
||||||
|
self.status = { "setup_ran": False, "setup_failed": False, "exception": None, "connect_tasks": [] }
|
||||||
async def handle_client(incoming, client_reader, client_writer):
|
async def handle_client(incoming, client_reader, client_writer):
|
||||||
|
try:
|
||||||
# data and dest may come in one chunk
|
# data and dest may come in one chunk
|
||||||
dest, data = incoming.split(b"\n", 1)
|
dest, data = incoming.split(b"\n", 1)
|
||||||
remote_destination = sam.Destination(dest.decode())
|
remote_destination = sam.Destination(dest.decode())
|
||||||
logger.debug("{} client connected: {}.b32.i2p".format(
|
logger.debug("{} client connected: {}.b32.i2p".format(
|
||||||
self.session_name, remote_destination.base32))
|
self.session_name, remote_destination.base32))
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.status["exception"] = e
|
||||||
|
self.status["setup_failed"] = True
|
||||||
|
|
||||||
try:
|
try:
|
||||||
remote_reader, remote_writer = await asyncio.wait_for(
|
sc_task = asyncio.wait_for(
|
||||||
asyncio.open_connection(
|
asyncio.open_connection(
|
||||||
host=self.local_address[0],
|
host=self.local_address[0],
|
||||||
port=self.local_address[1]),
|
port=self.local_address[1]),
|
||||||
timeout=5)
|
timeout=5)
|
||||||
|
self.status["connect_tasks"].append(sc_task)
|
||||||
|
|
||||||
|
remote_reader, remote_writer = await sc_task
|
||||||
if data: remote_writer.write(data)
|
if data: remote_writer.write(data)
|
||||||
asyncio.ensure_future(proxy_data(remote_reader, client_writer),
|
asyncio.ensure_future(proxy_data(remote_reader, client_writer),
|
||||||
loop=self.loop)
|
loop=self.loop)
|
||||||
asyncio.ensure_future(proxy_data(client_reader, remote_writer),
|
asyncio.ensure_future(proxy_data(client_reader, remote_writer),
|
||||||
loop=self.loop)
|
loop=self.loop)
|
||||||
|
|
||||||
except ConnectionRefusedError:
|
except ConnectionRefusedError:
|
||||||
client_writer.close()
|
client_writer.close()
|
||||||
|
self.status["exception"] = e
|
||||||
|
self.status["setup_failed"] = True
|
||||||
|
|
||||||
async def server_loop():
|
async def server_loop():
|
||||||
try:
|
try:
|
||||||
@ -159,6 +181,7 @@ class ServerTunnel(I2PTunnel):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
self.server_loop = asyncio.ensure_future(server_loop(), loop=self.loop)
|
self.server_loop = asyncio.ensure_future(server_loop(), loop=self.loop)
|
||||||
|
self.status["setup_ran"] = True
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
super().stop()
|
super().stop()
|
||||||
|
Loading…
Reference in New Issue
Block a user