From e0e1868e507b8202db59029cc16ea1767f02b607 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Wed, 13 May 2020 20:33:10 +0200 Subject: [PATCH] Transport handling of announces and path requests for shared instance --- RNS/Identity.py | 3 +- RNS/Interfaces/LocalInterface.py | 9 ++ RNS/Reticulum.py | 3 - RNS/Transport.py | 217 ++++++++++++++++++++++++------- 4 files changed, 178 insertions(+), 54 deletions(-) diff --git a/RNS/Identity.py b/RNS/Identity.py index 025abbb..a7e0058 100644 --- a/RNS/Identity.py +++ b/RNS/Identity.py @@ -33,7 +33,6 @@ class Identity: @staticmethod def remember(packet_hash, destination_hash, public_key, app_data = None): - RNS.log("Remembering "+RNS.prettyhexrep(destination_hash), RNS.LOG_VERBOSE) Identity.known_destinations[destination_hash] = [time.time(), packet_hash, public_key, app_data] @@ -108,7 +107,7 @@ class Identity: if announced_identity.pub != None and announced_identity.validate(signature, signed_data): RNS.Identity.remember(packet.getHash(), destination_hash, public_key) - RNS.log("Stored valid announce from "+RNS.prettyhexrep(destination_hash), RNS.LOG_INFO) + RNS.log("Stored valid announce from "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG) del announced_identity return True else: diff --git a/RNS/Interfaces/LocalInterface.py b/RNS/Interfaces/LocalInterface.py index 4582814..c187f4f 100644 --- a/RNS/Interfaces/LocalInterface.py +++ b/RNS/Interfaces/LocalInterface.py @@ -44,6 +44,8 @@ class LocalClientInterface(Interface): self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.connect((self.target_ip, self.target_port)) + self.is_connected_to_shared_instance = True + self.owner = owner self.online = True self.writing = False @@ -118,9 +120,13 @@ class LocalClientInterface(Interface): self.online = False self.OUT = False self.IN = False + if self in RNS.Transport.interfaces: RNS.Transport.interfaces.remove(self) + if self in RNS.Transport.local_client_interfaces: + RNS.Transport.local_client_interfaces.remove(self) + def __str__(self): return "LocalInterface["+str(self.target_port)+"]" @@ -144,6 +150,8 @@ class LocalServerInterface(Interface): return createHandler self.owner = owner + self.is_local_shared_instance = True + address = (self.bind_ip, self.bind_port) self.server = ThreadingTCPServer(address, handlerFactory(self.incoming_connection)) @@ -162,6 +170,7 @@ class LocalServerInterface(Interface): spawned_interface.parent_interface = self RNS.log("Accepting new connection to shared instance: "+str(spawned_interface), RNS.LOG_VERBOSE) RNS.Transport.interfaces.append(spawned_interface) + RNS.Transport.local_client_interfaces.append(spawned_interface) spawned_interface.read_loop() def processOutgoing(self, data): diff --git a/RNS/Reticulum.py b/RNS/Reticulum.py index b98cb51..5f4321e 100755 --- a/RNS/Reticulum.py +++ b/RNS/Reticulum.py @@ -159,9 +159,6 @@ class Reticulum: self.start_local_interface() if self.is_shared_instance or self.is_standalone_instance: - # TODO: Remove - RNS.log("Starting local interfaces...") - interface_names = [] for name in self.config["interfaces"]: if not name in interface_names: diff --git a/RNS/Transport.py b/RNS/Transport.py index 3f2db41..bcda7c2 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -35,7 +35,7 @@ class Transport: # various situations LOCAL_REBROADCASTS_MAX = 2 # How many local rebroadcasts of an announce is allowed - PATH_REQUEST_GRACE = 0.25 # Grace time before a path announcement is made, allows directly reachable peers to respond first + PATH_REQUEST_GRACE = 0.35 # Grace time before a path announcement is made, allows directly reachable peers to respond first PATH_REQUEST_RW = 2 # Path request random window LINK_TIMEOUT = RNS.Link.KEEPALIVE * 2 @@ -50,11 +50,17 @@ class Transport: packet_hashlist = [] # A list of packet hashes for duplicate detection receipts = [] # Receipts of all outgoing packets for proof processing + # Interfaces for communicating with + # local clients connected to a shared + # Reticulum instance + local_client_interfaces = [] + # TODO: "destination_table" should really be renamed to "path_table" - announce_table = {} # A table for storing announces currently waiting to be retransmitted - destination_table = {} # A lookup table containing the next hop to a given destination - reverse_table = {} # A lookup table for storing packet hashes used to return proofs and replies - link_table = {} # A lookup table containing hops for links + announce_table = {} # A table for storing announces currently waiting to be retransmitted + destination_table = {} # A lookup table containing the next hop to a given destination + reverse_table = {} # A lookup table for storing packet hashes used to return proofs and replies + link_table = {} # A lookup table containing hops for links + held_announces = {} # A table containing temporarily held announce-table entries jobs_locked = False jobs_running = False @@ -168,35 +174,60 @@ class Transport: Transport.receipts_last_checked = time.time() - if RNS.Reticulum.transport_enabled(): - # Process announces needing retransmission - if time.time() > Transport.announces_last_checked+Transport.announces_check_interval: - for destination_hash in Transport.announce_table: - announce_entry = Transport.announce_table[destination_hash] - if announce_entry[2] > Transport.PATHFINDER_R: - RNS.log("Dropping announce for "+RNS.prettyhexrep(destination_hash)+", retries exceeded", RNS.LOG_DEBUG) - Transport.announce_table.pop(destination_hash) - break - else: - if time.time() > announce_entry[1]: - announce_entry[1] = time.time() + math.pow(Transport.PATHFINDER_C, announce_entry[4]) + Transport.PATHFINDER_T + Transport.PATHFINDER_RW - announce_entry[2] += 1 - packet = announce_entry[5] - block_rebroadcasts = announce_entry[7] - announce_context = RNS.Packet.NONE - if block_rebroadcasts: - announce_context = RNS.Packet.PATH_RESPONSE - announce_data = packet.data - announce_identity = RNS.Identity.recall(packet.destination_hash) - announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown"); - announce_destination.hash = packet.destination_hash - announce_destination.hexhash = announce_destination.hash.hex() - new_packet = RNS.Packet(announce_destination, announce_data, RNS.Packet.ANNOUNCE, context = announce_context, header_type = RNS.Packet.HEADER_2, transport_type = Transport.TRANSPORT, transport_id = Transport.identity.hash) - new_packet.hops = announce_entry[4] - RNS.log("Rebroadcasting announce for "+RNS.prettyhexrep(announce_destination.hash)+" with hop count "+str(new_packet.hops), RNS.LOG_DEBUG) - outgoing.append(new_packet) + # Process announces needing retransmission + if time.time() > Transport.announces_last_checked+Transport.announces_check_interval: + for destination_hash in Transport.announce_table: + announce_entry = Transport.announce_table[destination_hash] + if announce_entry[2] > Transport.PATHFINDER_R: + RNS.log("Dropping announce for "+RNS.prettyhexrep(destination_hash)+", retries exceeded", RNS.LOG_DEBUG) + Transport.announce_table.pop(destination_hash) + break + else: + if time.time() > announce_entry[1]: + announce_entry[1] = time.time() + math.pow(Transport.PATHFINDER_C, announce_entry[4]) + Transport.PATHFINDER_T + Transport.PATHFINDER_RW + announce_entry[2] += 1 + packet = announce_entry[5] + block_rebroadcasts = announce_entry[7] + attached_interface = announce_entry[8] + announce_context = RNS.Packet.NONE + if block_rebroadcasts: + announce_context = RNS.Packet.PATH_RESPONSE + announce_data = packet.data + announce_identity = RNS.Identity.recall(packet.destination_hash) + announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown"); + announce_destination.hash = packet.destination_hash + announce_destination.hexhash = announce_destination.hash.hex() + + new_packet = RNS.Packet( + announce_destination, + announce_data, + RNS.Packet.ANNOUNCE, + context = announce_context, + header_type = RNS.Packet.HEADER_2, + transport_type = Transport.TRANSPORT, + transport_id = Transport.identity.hash, + attached_interface = attached_interface + ) - Transport.announces_last_checked = time.time() + new_packet.hops = announce_entry[4] + if block_rebroadcasts: + RNS.log("Rebroadcasting announce as path response for "+RNS.prettyhexrep(announce_destination.hash)+" with hop count "+str(new_packet.hops), RNS.LOG_DEBUG) + else: + RNS.log("Rebroadcasting announce for "+RNS.prettyhexrep(announce_destination.hash)+" with hop count "+str(new_packet.hops), RNS.LOG_DEBUG) + outgoing.append(new_packet) + + # This handles an edge case where a peer sends a past + # request for a destination just after an announce for + # said destination has arrived, but before it has been + # rebroadcast locally. In such a case the actual announce + # is temporarily held, and then reinserted when the path + # request has been served to the peer. + if destination_hash in Transport.held_announces: + held_entry = Transport.held_announces.pop(destination_hash) + Transport.announce_table[destination_hash] = held_entry + RNS.log("Reinserting held announce into table", RNS.LOG_DEBUG) + + Transport.announces_last_checked = time.time() # Cull the packet hashlist if it has reached max size @@ -365,6 +396,21 @@ class Transport: RNS.log(str(interface)+" received packet with hash "+RNS.prettyhexrep(packet.packet_hash), RNS.LOG_EXTREME) + if len(Transport.local_client_interfaces) > 0: + new_raw = packet.raw[0:1] + new_raw += struct.pack("!B", packet.hops) + new_raw += packet.raw[2:] + + for local_client in Transport.local_client_interfaces: + if local_client != interface: + local_client.processOutgoing(new_raw) + + if Transport.is_local_client_interface(interface): + packet.hops -= 1 + elif Transport.interface_to_shared_instance(interface): + packet.hops -= 1 + + if Transport.packet_filter(packet): Transport.packet_hashlist.append(packet.packet_hash) Transport.cache(packet) @@ -382,7 +428,7 @@ class Transport: RNS.log("Next hop to destination is "+RNS.prettyhexrep(next_hop)+" with "+str(remaining_hops)+" hops remaining, transporting it.", RNS.LOG_DEBUG) if remaining_hops > 1: # Just increase hop count and transmit - new_raw = packet.raw[0:1] + new_raw = packet.raw[0:1] new_raw += struct.pack("!B", packet.hops) new_raw += next_hop new_raw += packet.raw[12:] @@ -542,19 +588,39 @@ class Transport: should_add = True if should_add: - now = time.time() - retries = 0 - expires = now + Transport.PATHFINDER_E + now = time.time() + retries = 0 + expires = now + Transport.PATHFINDER_E + announce_hops = packet.hops local_rebroadcasts = 0 block_rebroadcasts = False - random_blobs.append(random_blob) + attached_interface = None retransmit_timeout = now + math.pow(Transport.PATHFINDER_C, packet.hops) + (RNS.rand() * Transport.PATHFINDER_RW) + + random_blobs.append(random_blob) - if RNS.Reticulum.transport_enabled() and packet.context != RNS.Packet.PATH_RESPONSE: - Transport.announce_table[packet.destination_hash] = [now, retransmit_timeout, retries, received_from, packet.hops, packet, local_rebroadcasts, block_rebroadcasts] + if (RNS.Reticulum.transport_enabled() or Transport.from_local_client(packet)) and packet.context != RNS.Packet.PATH_RESPONSE: + # If the announce is from a local client, + # we announce it immediately, but only one + # time, and also set the hops to 0. + if Transport.from_local_client(packet): + retransmit_timeout = now + retries = Transport.PATHFINDER_R - Transport.destination_table[packet.destination_hash] = [now, received_from, packet.hops, expires, random_blobs, packet.receiving_interface, packet] - RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(packet.hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(packet.receiving_interface), RNS.LOG_DEBUG) + Transport.announce_table[packet.destination_hash] = [ + now, + retransmit_timeout, + retries, + received_from, + announce_hops, + packet, + local_rebroadcasts, + block_rebroadcasts, + attached_interface + ] + + Transport.destination_table[packet.destination_hash] = [now, received_from, announce_hops, expires, random_blobs, packet.receiving_interface, packet] + RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(packet.receiving_interface), RNS.LOG_VERBOSE) elif packet.packet_type == RNS.Packet.LINKREQUEST: for destination in Transport.destinations: @@ -764,12 +830,23 @@ class Transport: packet.send() @staticmethod - def pathRequestHandler(data, packet): - if len(data) >= RNS.Identity.TRUNCATED_HASHLENGTH//8: - Transport.pathRequest(data[:RNS.Identity.TRUNCATED_HASHLENGTH//8]) + def requestPathOnInterface(destination_hash, interface): + path_request_data = destination_hash + RNS.Identity.getRandomHash() + path_request_dst = RNS.Destination(None, RNS.Destination.OUT, RNS.Destination.PLAIN, Transport.APP_NAME, "path", "request") + packet = RNS.Packet(path_request_dst, path_request_data, packet_type = RNS.Packet.DATA, transport_type = RNS.Transport.BROADCAST, header_type = RNS.Packet.HEADER_1, attached_interface = interface) + packet.send() @staticmethod - def pathRequest(destination_hash): + def pathRequestHandler(data, packet): + if len(data) >= RNS.Identity.TRUNCATED_HASHLENGTH//8: + Transport.pathRequest( + data[:RNS.Identity.TRUNCATED_HASHLENGTH//8], + Transport.from_local_client(packet), + packet.receiving_interface + ) + + @staticmethod + def pathRequest(destination_hash, is_from_local_client, attached_interface): RNS.log("Path request for "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG) local_destination = next((d for d in Transport.destinations if d.hash == destination_hash), None) @@ -777,7 +854,7 @@ class Transport: RNS.log("Destination is local to this system, announcing", RNS.LOG_DEBUG) local_destination.announce(path_response=True) - elif RNS.Reticulum.transport_enabled() and destination_hash in Transport.destination_table: + elif (RNS.Reticulum.transport_enabled() or is_from_local_client) and destination_hash in Transport.destination_table: RNS.log("Path found, inserting announce for transmission", RNS.LOG_DEBUG) packet = Transport.destination_table[destination_hash][6] received_from = Transport.destination_table[destination_hash][5] @@ -786,9 +863,27 @@ class Transport: retries = Transport.PATHFINDER_R local_rebroadcasts = 0 block_rebroadcasts = True + announce_hops = packet.hops retransmit_timeout = now + Transport.PATH_REQUEST_GRACE # + (RNS.rand() * Transport.PATHFINDER_RW) - Transport.announce_table[packet.destination_hash] = [now, retransmit_timeout, retries, received_from, packet.hops, packet, local_rebroadcasts, block_rebroadcasts] + # This handles an edge case where a peer sends a past + # request for a destination just after an announce for + # said destination has arrived, but before it has been + # rebroadcast locally. In such a case the actual announce + # is temporarily held, and then reinserted when the path + # request has been served to the peer. + if packet.destination_hash in Transport.announce_table: + held_entry = Transport.announce_table[packet.destination_hash] + Transport.held_announces[packet.destination_hash] = held_entry + + Transport.announce_table[packet.destination_hash] = [now, retransmit_timeout, retries, received_from, announce_hops, packet, local_rebroadcasts, block_rebroadcasts, attached_interface] + + elif is_from_local_client: + # Forward path request on all interfaces + # except the local client + for interface in Transport.interfaces: + if not interface == attached_interface: + Transport.requestPathOnInterface(destination_hash, interface) else: RNS.log("No known path to requested destination, ignoring request", RNS.LOG_DEBUG) @@ -800,6 +895,30 @@ class Transport: # TODO: implement this pass + @staticmethod + def from_local_client(packet): + if hasattr(packet.receiving_interface, "parent_interface"): + return Transport.is_local_client_interface(packet.receiving_interface) + else: + return False + + @staticmethod + def is_local_client_interface(interface): + if hasattr(interface, "parent_interface"): + if hasattr(interface.parent_interface, "is_local_shared_instance"): + return True + else: + return False + else: + return False + + @staticmethod + def interface_to_shared_instance(interface): + if hasattr(interface, "is_connected_to_shared_instance"): + return True + else: + return False + @staticmethod def exitHandler(): RNS.log("Saving packet hashlist to storage...", RNS.LOG_VERBOSE) @@ -820,7 +939,7 @@ class Transport: de = Transport.destination_table[destination_hash] interface_hash = de[5].get_hash() - # Only store destination tablee entry if the associated + # Only store destination table entry if the associated # interface is still active interface = Transport.find_interface_from_hash(interface_hash) if interface != None: