Improved I2P Interface

This commit is contained in:
Mark Qvist 2022-02-24 01:30:10 +01:00
parent 987ff0658b
commit 48184134e4
3 changed files with 89 additions and 36 deletions

View File

@ -48,6 +48,8 @@ class I2PController:
import RNS.vendor.i2plib as i2plib import RNS.vendor.i2plib as i2plib
import RNS.vendor.i2plib.utils import RNS.vendor.i2plib.utils
self.client_tunnels = {}
self.server_tunnels = {}
self.loop = None self.loop = None
self.i2plib = i2plib self.i2plib = i2plib
self.utils = i2plib.utils self.utils = i2plib.utils
@ -68,28 +70,46 @@ class I2PController:
finally: finally:
self.loop.close() self.loop.close()
def stop(self): def stop(self):
for task in asyncio.Task.all_tasks(loop=self.loop): for task in asyncio.Task.all_tasks(loop=self.loop):
task.cancel() task.cancel()
self.loop.stop() self.loop.stop()
def get_free_port(self): def get_free_port(self):
return self.i2plib.utils.get_free_port() return self.i2plib.utils.get_free_port()
def client_tunnel(self, owner, i2p_destination): def client_tunnel(self, owner, i2p_destination):
self.client_tunnels[i2p_destination] = False
while True:
if not self.client_tunnels[i2p_destination]:
try: try:
async def tunnel_up(): async def tunnel_up():
RNS.log("Bringing up I2P tunnel to "+str(owner)+" in background, this may take a while...", RNS.LOG_INFO) RNS.log("Bringing up I2P tunnel to "+str(owner)+", this may take a while...", RNS.LOG_INFO)
tunnel = self.i2plib.ClientTunnel(i2p_destination, owner.local_addr, sam_address=self.sam_address) tunnel = self.i2plib.ClientTunnel(i2p_destination, owner.local_addr, sam_address=self.sam_address, loop=self.loop)
await tunnel.run() await tunnel.run()
owner.awaiting_i2p_tunnel = False
RNS.log(str(owner)+ " tunnel setup complete", RNS.LOG_VERBOSE) RNS.log(str(owner)+ " tunnel setup complete", RNS.LOG_VERBOSE)
asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop) try:
self.loop.ext_owner = self
future = asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result()
self.client_tunnels[i2p_destination] = True
except Exception as e:
RNS.log("Error while setting up I2P tunnel: "+str(e))
raise e
except Exception as e: except Exception as e:
raise IOError("Could not connect to I2P SAM API while configuring to "+str(owner)+". Check that I2P is running and SAM is enabled.") raise IOError("Could not connect to I2P SAM API while configuring to "+str(owner)+". Check that I2P is running and SAM is enabled.")
time.sleep(5)
def server_tunnel(self, owner): def server_tunnel(self, owner):
i2p_dest_hash = RNS.Identity.full_hash(RNS.Identity.full_hash(owner.name.encode("utf-8"))) i2p_dest_hash = RNS.Identity.full_hash(RNS.Identity.full_hash(owner.name.encode("utf-8")))
@ -110,19 +130,24 @@ class I2PController:
i2p_b32 = i2p_dest.base32 i2p_b32 = i2p_dest.base32
self.server_tunnels[i2p_b32] = False
while self.server_tunnels[i2p_b32] == False:
try: try:
async def tunnel_up(): async def tunnel_up():
RNS.log(str(owner)+" Bringing up I2P tunnel in background, this may take a while...", RNS.LOG_INFO) RNS.log(str(owner)+" Bringing up I2P endpoint, this may take a while...", RNS.LOG_INFO)
tunnel = self.i2plib.ServerTunnel((owner.bind_ip, owner.bind_port), loop=self.loop, destination=i2p_dest, sam_address=self.sam_address) tunnel = self.i2plib.ServerTunnel((owner.bind_ip, owner.bind_port), loop=self.loop, destination=i2p_dest, sam_address=self.sam_address)
await tunnel.run() await tunnel.run()
owner.awaiting_i2p_tunnel = False RNS.log(str(owner)+ " endpoint setup complete. Now reachable at: "+str(i2p_dest.base32)+".b32.i2p", RNS.LOG_VERBOSE)
RNS.log(str(owner)+ " tunnel setup complete, instance reachable at: "+str(i2p_dest.base32)+".b32.i2p", RNS.LOG_VERBOSE)
asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop) asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop).result()
self.server_tunnels[i2p_b32] = True
except Exception as e: except Exception as e:
raise IOError("Could not connect to I2P SAM API while configuring "+str(self)+". Check that I2P is running and SAM is enabled.") raise IOError("Could not connect to I2P SAM API while configuring "+str(self)+". Check that I2P is running and SAM is enabled.")
time.sleep(5)
def get_loop(self): def get_loop(self):
return asyncio.get_event_loop() return asyncio.get_event_loop()
@ -148,6 +173,7 @@ class I2PInterfacePeer(Interface):
self.OUT = False self.OUT = False
self.socket = None self.socket = None
self.parent_interface = parent_interface self.parent_interface = parent_interface
self.parent_count = True
self.name = name self.name = name
self.initiator = False self.initiator = False
self.reconnecting = False self.reconnecting = False
@ -188,8 +214,21 @@ class I2PInterfacePeer(Interface):
self.target_port = self.bind_port self.target_port = self.bind_port
self.awaiting_i2p_tunnel = True self.awaiting_i2p_tunnel = True
def tunnel_job():
self.parent_interface.i2p.client_tunnel(self, target_i2p_dest) self.parent_interface.i2p.client_tunnel(self, target_i2p_dest)
thread = threading.Thread(target=tunnel_job)
thread.setDaemon(True)
thread.start()
def wait_job():
while self.awaiting_i2p_tunnel:
time.sleep(0.25)
if not self.kiss_framing:
self.wants_tunnel = True
if not self.connect(initial=True): if not self.connect(initial=True):
thread = threading.Thread(target=self.reconnect) thread = threading.Thread(target=self.reconnect)
thread.setDaemon(True) thread.setDaemon(True)
@ -198,8 +237,10 @@ class I2PInterfacePeer(Interface):
thread = threading.Thread(target=self.read_loop) thread = threading.Thread(target=self.read_loop)
thread.setDaemon(True) thread.setDaemon(True)
thread.start() thread.start()
if not self.kiss_framing:
self.wants_tunnel = True thread = threading.Thread(target=wait_job)
thread.setDaemon(True)
thread.start()
def set_timeouts_linux(self): def set_timeouts_linux(self):
@ -274,6 +315,9 @@ class I2PInterfacePeer(Interface):
self.writing = False self.writing = False
self.never_connected = False self.never_connected = False
if not self.kiss_framing and self.wants_tunnel:
RNS.Transport.synthesize_tunnel(self)
return True return True
@ -316,7 +360,7 @@ class I2PInterfacePeer(Interface):
def processIncoming(self, data): def processIncoming(self, data):
self.rxb += len(data) self.rxb += len(data)
if hasattr(self, "parent_interface") and self.parent_interface != None: if hasattr(self, "parent_interface") and self.parent_interface != None and self.parent_count:
self.parent_interface.rxb += len(data) self.parent_interface.rxb += len(data)
self.owner.inbound(data, self) self.owner.inbound(data, self)
@ -337,7 +381,7 @@ class I2PInterfacePeer(Interface):
self.socket.sendall(data) self.socket.sendall(data)
self.writing = False self.writing = False
self.txb += len(data) self.txb += len(data)
if hasattr(self, "parent_interface") and self.parent_interface != None: if hasattr(self, "parent_interface") and self.parent_interface != None and self.parent_count:
self.parent_interface.txb += len(data) self.parent_interface.txb += len(data)
except Exception as e: except Exception as e:
@ -456,12 +500,13 @@ class I2PInterfacePeer(Interface):
class I2PInterface(Interface): class I2PInterface(Interface):
def __init__(self, owner, name, rns_storagepath, peers): def __init__(self, owner, name, rns_storagepath, peers, connectable = True):
self.rxb = 0 self.rxb = 0
self.txb = 0 self.txb = 0
self.online = False self.online = False
self.clients = 0 self.clients = 0
self.owner = owner self.owner = owner
self.connectable = connectable
self.i2p_tunneled = True self.i2p_tunneled = True
self.i2p = I2PController(rns_storagepath) self.i2p = I2PController(rns_storagepath)
@ -492,8 +537,14 @@ class I2PInterface(Interface):
thread.setDaemon(True) thread.setDaemon(True)
thread.start() thread.start()
if self.connectable:
def tunnel_job():
self.i2p.server_tunnel(self) self.i2p.server_tunnel(self)
thread = threading.Thread(target=tunnel_job)
thread.setDaemon(True)
thread.start()
if peers != None: if peers != None:
for peer_addr in peers: for peer_addr in peers:
interface_name = peer_addr interface_name = peer_addr
@ -501,6 +552,7 @@ class I2PInterface(Interface):
peer_interface.OUT = True peer_interface.OUT = True
peer_interface.IN = True peer_interface.IN = True
peer_interface.parent_interface = self peer_interface.parent_interface = self
peer_interface.parent_count = False
RNS.Transport.interfaces.append(peer_interface) RNS.Transport.interfaces.append(peer_interface)
self.online = True self.online = True

View File

@ -399,12 +399,14 @@ class Reticulum:
if c["type"] == "I2PInterface": if c["type"] == "I2PInterface":
i2p_peers = c.as_list("peers") if "peers" in c else None i2p_peers = c.as_list("peers") if "peers" in c else None
connectable = c.as_bool("connectable") if "connectable" in c else False
interface = I2PInterface.I2PInterface( interface = I2PInterface.I2PInterface(
RNS.Transport, RNS.Transport,
name, name,
Reticulum.storagepath, Reticulum.storagepath,
i2p_peers i2p_peers,
connectable = connectable,
) )
if "outgoing" in c and c.as_bool("outgoing") == True: if "outgoing" in c and c.as_bool("outgoing") == True:

View File

@ -95,8 +95,7 @@ class ClientTunnel(I2PTunnel):
asyncio.ensure_future(proxy_data(client_reader, remote_writer), asyncio.ensure_future(proxy_data(client_reader, remote_writer),
loop=self.loop) loop=self.loop)
self.server = await asyncio.start_server(handle_client, *self.local_address, self.server = await asyncio.start_server(handle_client, *self.local_address, loop=self.loop)
loop=self.loop)
def stop(self): def stop(self):
super().stop() super().stop()