Compare commits

..

9 Commits

Author SHA1 Message Date
Mark Qvist
b007530123 Adjusted resource timeout calculation 2024-01-14 01:06:43 +01:00
Mark Qvist
4066bba303 Merge branch 'master' of github.com:markqvist/Reticulum 2024-01-14 00:48:14 +01:00
Mark Qvist
8951517d01 Updated version 2024-01-14 00:47:45 +01:00
Mark Qvist
ae1d962b9b Fixed large resource transfers failing under some conditions 2024-01-14 00:46:55 +01:00
Mark Qvist
a2caa47334 Improved link tests 2024-01-14 00:12:30 +01:00
Mark Qvist
9f43da9105 Fixed rnprobe formatting issue 2024-01-13 16:37:48 +01:00
Mark Qvist
038c696db9 Fixed missing check on malformed advertisement packets 2024-01-13 16:36:11 +01:00
Mark Qvist
8fa6ec144c Updated readme 2024-01-03 12:05:30 +01:00
Mark Qvist
a8ccff7c55 Updated contribution guidelines 2024-01-03 12:00:10 +01:00
7 changed files with 100 additions and 55 deletions

View File

@ -12,11 +12,11 @@ First and foremost, there is one simple requirement for taking part in this comm
If you want to ask a question, **do not open an issue**. The issue tracker is used by people *working on Reticulum* to track bugs, issues and improvements.
Instead, ask away on the [discussions](https://github.com/markqvist/Reticulum/discussions) or on the [Reticulum Matrix channel](https://unsigned.io/contact.html#reticulum:matrix.org) at `#reticulum:matrix.org`
Instead, ask away on the [discussions](https://github.com/markqvist/Reticulum/discussions) or on the [Reticulum Matrix channel](https://matrix.to/#/#reticulum:matrix.org) at `#reticulum:matrix.org`
## Providing Feedback & Ideas
Likewise, feedback, ideas and feature requests are a very welcome way to contribute, and should also be posted on the [discussions](https://github.com/markqvist/Reticulum/discussions), or on the [Reticulum Matrix channel](https://unsigned.io/contact.html#reticulum:matrix.org) at `#reticulum:matrix.org`.
Likewise, feedback, ideas and feature requests are a very welcome way to contribute, and should also be posted on the [discussions](https://github.com/markqvist/Reticulum/discussions), or on the [Reticulum Matrix channel](https://matrix.to/#/#reticulum:matrix.org) at `#reticulum:matrix.org`.
Please do not post feature requests or general ideas on the issue tracker, or in direct messages to the primary developers. You are much more likely to get a response and start a constructive discussion by posting your ideas in the public channels created for these purposes.

View File

@ -192,7 +192,7 @@ provide a dynamic performance envelope from 250 bits per second, to 1 gigabit
per second on normal hardware.
Currently, the usable performance envelope is approximately 150 bits per second
to 20 megabits per second, with physical mediums faster than that not being
to 40 megabits per second, with physical mediums faster than that not being
saturated. Performance beyond the current level is intended for future
upgrades, but not highly prioritised at this point in time.

View File

@ -352,7 +352,7 @@ class Link:
packed_request = umsgpack.packb(unpacked_request)
if timeout == None:
timeout = self.rtt * self.traffic_timeout_factor + RNS.Resource.RESPONSE_MAX_GRACE_TIME/4.0
timeout = self.rtt * self.traffic_timeout_factor + RNS.Resource.RESPONSE_MAX_GRACE_TIME*1.125
if len(packed_request) <= Link.MDU:
request_packet = RNS.Packet(self, packed_request, RNS.Packet.DATA, context = RNS.Packet.REQUEST)
@ -687,7 +687,9 @@ class Link:
remove = pending_request
try:
pending_request.response_size = response_size
pending_request.response_transfer_size = response_transfer_size
if pending_request.response_transfer_size == None:
pending_request.response_transfer_size = 0
pending_request.response_transfer_size += response_transfer_size
pending_request.response_received(response_data)
except Exception as e:
RNS.log("Error occurred while handling response. The contained exception was: "+str(e), RNS.LOG_ERROR)
@ -835,10 +837,15 @@ class Link:
for pending_request in self.pending_requests:
if pending_request.request_id == request_id:
response_resource = RNS.Resource.accept(packet, callback=self.response_resource_concluded, progress_callback=pending_request.response_resource_progress, request_id = request_id)
pending_request.response_size = RNS.ResourceAdvertisement.read_size(packet)
pending_request.response_transfer_size = RNS.ResourceAdvertisement.read_transfer_size(packet)
pending_request.started_at = time.time()
pending_request.response_resource_progress(response_resource)
if response_resource != None:
if pending_request.response_size == None:
pending_request.response_size = RNS.ResourceAdvertisement.read_size(packet)
if pending_request.response_transfer_size == None:
pending_request.response_transfer_size = 0
pending_request.response_transfer_size += RNS.ResourceAdvertisement.read_transfer_size(packet)
if pending_request.started_at == None:
pending_request.started_at = time.time()
pending_request.response_resource_progress(response_resource)
elif self.resource_strategy == Link.ACCEPT_NONE:
pass
@ -1132,7 +1139,8 @@ class RequestReceipt():
def request_resource_concluded(self, resource):
if resource.status == RNS.Resource.COMPLETE:
RNS.log("Request "+RNS.prettyhexrep(self.request_id)+" successfully sent as resource.", RNS.LOG_DEBUG)
self.started_at = time.time()
if self.started_at == None:
self.started_at = time.time()
self.status = RequestReceipt.DELIVERED
self.__resource_response_timeout = time.time()+self.timeout
response_timeout_thread = threading.Thread(target=self.__response_timeout_job)
@ -1173,25 +1181,26 @@ class RequestReceipt():
def response_resource_progress(self, resource):
if not self.status == RequestReceipt.FAILED:
self.status = RequestReceipt.RECEIVING
if self.packet_receipt != None:
if self.packet_receipt.status != RNS.PacketReceipt.DELIVERED:
self.packet_receipt.status = RNS.PacketReceipt.DELIVERED
self.packet_receipt.proved = True
self.packet_receipt.concluded_at = time.time()
if self.packet_receipt.callbacks.delivery != None:
self.packet_receipt.callbacks.delivery(self.packet_receipt)
if resource != None:
if not self.status == RequestReceipt.FAILED:
self.status = RequestReceipt.RECEIVING
if self.packet_receipt != None:
if self.packet_receipt.status != RNS.PacketReceipt.DELIVERED:
self.packet_receipt.status = RNS.PacketReceipt.DELIVERED
self.packet_receipt.proved = True
self.packet_receipt.concluded_at = time.time()
if self.packet_receipt.callbacks.delivery != None:
self.packet_receipt.callbacks.delivery(self.packet_receipt)
self.progress = resource.get_progress()
self.progress = resource.get_progress()
if self.callbacks.progress != None:
try:
self.callbacks.progress(self)
except Exception as e:
RNS.log("Error while executing response progress callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
else:
resource.cancel()
if self.callbacks.progress != None:
try:
self.callbacks.progress(self)
except Exception as e:
RNS.log("Error while executing response progress callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
else:
resource.cancel()
def response_received(self, response):

View File

@ -25,6 +25,7 @@ import os
import bz2
import math
import time
import tempfile
import threading
from threading import Lock
from .vendor import umsgpack as umsgpack
@ -106,7 +107,8 @@ class Resource:
PART_TIMEOUT_FACTOR_AFTER_RTT = 2
MAX_RETRIES = 16
MAX_ADV_RETRIES = 4
SENDER_GRACE_TIME = 10
SENDER_GRACE_TIME = 10.0
PROCESSING_GRACE = 1.0
RETRY_GRACE_TIME = 0.25
PER_RETRY_DELAY = 0.5
@ -203,9 +205,19 @@ class Resource:
resource_data = None
self.assembly_lock = False
if data != None:
if not hasattr(data, "read") and len(data) > Resource.MAX_EFFICIENT_SIZE:
original_data = data
data_size = len(original_data)
data = tempfile.TemporaryFile()
data.write(original_data)
del original_data
if hasattr(data, "read"):
data_size = os.stat(data.name).st_size
self.total_size = data_size
if data_size == None:
data_size = os.stat(data.name).st_size
self.total_size = data_size
self.grand_total_parts = math.ceil(data_size/Resource.SDU)
if data_size <= Resource.MAX_EFFICIENT_SIZE:
@ -278,7 +290,7 @@ class Resource:
self.uncompressed_data = data
compression_began = time.time()
if (auto_compress and len(self.uncompressed_data) < Resource.AUTO_COMPRESS_MAX_SIZE):
if (auto_compress and len(self.uncompressed_data) <= Resource.AUTO_COMPRESS_MAX_SIZE):
RNS.log("Compressing resource data...", RNS.LOG_DEBUG)
self.compressed_data = bz2.compress(self.uncompressed_data)
RNS.log("Compression completed in "+str(round(time.time()-compression_began, 3))+" seconds", RNS.LOG_DEBUG)
@ -440,7 +452,7 @@ class Resource:
sleep_time = None
if self.status == Resource.ADVERTISED:
sleep_time = (self.adv_sent+self.timeout)-time.time()
sleep_time = (self.adv_sent+self.timeout+Resource.PROCESSING_GRACE)-time.time()
if sleep_time < 0:
if self.retries_left <= 0:
RNS.log("Resource transfer timeout after sending advertisement", RNS.LOG_DEBUG)
@ -456,7 +468,7 @@ class Resource:
self.adv_sent = self.last_activity
sleep_time = 0.001
except Exception as e:
RNS.log("Could not resend advertisement packet, cancelling resource", RNS.LOG_VERBOSE)
RNS.log("Could not resend advertisement packet, cancelling resource. The contained exception was: "+str(e), RNS.LOG_VERBOSE)
self.cancel()
@ -612,10 +624,24 @@ class Resource:
self.callback(self)
except Exception as e:
RNS.log("Error while executing resource concluded callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
finally:
if hasattr(self.input_file, "close") and callable(self.input_file.close):
try:
self.input_file.close()
except Exception as e:
RNS.log("Error while closing resource input file: "+str(e), RNS.LOG_ERROR)
else:
# Otherwise we'll recursively create the
# next segment of the resource
Resource(self.input_file, self.link, callback = self.callback, segment_index = self.segment_index+1, original_hash=self.original_hash, progress_callback = self.__progress_callback)
Resource(
self.input_file, self.link,
callback = self.callback,
segment_index = self.segment_index+1,
original_hash=self.original_hash,
progress_callback = self.__progress_callback,
request_id = self.request_id,
is_response = self.is_response,
)
else:
pass
else:
@ -732,20 +758,6 @@ class Resource:
search_start = pn
search_size = self.window
# TODO: Remove
# tpm = []
# tpi = 0
# try:
# for p in self.parts:
# if p == None:
# tpm.append(None)
# else:
# tpm.append(tpi)
# tpi+=1
# except Exception as e:
# print(str(e))
# RNS.log(f"Partmap: "+str(tpm))
for part in self.parts[search_start:search_start+search_size]:
if part == None:
part_hash = self.hashmap[pn]

View File

@ -132,7 +132,7 @@ def program_setup(configdir, destination_hexhash, size=None, full_name = None, v
i = (i+1)%len(syms)
if time.time() > _timeout:
print("\r \rProbe timed out")
print("\r \rProbe timed out")
else:
print("\b\b ")

View File

@ -1 +1 @@
__version__ = "0.6.9"
__version__ = "0.7.0"

View File

@ -189,6 +189,9 @@ class TestLink(unittest.TestCase):
id1 = RNS.Identity.from_bytes(bytes.fromhex(fixed_keys[0][0]))
self.assertEqual(id1.hash, bytes.fromhex(fixed_keys[0][1]))
RNS.Transport.request_path(bytes.fromhex("fb48da0e82e6e01ba0c014513f74540d"))
time.sleep(0.2)
dest = RNS.Destination(id1, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "link", "establish")
self.assertEqual(dest.hash, bytes.fromhex("fb48da0e82e6e01ba0c014513f74540d"))
@ -204,6 +207,7 @@ class TestLink(unittest.TestCase):
resource = RNS.Resource(data, l1, timeout=resource_timeout)
start = time.time()
# This is a hack, don't do it. Use the callbacks instead.
while resource.status < RNS.Resource.COMPLETE:
time.sleep(0.01)
@ -225,6 +229,9 @@ class TestLink(unittest.TestCase):
id1 = RNS.Identity.from_bytes(bytes.fromhex(fixed_keys[0][0]))
self.assertEqual(id1.hash, bytes.fromhex(fixed_keys[0][1]))
RNS.Transport.request_path(bytes.fromhex("fb48da0e82e6e01ba0c014513f74540d"))
time.sleep(0.2)
dest = RNS.Destination(id1, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "link", "establish")
self.assertEqual(dest.hash, bytes.fromhex("fb48da0e82e6e01ba0c014513f74540d"))
@ -240,6 +247,7 @@ class TestLink(unittest.TestCase):
resource = RNS.Resource(data, l1, timeout=resource_timeout)
start = time.time()
# This is a hack, don't do it. Use the callbacks instead.
while resource.status < RNS.Resource.COMPLETE:
time.sleep(0.01)
@ -261,6 +269,9 @@ class TestLink(unittest.TestCase):
id1 = RNS.Identity.from_bytes(bytes.fromhex(fixed_keys[0][0]))
self.assertEqual(id1.hash, bytes.fromhex(fixed_keys[0][1]))
RNS.Transport.request_path(bytes.fromhex("fb48da0e82e6e01ba0c014513f74540d"))
time.sleep(0.2)
dest = RNS.Destination(id1, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "link", "establish")
self.assertEqual(dest.hash, bytes.fromhex("fb48da0e82e6e01ba0c014513f74540d"))
@ -275,6 +286,7 @@ class TestLink(unittest.TestCase):
resource = RNS.Resource(data, l1, timeout=resource_timeout)
start = time.time()
# This is a hack, don't do it. Use the callbacks instead.
while resource.status < RNS.Resource.COMPLETE:
time.sleep(0.01)
@ -301,6 +313,9 @@ class TestLink(unittest.TestCase):
id1 = RNS.Identity.from_bytes(bytes.fromhex(fixed_keys[0][0]))
self.assertEqual(id1.hash, bytes.fromhex(fixed_keys[0][1]))
RNS.Transport.request_path(bytes.fromhex("fb48da0e82e6e01ba0c014513f74540d"))
time.sleep(0.2)
dest = RNS.Destination(id1, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "link", "establish")
self.assertEqual(dest.hash, bytes.fromhex("fb48da0e82e6e01ba0c014513f74540d"))
@ -315,6 +330,7 @@ class TestLink(unittest.TestCase):
resource = RNS.Resource(data, l1, timeout=resource_timeout)
start = time.time()
# This is a hack, don't do it. Use the callbacks instead.
while resource.status < RNS.Resource.COMPLETE:
time.sleep(0.01)
@ -326,6 +342,10 @@ class TestLink(unittest.TestCase):
time.sleep(0.5)
self.assertEqual(l1.status, RNS.Link.CLOSED)
large_resource_status = None
def lr_callback(self, resource):
TestLink.large_resource_status = resource.status
@skipIf(os.getenv('SKIP_NORMAL_TESTS') != None, "Skipping")
def test_9_large_resource(self):
if RNS.Cryptography.backend() == "internal":
@ -340,6 +360,9 @@ class TestLink(unittest.TestCase):
id1 = RNS.Identity.from_bytes(bytes.fromhex(fixed_keys[0][0]))
self.assertEqual(id1.hash, bytes.fromhex(fixed_keys[0][1]))
RNS.Transport.request_path(bytes.fromhex("fb48da0e82e6e01ba0c014513f74540d"))
time.sleep(0.2)
dest = RNS.Destination(id1, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "link", "establish")
self.assertEqual(dest.hash, bytes.fromhex("fb48da0e82e6e01ba0c014513f74540d"))
@ -348,17 +371,18 @@ class TestLink(unittest.TestCase):
self.assertEqual(l1.status, RNS.Link.ACTIVE)
resource_timeout = 120
resource_size = 35*1000*1000
resource_size = 50*1000*1000
data = os.urandom(resource_size)
print("Sending "+self.size_str(resource_size)+" resource...")
resource = RNS.Resource(data, l1, timeout=resource_timeout)
resource = RNS.Resource(data, l1, timeout=resource_timeout, callback=self.lr_callback)
start = time.time()
while resource.status < RNS.Resource.COMPLETE:
TestLink.large_resource_status = resource.status
while TestLink.large_resource_status < RNS.Resource.COMPLETE:
time.sleep(0.01)
t = time.time() - start
self.assertEqual(resource.status, RNS.Resource.COMPLETE)
self.assertEqual(TestLink.large_resource_status, RNS.Resource.COMPLETE)
print("Resource completed at "+self.size_str(resource_size/t, "b")+"ps")
l1.teardown()