Optimised resource transfer implementation

This commit is contained in:
Mark Qvist 2020-05-12 16:45:51 +02:00
parent bf49eb2475
commit c595fdd54b
6 changed files with 227 additions and 111 deletions

View File

@ -123,7 +123,10 @@ def client_request(message, packet):
# read it and pack it as a resource
RNS.log("Client requested \""+filename+"\"")
file = open(os.path.join(serve_path, filename), "rb")
file_resource = RNS.Resource(file.read(), packet.link, callback=resource_sending_concluded)
file_data = file.read()
file.close()
file_resource = RNS.Resource(file_data, packet.link, callback=resource_sending_concluded)
file_resource.filename = filename
except:
# If somethign went wrong, we close
@ -138,10 +141,15 @@ def client_request(message, packet):
# This function is called on the server when a
# resource transfer concludes.
def resource_sending_concluded(resource):
if hasattr(resource, "filename"):
name = resource.filename
else:
name = "resource"
if resource.status == RNS.Resource.COMPLETE:
RNS.log("Done sending \""+resource.filename+"\" to client")
RNS.log("Done sending \""+name+"\" to client")
elif resource.status == RNS.Resource.FAILED:
RNS.log("Sending \""+resource.filename+"\" to client failed")
RNS.log("Sending \""+name+"\" to client failed")
def list_delivered(receipt):
RNS.log("The file list was received by the client")
@ -297,8 +305,7 @@ def print_menu():
if time.time() > download_began+APP_TIMEOUT:
print("The download timed out")
time.sleep(1)
menu_mode = "main"
print_menu()
server_link.teardown()
if menu_mode == "downloading":
print("Download started")

View File

@ -118,7 +118,8 @@ class TCPClientInterface(Interface):
self.online = False
self.OUT = False
self.IN = False
RNS.Transport.interfaces.remove(self)
if self in RNS.Transport.interfaces:
RNS.Transport.interfaces.remove(self)
def __str__(self):

View File

@ -34,6 +34,7 @@ class Link:
# first-hop RTT latency and distance
DEFAULT_TIMEOUT = 15.0
TIMEOUT_FACTOR = 3
STALE_GRACE = 2
KEEPALIVE = 180
PENDING = 0x00
@ -201,7 +202,9 @@ class Link:
self.status = Link.ACTIVE
if self.callbacks.link_established != None:
self.callbacks.link_established(self)
thread = threading.Thread(target=self.callbacks.link_established, args=(self,))
thread.setDaemon(True)
thread.start()
else:
RNS.log("Invalid link proof signature received by "+str(self), RNS.LOG_VERBOSE)
# TODO: should we really do this, or just wait
@ -309,7 +312,7 @@ class Link:
elif self.status == Link.ACTIVE:
if time.time() >= self.last_inbound + self.keepalive:
sleep_time = self.rtt * self.timeout_factor
sleep_time = self.rtt * self.timeout_factor + Link.STALE_GRACE
self.status = Link.STALE
if self.initiator:
self.send_keepalive()
@ -326,8 +329,8 @@ class Link:
if sleep_time == 0:
RNS.log("Warning! Link watchdog sleep time of 0!", RNS.LOG_ERROR)
if sleep_time == None or sleep_time < 0:
RNS.log("Timing error! Closing Reticulum now.", RNS.LOG_CRITICAL)
RNS.panic()
RNS.log("Timing error! Tearing down link "+str(self)+" now.", RNS.LOG_ERROR)
self.teardown()
sleep(sleep_time)
@ -352,7 +355,9 @@ class Link:
if packet.context == RNS.Packet.NONE:
plaintext = self.decrypt(packet.data)
if self.callbacks.packet != None:
self.callbacks.packet(plaintext, packet)
thread = threading.Thread(target=self.callbacks.packet, args=(plaintext, packet))
thread.setDaemon(True)
thread.start()
if self.destination.proof_strategy == RNS.Destination.PROVE_ALL:
packet.prove()
@ -374,7 +379,9 @@ class Link:
pass
elif self.resource_strategy == Link.ACCEPT_APP:
if self.callbacks.resource != None:
self.callbacks.resource(packet)
thread = threading.Thread(target=self.callbacks.resource, args=(packet))
thread.setDaemon(True)
thread.start()
elif self.resource_strategy == Link.ACCEPT_ALL:
RNS.Resource.accept(packet, self.callbacks.resource_concluded)
@ -497,7 +504,7 @@ class Link:
if resource in self.outgoing_resources:
self.outgoing_resources.remove(resource)
else:
RNS.log("Attempt to cancel a non-existing incoming resource", RNS.LOG_ERROR)
RNS.log("Attempt to cancel a non-existing outgoing resource", RNS.LOG_ERROR)
def cancel_incoming_resource(self, resource):
if resource in self.incoming_resources:

View File

@ -189,8 +189,9 @@ class Packet:
if RNS.Transport.outbound(self):
return self.receipt
else:
# TODO: Don't raise error here, handle gracefully
raise IOError("Packet could not be sent! Do you have any outbound interfaces configured?")
# TODO: Decide whether this failure should simply
# return none, or raise an error
raise IOError("No interfaces could process the outbound packet")
else:
raise IOError("Packet was already sent")

View File

@ -7,12 +7,17 @@ from .vendor import umsgpack as umsgpack
from time import sleep
class Resource:
WINDOW_MIN = 1
WINDOW_MAX = 7
WINDOW = 4
MAPHASH_LEN = 4
SDU = RNS.Packet.MDU
RANDOM_HASH_SIZE = 4
WINDOW_FLEXIBILITY = 4
WINDOW_MIN = 1
WINDOW_MAX = 10
WINDOW = 4
MAPHASH_LEN = 4
SDU = RNS.Packet.MDU
RANDOM_HASH_SIZE = 4
# The maximum size to auto-compress with
# bz2 before sending.
AUTO_COMPRESS_MAX_SIZE = 32 * 1024 * 1024
# TODO: Should be allocated more
# intelligently
@ -58,11 +63,16 @@ class Resource:
resource.outstanding_parts = 0
resource.parts = [None] * resource.total_parts
resource.window = Resource.WINDOW
resource.window_max = Resource.WINDOW_MAX
resource.window_min = Resource.WINDOW_MIN
resource.window_flexibility = Resource.WINDOW_FLEXIBILITY
resource.last_activity = time.time()
resource.hashmap = [None] * resource.total_parts
resource.hashmap_height = 0
resource.waiting_for_hmu = False
resource.consecutive_completed_height = 0
resource.link.register_incoming_resource(resource)
@ -78,7 +88,7 @@ class Resource:
RNS.log("Could not decode resource advertisement, dropping resource", RNS.LOG_DEBUG)
return None
def __init__(self, data, link, advertise=True, auto_compress=True, callback=None, progress_callback=None):
def __init__(self, data, link, advertise=True, auto_compress=True, must_compress=False, callback=None, progress_callback=None):
self.status = Resource.NONE
self.link = link
self.max_retries = Resource.MAX_RETRIES
@ -92,69 +102,100 @@ class Resource:
self.__progress_callback = progress_callback
self.rtt = None
self.receiver_min_consecutive_height = 0
if data != None:
self.initiator = True
self.callback = callback
self.uncompressed_data = data
compression_began = time.time()
if must_compress or (auto_compress and len(self.uncompressed_data) < Resource.AUTO_COMPRESS_MAX_SIZE):
RNS.log("Compressing resource data...", RNS.LOG_DEBUG)
self.compressed_data = bz2.compress(self.uncompressed_data)
RNS.log("Compression completed in "+str(round(time.time()-compression_began, 3))+" seconds", RNS.LOG_DEBUG)
else:
self.compressed_data = self.uncompressed_data
self.uncompressed_size = len(self.uncompressed_data)
self.compressed_size = len(self.compressed_data)
if (self.compressed_size < self.uncompressed_size and auto_compress):
saved_bytes = len(self.uncompressed_data) - len(self.compressed_data)
RNS.log("Compression saved "+str(saved_bytes)+" bytes, sending compressed", RNS.LOG_DEBUG)
self.data = b""
self.data += RNS.Identity.getRandomHash()[:Resource.RANDOM_HASH_SIZE]
self.data += self.compressed_data
self.compressed = True
self.uncompressed_data = None
else:
self.data = b""
self.data += RNS.Identity.getRandomHash()[:Resource.RANDOM_HASH_SIZE]
self.data += self.uncompressed_data
self.uncompressed_data = self.data
self.compressed = False
self.compressed_data = None
if auto_compress:
RNS.log("Compression did not decrease size, sending uncompressed", RNS.LOG_DEBUG)
# Resources handle encryption directly to
# make optimal use of packet MTU on an entire
# encrypted stream. The Resource instance will
# use it's underlying link directly to encrypt.
if not self.link.encryption_disabled():
self.data = self.link.encrypt(self.data)
self.encrypted = True
else:
self.encrypted = False
self.size = len(self.data)
self.sent_parts = 0
hashmap_entries = int(math.ceil(self.size/float(Resource.SDU)))
hashmap_ok = False
while not hashmap_ok:
self.initiator = True
self.callback = callback
# TODO: Remove
#self.progress_callback = progress_callback
self.random_hash = RNS.Identity.getRandomHash()[:Resource.RANDOM_HASH_SIZE]
self.uncompressed_data = data
self.compressed_data = bz2.compress(self.uncompressed_data)
self.uncompressed_size = len(self.uncompressed_data)
self.compressed_size = len(self.compressed_data)
hashmap_computation_began = time.time()
RNS.log("Starting resource hashmap computation with "+str(hashmap_entries)+" entries...", RNS.LOG_DEBUG)
self.random_hash = RNS.Identity.getRandomHash()[:Resource.RANDOM_HASH_SIZE]
self.hash = RNS.Identity.fullHash(data+self.random_hash)
self.expected_proof = RNS.Identity.fullHash(data+self.hash)
if (self.compressed_size < self.uncompressed_size and auto_compress):
self.data = self.compressed_data
self.compressed = True
self.uncompressed_data = None
else:
self.data = self.uncompressed_data
self.compressed = False
self.compressed_data = None
if not self.link.encryption_disabled():
self.data = self.link.encrypt(self.data)
self.encrypted = True
else:
self.encrypted = False
self.size = len(self.data)
self.hashmap = b""
self.sent_parts = 0
self.parts = []
for i in range(0,int(math.ceil(self.size/float(Resource.SDU)))):
self.hashmap = b""
collision_guard_list = []
for i in range(0,hashmap_entries):
data = self.data[i*Resource.SDU:(i+1)*Resource.SDU]
part = RNS.Packet(link, data, context=RNS.Packet.RESOURCE)
part.pack()
part.map_hash = self.getMapHash(data)
self.hashmap += part.map_hash
self.parts.append(part)
map_hash = self.getMapHash(data)
hashmap_ok = self.checkHashMap()
if not hashmap_ok:
RNS.log("Found hash collision in resource map, remapping...", RNS.LOG_VERBOSE)
if map_hash in collision_guard_list:
RNS.log("Found hash collision in resource map, remapping...", RNS.LOG_VERBOSE)
hashmap_ok = False
break
else:
hashmap_ok = True
collision_guard_list.append(map_hash)
if len(collision_guard_list) > ResourceAdvertisement.COLLISION_GUARD_SIZE:
collision_guard_list.pop(0)
if advertise:
self.advertise()
part = RNS.Packet(link, data, context=RNS.Packet.RESOURCE)
part.pack()
part.map_hash = map_hash
self.hashmap += part.map_hash
self.parts.append(part)
RNS.log("Hashmap computation concluded in "+str(round(time.time()-hashmap_computation_began, 3))+" seconds", RNS.LOG_DEBUG)
if advertise:
self.advertise()
else:
pass
def checkHashMap(self):
checked_hashes = []
for part in self.parts:
if part.map_hash in checked_hashes:
return False
checked_hashes.append(part.map_hash)
return True
def hashmap_update_packet(self, plaintext):
if not self.status == Resource.FAILED:
self.last_activity = time.time()
@ -178,6 +219,10 @@ class Resource:
self.request_next()
def getMapHash(self, data):
# TODO: This will break if running unencrypted,
# uncompressed transfers on streams with long blocks
# of identical bytes. Doing so would be very silly
# anyways but maybe it should be handled gracefully.
return RNS.Identity.fullHash(data+self.random_hash)[:Resource.MAPHASH_LEN]
def advertise(self):
@ -192,12 +237,18 @@ class Resource:
self.status = Resource.QUEUED
sleep(0.25)
self.advertisement_packet.send()
self.last_activity = time.time()
self.adv_sent = self.last_activity
self.rtt = None
self.status = Resource.ADVERTISED
self.link.register_outgoing_resource(self)
try:
self.advertisement_packet.send()
self.last_activity = time.time()
self.adv_sent = self.last_activity
self.rtt = None
self.status = Resource.ADVERTISED
self.link.register_outgoing_resource(self)
RNS.log("Sent resource advertisement for "+RNS.prettyhexrep(self.hash), RNS.LOG_DEBUG)
except Exception as e:
RNS.log("Could not advertise resource, the contained exception was: "+str(e), RNS.LOG_ERROR)
self.cancel()
return
self.watchdog_job()
@ -224,12 +275,16 @@ class Resource:
self.cancel()
sleep_time = 0.001
else:
RNS.log("No part requests received, retrying resource advertisement...", RNS.LOG_DEBUG)
self.retries_left -= 1
self.advertisement_packet.resend()
self.last_activity = time.time()
self.adv_sent = self.last_activity
sleep_time = 0.001
try:
RNS.log("No part requests received, retrying resource advertisement...", RNS.LOG_DEBUG)
self.retries_left -= 1
self.advertisement_packet.resend()
self.last_activity = time.time()
self.adv_sent = self.last_activity
sleep_time = 0.001
except Exception as e:
RNS.log("Could not resend advertisement packet, cancelling resource", RNS.LOG_VERBOSE)
self.cancel()
elif self.status == Resource.TRANSFERRING:
@ -240,6 +295,13 @@ class Resource:
if sleep_time < 0:
if self.retries_left > 0:
RNS.log("Timeout waiting for parts, requesting retry", RNS.LOG_DEBUG)
if self.window > self.window_min:
self.window -= 1
if self.window_max > self.window_min:
self.window_max -= 1
if (self.window_max - self.window) > (self.window_flexibility-1):
self.window_max -= 1
sleep_time = 0.001
self.retries_left -= 1
self.waiting_for_hmu = False
@ -275,11 +337,11 @@ class Resource:
if sleep_time == 0:
RNS.log("Warning! Link watchdog sleep time of 0!", RNS.LOG_WARNING)
if sleep_time == None or sleep_time < 0:
# TODO: This should probably not be here forever
RNS.log("Timing error! Closing Reticulum now.", RNS.LOG_CRITICAL)
RNS.panic()
sleep(sleep_time)
RNS.log("Timing error, cancelling resource transfer.", RNS.LOG_ERROR)
self.cancel()
if sleep_time != None:
sleep(sleep_time)
def assemble(self):
if not self.status == Resource.FAILED:
@ -294,6 +356,9 @@ class Resource:
else:
data = stream
# Strip off random hash
data = data[Resource.RANDOM_HASH_SIZE:]
if self.compressed:
self.data = bz2.decompress(data)
else:
@ -307,6 +372,7 @@ class Resource:
else:
self.status = Resource.CORRUPT
except Exception as e:
RNS.log("Error while assembling received resource.", RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
@ -319,10 +385,15 @@ class Resource:
def prove(self):
if not self.status == Resource.FAILED:
proof = RNS.Identity.fullHash(self.data+self.hash)
proof_data = self.hash+proof
proof_packet = RNS.Packet(self.link, proof_data, packet_type=RNS.Packet.PROOF, context=RNS.Packet.RESOURCE_PRF)
proof_packet.send()
try:
proof = RNS.Identity.fullHash(self.data+self.hash)
proof_data = self.hash+proof
proof_packet = RNS.Packet(self.link, proof_data, packet_type=RNS.Packet.PROOF, context=RNS.Packet.RESOURCE_PRF)
proof_packet.send()
except Exception as e:
RNS.log("Could not send proof packet, cancelling resource", RNS.LOG_DEBUG)
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
self.cancel()
def validateProof(self, proof_data):
if not self.status == Resource.FAILED:
@ -356,13 +427,22 @@ class Resource:
part_data = packet.data
part_hash = self.getMapHash(part_data)
i = 0
for map_hash in self.hashmap:
i = self.consecutive_completed_height
for map_hash in self.hashmap[self.consecutive_completed_height:self.consecutive_completed_height+self.window]:
if map_hash == part_hash:
if self.parts[i] == None:
# Insert data into parts list
self.parts[i] = part_data
self.received_count += 1
self.outstanding_parts -= 1
# Update consecutive completed pointer
if i == 0 or self.parts[i-1] != None and i == self.consecutive_completed_height:
self.consecutive_completed_height = i+1
cp = i+1
while cp < len(self.parts) and self.parts[cp] != None:
self.consecutive_completed_height = cp
cp += 1
i += 1
if self.__progress_callback != None:
@ -371,8 +451,13 @@ class Resource:
if self.outstanding_parts == 0 and self.received_count == self.total_parts:
self.assemble()
elif self.outstanding_parts == 0:
if self.window < Resource.WINDOW_MAX:
# TODO: Figure out if ther is a mathematically
# optimal way to adjust windows
if self.window < self.window_max:
self.window += 1
if (self.window - self.window_min) > (self.window_flexibility-1):
self.window_min += 1
self.request_next()
# Called on incoming resource to send a request for more data
@ -383,8 +468,8 @@ class Resource:
hashmap_exhausted = Resource.HASHMAP_IS_NOT_EXHAUSTED
requested_hashes = b""
i = 0; pn = 0
for part in self.parts:
i = 0; pn = self.consecutive_completed_height
for part in self.parts[self.consecutive_completed_height:self.consecutive_completed_height+self.window]:
if part == None:
part_hash = self.hashmap[pn]
@ -409,10 +494,15 @@ class Resource:
request_data = hmu_part + self.hash + requested_hashes
request_packet = RNS.Packet(self.link, request_data, context = RNS.Packet.RESOURCE_REQ)
request_packet.send()
self.last_activity = time.time()
self.req_sent = self.last_activity
self.req_resp = None
try:
request_packet.send()
self.last_activity = time.time()
self.req_sent = self.last_activity
self.req_resp = None
except Exception as e:
RNS.log("Could not send resource request packet, cancelling resource", RNS.LOG_DEBUG)
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
self.cancel()
# Called on outgoing resource to make it send more data
def request(self, request_data):
@ -435,8 +525,9 @@ class Resource:
for i in range(0,len(requested_hashes)//Resource.MAPHASH_LEN):
requested_hash = requested_hashes[i*Resource.MAPHASH_LEN:(i+1)*Resource.MAPHASH_LEN]
pi = 0
for part in self.parts:
search_start = self.receiver_min_consecutive_height
search_end = self.receiver_min_consecutive_height+ResourceAdvertisement.COLLISION_GUARD_SIZE
for part in self.parts[search_start:search_end]:
if part.map_hash == requested_hash:
try:
if not part.sent:
@ -448,20 +539,21 @@ class Resource:
self.last_part_sent = self.last_activity
break
except Exception as e:
RNS.log("Resource could not send parts, cancelling transfer!", RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
RNS.log("Resource could not send parts, cancelling transfer!", RNS.LOG_DEBUG)
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
self.cancel()
pi += 1
if wants_more_hashmap:
last_map_hash = request_data[1:Resource.MAPHASH_LEN+1]
part_index = 0
for part in self.parts:
part_index = self.receiver_min_consecutive_height
for part in self.parts[self.receiver_min_consecutive_height:]:
part_index += 1
if part.map_hash == last_map_hash:
break
self.receiver_min_consecutive_height = max(part_index-1-Resource.WINDOW_MAX, 0)
if part_index % ResourceAdvertisement.HASHMAP_MAX_LEN != 0:
RNS.log("Resource sequencing error, cancelling transfer!", RNS.LOG_ERROR)
self.cancel()
@ -479,8 +571,13 @@ class Resource:
hmu = self.hash+umsgpack.packb([segment, hashmap])
hmu_packet = RNS.Packet(self.link, hmu, context = RNS.Packet.RESOURCE_HMU)
hmu_packet.send()
self.last_activity = time.time()
try:
hmu_packet.send()
self.last_activity = time.time()
except Exception as e:
RNS.log("Could not send resource HMU packet, cancelling resource", RNS.LOG_DEBUG)
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
self.cancel()
if self.sent_parts == len(self.parts):
self.status = Resource.AWAITING_PROOF
@ -522,7 +619,8 @@ class Resource:
class ResourceAdvertisement:
# TODO: Can this be allocated dynamically? Keep in mind hashmap_update inference
HASHMAP_MAX_LEN = 84
HASHMAP_MAX_LEN = 84
COLLISION_GUARD_SIZE = 2*Resource.WINDOW_MAX+HASHMAP_MAX_LEN
def __init__(self, resource=None):
if resource != None:

View File

@ -306,6 +306,8 @@ class Transport:
return True
if packet.context == RNS.Packet.RESOURCE_PRF:
return True
if packet.context == RNS.Packet.RESOURCE:
return True
if not packet.packet_hash in Transport.packet_hashlist:
return True
else: