Compare commits

..

5 Commits

3 changed files with 87 additions and 31 deletions

View File

@ -900,6 +900,13 @@ class Link:
def register_incoming_resource(self, resource): def register_incoming_resource(self, resource):
self.incoming_resources.append(resource) self.incoming_resources.append(resource)
def has_incoming_resource(self, resource):
for incoming_resource in self.incoming_resources:
if incoming_resource.hash == resource.hash:
return True
return False
def cancel_outgoing_resource(self, resource): def cancel_outgoing_resource(self, resource):
if resource in self.outgoing_resources: if resource in self.outgoing_resources:
self.outgoing_resources.remove(resource) self.outgoing_resources.remove(resource)

View File

@ -172,20 +172,26 @@ class Resource:
resource.consecutive_completed_height = 0 resource.consecutive_completed_height = 0
resource.link.register_incoming_resource(resource) if not resource.link.has_incoming_resource(resource):
resource.link.register_incoming_resource(resource)
RNS.log("Accepting resource advertisement for "+RNS.prettyhexrep(resource.hash), RNS.LOG_DEBUG) RNS.log("Accepting resource advertisement for "+RNS.prettyhexrep(resource.hash), RNS.LOG_DEBUG)
if resource.link.callbacks.resource_started != None: if resource.link.callbacks.resource_started != None:
try: try:
resource.link.callbacks.resource_started(resource) resource.link.callbacks.resource_started(resource)
except Exception as e: except Exception as e:
RNS.log("Error while executing resource started callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("Error while executing resource started callback from "+str(resource)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
resource.hashmap_update(0, resource.hashmap_raw) resource.hashmap_update(0, resource.hashmap_raw)
resource.watchdog_job() resource.watchdog_job()
return resource
else:
RNS.log("Ignoring resource advertisement for "+RNS.prettyhexrep(resource.hash)+", resource already transferring", RNS.LOG_DEBUG)
return None
return resource
except Exception as e: except Exception as e:
RNS.log("Could not decode resource advertisement, dropping resource", RNS.LOG_DEBUG) RNS.log("Could not decode resource advertisement, dropping resource", RNS.LOG_DEBUG)
return None return None
@ -397,8 +403,7 @@ class Resource:
thread.start() thread.start()
def __advertise_job(self): def __advertise_job(self):
data = ResourceAdvertisement(self).pack() self.advertisement_packet = RNS.Packet(self.link, ResourceAdvertisement(self).pack(), context=RNS.Packet.RESOURCE_ADV)
self.advertisement_packet = RNS.Packet(self.link, data, context=RNS.Packet.RESOURCE_ADV)
while not self.link.ready_for_new_resource(): while not self.link.ready_for_new_resource():
self.status = Resource.QUEUED self.status = Resource.QUEUED
sleep(0.25) sleep(0.25)
@ -445,7 +450,8 @@ class Resource:
try: try:
RNS.log("No part requests received, retrying resource advertisement...", RNS.LOG_DEBUG) RNS.log("No part requests received, retrying resource advertisement...", RNS.LOG_DEBUG)
self.retries_left -= 1 self.retries_left -= 1
self.advertisement_packet.resend() self.advertisement_packet = RNS.Packet(self.link, ResourceAdvertisement(self).pack(), context=RNS.Packet.RESOURCE_ADV)
self.advertisement_packet.send()
self.last_activity = time.time() self.last_activity = time.time()
self.adv_sent = self.last_activity self.adv_sent = self.last_activity
sleep_time = 0.001 sleep_time = 0.001

View File

@ -66,6 +66,7 @@ class Transport:
PATH_REQUEST_TIMEOUT = 15 # Default timuout for client path requests in seconds PATH_REQUEST_TIMEOUT = 15 # Default timuout for client path requests in seconds
PATH_REQUEST_GRACE = 0.35 # Grace time before a path announcement is made, allows directly reachable peers to respond first PATH_REQUEST_GRACE = 0.35 # Grace time before a path announcement is made, allows directly reachable peers to respond first
PATH_REQUEST_RW = 2 # Path request random window PATH_REQUEST_RW = 2 # Path request random window
PATH_REQUEST_MI = 5 # Minimum interval in seconds for automated path requests
LINK_TIMEOUT = RNS.Link.STALE_TIME * 1.25 LINK_TIMEOUT = RNS.Link.STALE_TIME * 1.25
REVERSE_TIMEOUT = 30*60 # Reverse table entries are removed after 30 minutes REVERSE_TIMEOUT = 30*60 # Reverse table entries are removed after 30 minutes
@ -92,6 +93,7 @@ class Transport:
announce_handlers = [] # A table storing externally registered announce handlers announce_handlers = [] # A table storing externally registered announce handlers
tunnels = {} # A table storing tunnels to other transport instances tunnels = {} # A table storing tunnels to other transport instances
announce_rate_table = {} # A table for keeping track of announce rates announce_rate_table = {} # A table for keeping track of announce rates
path_requests = {} # A table for storing path request timestamps
discovery_path_requests = {} # A table for keeping track of path requests on behalf of other nodes discovery_path_requests = {} # A table for keeping track of path requests on behalf of other nodes
discovery_pr_tags = [] # A table for keeping track of tagged path requests discovery_pr_tags = [] # A table for keeping track of tagged path requests
@ -285,6 +287,7 @@ class Transport:
@staticmethod @staticmethod
def jobs(): def jobs():
outgoing = [] outgoing = []
path_requests = []
Transport.jobs_running = True Transport.jobs_running = True
try: try:
@ -380,8 +383,24 @@ class Transport:
stale_links = [] stale_links = []
for link_id in Transport.link_table: for link_id in Transport.link_table:
link_entry = Transport.link_table[link_id] link_entry = Transport.link_table[link_id]
if time.time() > link_entry[0] + Transport.LINK_TIMEOUT: if link_entry[7] == True:
stale_links.append(link_id) if time.time() > link_entry[0] + Transport.LINK_TIMEOUT:
stale_links.append(link_id)
else:
if time.time() > link_entry[8]:
stale_links.append(link_id)
last_path_request = 0
if link_entry[6] in Transport.path_requests:
last_path_request = Transport.path_requests[link_entry[6]]
# If this link request was originated from this instance
# or a local client, attempt to rediscover a path to the
# destination, if it has not already happened recently.
lr_taken_hops = link_entry[5]
if lr_taken_hops == 0 and time.time() - last_path_request > Transport.PATH_REQUEST_MI:
RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[6])+" since an attempted link was never established", RNS.LOG_DEBUG)
path_requests.append(link_entry[6])
# Cull the path table # Cull the path table
stale_paths = [] stale_paths = []
@ -513,6 +532,9 @@ class Transport:
for packet in outgoing: for packet in outgoing:
packet.send() packet.send()
for destination_hash in path_requests:
Transport.request_path(destination_hash)
@staticmethod @staticmethod
def transmit(interface, raw): def transmit(interface, raw):
try: try:
@ -998,15 +1020,19 @@ class Transport:
outbound_interface = Transport.destination_table[packet.destination_hash][5] outbound_interface = Transport.destination_table[packet.destination_hash][5]
if packet.packet_type == RNS.Packet.LINKREQUEST: if packet.packet_type == RNS.Packet.LINKREQUEST:
now = time.time()
proof_timeout = now + RNS.Link.ESTABLISHMENT_TIMEOUT_PER_HOP * max(1, remaining_hops)
# Entry format is # Entry format is
link_entry = [ time.time(), # 0: Timestamp, link_entry = [ now, # 0: Timestamp,
next_hop, # 1: Next-hop transport ID next_hop, # 1: Next-hop transport ID
outbound_interface, # 2: Next-hop interface outbound_interface, # 2: Next-hop interface
remaining_hops, # 3: Remaining hops remaining_hops, # 3: Remaining hops
packet.receiving_interface, # 4: Received on interface packet.receiving_interface, # 4: Received on interface
packet.hops, # 5: Taken hops packet.hops, # 5: Taken hops
packet.destination_hash, # 6: Original destination hash packet.destination_hash, # 6: Original destination hash
False] # 7: Validated False, # 7: Validated
proof_timeout] # 8: Proof timeout timestamp
Transport.link_table[packet.getTruncatedHash()] = link_entry Transport.link_table[packet.getTruncatedHash()] = link_entry
@ -1206,9 +1232,9 @@ class Transport:
retransmit_timeout = now + (RNS.rand() * Transport.PATHFINDER_RW) retransmit_timeout = now + (RNS.rand() * Transport.PATHFINDER_RW)
if packet.receiving_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT: if hasattr(packet.receiving_interface, "mode") and packet.receiving_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT:
expires = now + Transport.AP_PATH_TIME expires = now + Transport.AP_PATH_TIME
elif packet.receiving_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING: elif hasattr(packet.receiving_interface, "mode") and packet.receiving_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING:
expires = now + Transport.ROAMING_PATH_TIME expires = now + Transport.ROAMING_PATH_TIME
else: else:
expires = now + Transport.PATHFINDER_E expires = now + Transport.PATHFINDER_E
@ -1381,10 +1407,11 @@ class Transport:
# Handling for linkrequests to local destinations # Handling for linkrequests to local destinations
elif packet.packet_type == RNS.Packet.LINKREQUEST: elif packet.packet_type == RNS.Packet.LINKREQUEST:
for destination in Transport.destinations: if packet.transport_id == None or packet.transport_id == Transport.identity.hash:
if destination.hash == packet.destination_hash and destination.type == packet.destination_type: for destination in Transport.destinations:
packet.destination = destination if destination.hash == packet.destination_hash and destination.type == packet.destination_type:
destination.receive(packet) packet.destination = destination
destination.receive(packet)
# Handling for local data packets # Handling for local data packets
elif packet.packet_type == RNS.Packet.DATA: elif packet.packet_type == RNS.Packet.DATA:
@ -1418,14 +1445,29 @@ class Transport:
if (RNS.Reticulum.transport_enabled() or for_local_client_link or from_local_client) and packet.destination_hash in Transport.link_table: if (RNS.Reticulum.transport_enabled() or for_local_client_link or from_local_client) and packet.destination_hash in Transport.link_table:
link_entry = Transport.link_table[packet.destination_hash] link_entry = Transport.link_table[packet.destination_hash]
if packet.receiving_interface == link_entry[2]: if packet.receiving_interface == link_entry[2]:
# TODO: Should we validate the LR proof at each transport try:
# step before transporting it? if len(packet.data) == RNS.Identity.SIGLENGTH//8+RNS.Link.ECPUBSIZE//2:
# RNS.log("Link request proof received on correct interface, transporting it via "+str(link_entry[4]), RNS.LOG_EXTREME) peer_pub_bytes = packet.data[RNS.Identity.SIGLENGTH//8:RNS.Identity.SIGLENGTH//8+RNS.Link.ECPUBSIZE//2]
new_raw = packet.raw[0:1] peer_identity = RNS.Identity.recall(link_entry[6])
new_raw += struct.pack("!B", packet.hops) peer_sig_pub_bytes = peer_identity.get_public_key()[RNS.Link.ECPUBSIZE//2:RNS.Link.ECPUBSIZE]
new_raw += packet.raw[2:]
Transport.link_table[packet.destination_hash][7] = True signed_data = packet.destination_hash+peer_pub_bytes+peer_sig_pub_bytes
Transport.transmit(link_entry[4], new_raw) signature = packet.data[:RNS.Identity.SIGLENGTH//8]
if peer_identity.validate(signature, signed_data):
RNS.log("Link request proof validated for transport via "+str(link_entry[4]), RNS.LOG_EXTREME)
new_raw = packet.raw[0:1]
new_raw += struct.pack("!B", packet.hops)
new_raw += packet.raw[2:]
Transport.link_table[packet.destination_hash][7] = True
Transport.transmit(link_entry[4], new_raw)
else:
RNS.log("Invalid link request proof in transport for link "+RNS.prettyhexrep(packet.destination_hash)+", dropping proof.", RNS.LOG_DEBUG)
except Exception as e:
RNS.log("Error while transporting link request proof. The contained exception was: "+str(e), RNS.LOG_ERROR)
else: else:
RNS.log("Link request proof received on wrong interface, not transporting it.", RNS.LOG_DEBUG) RNS.log("Link request proof received on wrong interface, not transporting it.", RNS.LOG_DEBUG)
else: else:
@ -1829,6 +1871,7 @@ class Transport:
on_interface.announce_allowed_at = now + wait_time on_interface.announce_allowed_at = now + wait_time
packet.send() packet.send()
Transport.path_requests[destination_hash] = time.time()
@staticmethod @staticmethod
def path_request_handler(data, packet): def path_request_handler(data, packet):