diff --git a/RNS/Destination.py b/RNS/Destination.py index 96fc587..8ba3224 100755 --- a/RNS/Destination.py +++ b/RNS/Destination.py @@ -208,7 +208,11 @@ class Destination: signature = self.identity.sign(signed_data) + # TODO: Check if this could be optimised by only + # carrying the hash in the destination field, not + # also redundantly inside the signed blob as here announce_data = self.hash+self.identity.getPublicKey()+random_hash+signature + if app_data != None: announce_data += app_data diff --git a/RNS/Packet.py b/RNS/Packet.py index c9f8e26..0cd561a 100755 --- a/RNS/Packet.py +++ b/RNS/Packet.py @@ -62,7 +62,6 @@ class Packet: self.transport_id = transport_id self.data = data self.flags = self.getPackedFlags() - self.MTU = RNS.Reticulum.MTU self.raw = None self.packed = False @@ -74,7 +73,8 @@ class Packet: self.packed = True self.fromPacked = True - self.sent_at = None + self.MTU = RNS.Reticulum.MTU + self.sent_at = None self.packet_hash = None def getPackedFlags(self): @@ -120,8 +120,13 @@ class Packet: self.ciphertext = self.destination.encrypt(self.data) if self.header_type == Packet.HEADER_2: - if t_destination != None: - self.header += self.t_destination + if self.transport_id != None: + self.header += self.transport_id + self.header += self.destination.hash + + if self.packet_type == Packet.ANNOUNCE: + # Announce packets are not encrypted + self.ciphertext = self.data else: raise IOError("Packet with header type 2 must have a transport ID") diff --git a/RNS/Transport.py b/RNS/Transport.py index 72b629a..9217661 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -1,6 +1,7 @@ import os import RNS import time +import math import threading import traceback from time import sleep @@ -14,6 +15,15 @@ class Transport: TUNNEL = 0x03; types = [BROADCAST, TRANSPORT, RELAY, TUNNEL] + REACHABILITY_UNREACHABLE = 0x00 + REACHABILITY_DIRECT = 0x01 + REACHABILITY_TRANSPORT = 0x02 + + PATHFINDER_M = 18 # Max hops + PATHFINDER_C = 2.0 # Decay constant + PATHFINDER_R = 2 # Retransmit retries + PATHFINDER_T = 10 # Retry grace period + interfaces = [] # All active interfaces destinations = [] # All active destinations pending_links = [] # Links that are being established @@ -21,12 +31,17 @@ class Transport: packet_hashlist = [] # A list of packet hashes for duplicate detection receipts = [] # Receipts of all outgoing packets for proof processing + announce_table = {} + destination_table = {} + jobs_locked = False jobs_running = False job_interval = 0.250 - receipts_last_checked = 0.0 - receipts_check_interval = 1.0 - hashlist_maxsize = 1000000 + receipts_last_checked = 0.0 + receipts_check_interval = 1.0 + announces_last_checked = 0.0 + announces_check_interval = 1.0 + hashlist_maxsize = 1000000 identity = None @@ -68,11 +83,12 @@ class Transport: @staticmethod def jobs(): + outgoing = [] Transport.jobs_running = True try: if not Transport.jobs_locked: # Process receipts list for timed-out packets - if Transport.receipts_last_checked+Transport.receipts_check_interval < time.time(): + if time.time() > Transport.receipts_last_checked+Transport.receipts_check_interval: for receipt in Transport.receipts: thread = threading.Thread(target=receipt.check_timeout) thread.setDaemon(True) @@ -82,6 +98,37 @@ class Transport: Transport.receipts_last_checked = time.time() + # Process announces needing retransmission + if time.time() > Transport.announces_last_checked+Transport.announces_check_interval: + for destination_hash in Transport.announce_table: + announce_entry = Transport.announce_table[destination_hash] + # TODO: remove comment and log output + # [time_heard, retransmit_timeout, retries, received_from, packet.hops, packet] + # RNS.log("Announce entry retries: "+str(announce_entry[2]), RNS.LOG_INFO) + # RNS.log("Max retries: "+str(Transport.PATHFINDER_R), RNS.LOG_INFO) + if announce_entry[2] > Transport.PATHFINDER_R: + # RNS.log("Removing due to exceeded retries", RNS.LOG_INFO) + Transport.announce_table.pop(destination_hash) + break + else: + if time.time() > announce_entry[1]: + # RNS.log("Rebroadcasting announce", RNS.LOG_INFO) + announce_entry[1] = time.time() + math.pow(Transport.PATHFINDER_C, announce_entry[4]) + Transport.PATHFINDER_T + announce_entry[2] += 1 + packet = announce_entry[5] + announce_data = packet.data + announce_identity = RNS.Identity.recall(packet.destination_hash) + announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown"); + announce_destination.hash = packet.destination_hash + announce_destination.hexhash = announce_destination.hash.encode("hex_codec") + new_packet = RNS.Packet(announce_destination, announce_data, RNS.Packet.ANNOUNCE, header_type = RNS.Packet.HEADER_2, transport_type = Transport.TRANSPORT, transport_id = Transport.identity.hash) + new_packet.hops = announce_entry[4] + RNS.log("Rebroadcasting announce for "+RNS.prettyhexrep(announce_destination.hash)+" with hop count "+str(new_packet.hops), RNS.LOG_INFO) + outgoing.append(new_packet) + + Transport.announces_last_checked = time.time() + + # Cull the packet hashlist if it has reached max size while (len(Transport.packet_hashlist) > Transport.hashlist_maxsize): Transport.packet_hashlist.pop(0) @@ -93,6 +140,9 @@ class Transport: Transport.jobs_running = False + for packet in outgoing: + packet.send() + @staticmethod def outbound(packet): while (Transport.jobs_running): @@ -165,7 +215,33 @@ class Transport: if packet.packet_type == RNS.Packet.ANNOUNCE: if RNS.Identity.validateAnnounce(packet): - Transport.cache(packet) + if (packet.transport_id != None): + received_from = packet.transport_id + + # Check if this is a next retransmission from + # another node. If it is, we're removing the + # announcein question from our pending table + if packet.destination_hash in Transport.announce_table: + announce_entry = Transport.announce_table[packet.destination_hash] + if packet.hops == announce_entry[4]+1 and announce_entry[2] > 0: + now = time.time() + if now < announce_entry[2]: + # TODO: Remove + RNS.log("Another node retransmitted our transported announce for "+RNS.prettyhexrep(packet.destination_hash)+", removing it from pending table", RNS.LOG_DEBUG) + Transport.announce_table.pop(announce_entry) + + else: + received_from = packet.destination_hash + + # If the announce has not reached the retransmission + # limit, insert it into our pending table + packet.hops += 1 + if (packet.hops < Transport.PATHFINDER_M+1): + now = time.time() + retransmit_timeout = now + math.pow(Transport.PATHFINDER_C, packet.hops) + retries = 0 + Transport.announce_table[packet.destination_hash] = [now, retransmit_timeout, retries, received_from, packet.hops, packet] + Transport.destination_table[packet.destination_hash] = [now, received_from, packet.hops] elif packet.packet_type == RNS.Packet.LINKREQUEST: for destination in Transport.destinations: @@ -288,6 +364,8 @@ class Transport: def cache_request_packet(packet): if len(packet.data) == RNS.Identity.HASHLENGTH/8: packet_hash = RNS.hexrep(packet.data, delimit=False) + # TODO: There's some pretty obvious file access + # issues here. Make sure this can't happen path = RNS.Reticulum.cachepath+"/"+packet_hash if os.path.isfile(path): file = open(path, "r")