Improved link and resource callbacks and resource handling.

This commit is contained in:
Mark Qvist 2021-08-27 19:52:48 +02:00
parent da13ee9cb9
commit a199e4c929
2 changed files with 54 additions and 15 deletions

View File

@ -30,6 +30,8 @@ class Link:
This class. This class.
:param destination: A :ref:`RNS.Destination<api-destination>` instance which to establish a link to. :param destination: A :ref:`RNS.Destination<api-destination>` instance which to establish a link to.
:param established_callback: A function or method with the signature *callback(link)* to be called when the link has been established.
:param closed_callback: A function or method with the signature *callback(link)* to be called when the link is closed.
:param owner: Internal use by :ref:`RNS.Transport<api-Transport>`, ignore this argument. :param owner: Internal use by :ref:`RNS.Transport<api-Transport>`, ignore this argument.
:param peer_pub_bytes: Internal use, ignore this argument. :param peer_pub_bytes: Internal use, ignore this argument.
:param peer_sig_pub_bytes: Internal use, ignore this argument. :param peer_sig_pub_bytes: Internal use, ignore this argument.
@ -47,7 +49,7 @@ class Link:
# TODO: This should not be hardcoded, # TODO: This should not be hardcoded,
# but calculated from something like # but calculated from something like
# first-hop RTT latency and distance # first-hop RTT latency and distance
DEFAULT_TIMEOUT = 15.0 DEFAULT_TIMEOUT = 60.0
""" """
Default timeout for link establishment in seconds. Default timeout for link establishment in seconds.
""" """
@ -102,7 +104,7 @@ class Link:
return None return None
def __init__(self, destination=None, owner=None, peer_pub_bytes = None, peer_sig_pub_bytes = None): def __init__(self, destination=None, established_callback = None, closed_callback = None, owner=None, peer_pub_bytes = None, peer_sig_pub_bytes = None):
if destination != None and destination.type != RNS.Destination.SINGLE: if destination != None and destination.type != RNS.Destination.SINGLE:
raise TypeError("Links can only be established to the \"single\" destination type") raise TypeError("Links can only be established to the \"single\" destination type")
self.rtt = None self.rtt = None
@ -158,6 +160,12 @@ class Link:
else: else:
self.load_peer(peer_pub_bytes, peer_sig_pub_bytes) self.load_peer(peer_pub_bytes, peer_sig_pub_bytes)
if established_callback != None:
self.set_link_established_callback(established_callback)
if closed_callback != None:
self.set_link_closed_callback(closed_callback)
if (self.initiator): if (self.initiator):
peer_pub_bytes = self.destination.identity.get_public_key()[:Link.ECPUBSIZE//2] peer_pub_bytes = self.destination.identity.get_public_key()[:Link.ECPUBSIZE//2]
peer_sig_pub_bytes = self.destination.identity.get_public_key()[Link.ECPUBSIZE//2:Link.ECPUBSIZE] peer_sig_pub_bytes = self.destination.identity.get_public_key()[Link.ECPUBSIZE//2:Link.ECPUBSIZE]
@ -266,13 +274,14 @@ class Link:
self.had_outbound() self.had_outbound()
def request(self, path, data = None, response_callback = None, failed_callback = None): def request(self, path, data = None, response_callback = None, failed_callback = None, timeout = None):
""" """
Sends a request to the remote peer. Sends a request to the remote peer.
:param path: The request path. :param path: The request path.
:param response_callback: A function or method with the signature *response_callback(request_receipt)* to be called when a response is received. See the :ref:`Request Example<example-request>` for more info. :param response_callback: An optional function or method with the signature *response_callback(request_receipt)* to be called when a response is received. See the :ref:`Request Example<example-request>` for more info.
:param failed_callback: A function or method with the signature *failed_callback(request_receipt)* to be called when a request fails. See the :ref:`Request Example<example-request>` for more info. :param failed_callback: An optional function or method with the signature *failed_callback(request_receipt)* to be called when a request fails. See the :ref:`Request Example<example-request>` for more info.
:param timeout: An optional timeout in seconds for the request. If *None* is supplied, this defaults to ``RNS.Packet.TIMEOUT``.
""" """
request_path_hash = RNS.Identity.truncated_hash(path.encode("utf-8")) request_path_hash = RNS.Identity.truncated_hash(path.encode("utf-8"))
unpacked_request = [time.time(), request_path_hash, data] unpacked_request = [time.time(), request_path_hash, data]
@ -280,24 +289,30 @@ class Link:
if len(packed_request) <= Link.MDU: if len(packed_request) <= Link.MDU:
request_packet = RNS.Packet(self, packed_request, RNS.Packet.DATA, context = RNS.Packet.REQUEST) request_packet = RNS.Packet(self, packed_request, RNS.Packet.DATA, context = RNS.Packet.REQUEST)
packet_receipt = request_packet.send()
if timeout != None:
packet_receipt.set_timeout(timeout)
return RequestReceipt( return RequestReceipt(
self, self,
packet_receipt = request_packet.send(), packet_receipt = packet_receipt,
response_callback = response_callback, response_callback = response_callback,
failed_callback = failed_callback failed_callback = failed_callback,
timeout = timeout
) )
else: else:
request_id = RNS.Identity.truncated_hash(packed_request) request_id = RNS.Identity.truncated_hash(packed_request)
RNS.log("Sending request "+RNS.prettyhexrep(request_id)+" as resource.", RNS.LOG_DEBUG) RNS.log("Sending request "+RNS.prettyhexrep(request_id)+" as resource.", RNS.LOG_DEBUG)
request_resource = RNS.Resource(packed_request, self, request_id = request_id, is_response = False) request_resource = RNS.Resource(packed_request, self, request_id = request_id, is_response = False, timeout = timeout)
return RequestReceipt( return RequestReceipt(
self, self,
resource = request_resource, resource = request_resource,
response_callback = response_callback, response_callback = response_callback,
failed_callback = failed_callback failed_callback = failed_callback,
timeout = timeout
) )
@ -831,7 +846,7 @@ class RequestReceipt():
DELIVERED = 0x02 DELIVERED = 0x02
READY = 0x03 READY = 0x03
def __init__(self, link, packet_receipt = None, resource = None, response_callback = None, failed_callback = None): def __init__(self, link, packet_receipt = None, resource = None, response_callback = None, failed_callback = None, timeout = None):
self.packet_receipt = packet_receipt self.packet_receipt = packet_receipt
self.resource = resource self.resource = resource
@ -849,18 +864,28 @@ class RequestReceipt():
self.response = None self.response = None
self.status = RequestReceipt.SENT self.status = RequestReceipt.SENT
self.sent_at = time.time() self.sent_at = time.time()
self.timeout = RNS.Packet.TIMEOUT
self.concluded_at = None self.concluded_at = None
if timeout != None:
self.timeout = timeout
else:
self.timeout = RNS.Packet.TIMEOUT
self.callbacks = RequestReceiptCallbacks() self.callbacks = RequestReceiptCallbacks()
self.callbacks.response = response_callback self.callbacks.response = response_callback
self.callbacks.failed = failed_callback self.callbacks.failed = failed_callback
self.link.pending_requests.append(self) self.link.pending_requests.append(self)
def request_resource_concluded(self, resource): def request_resource_concluded(self, resource):
if resource.status == RNS.Resource.COMPLETE: if resource.status == RNS.Resource.COMPLETE:
RNS.log("Request "+RNS.prettyhexrep(self.request_id)+" successfully sent as resource.", RNS.LOG_DEBUG) RNS.log("Request "+RNS.prettyhexrep(self.request_id)+" successfully sent as resource.", RNS.LOG_DEBUG)
self.status = RequestReceipt.DELIVERED
self.__resource_response_timeout = time.time()+self.timeout
load_thread = threading.Thread(target=self.__resource_response_timeout_job)
load_thread.setDaemon(True)
load_thread.start()
else: else:
RNS.log("Sending request "+RNS.prettyhexrep(self.request_id)+" as resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG) RNS.log("Sending request "+RNS.prettyhexrep(self.request_id)+" as resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG)
self.status = RequestReceipt.FAILED self.status = RequestReceipt.FAILED
@ -870,6 +895,7 @@ class RequestReceipt():
if self.callbacks.failed != None: if self.callbacks.failed != None:
self.callbacks.failed(self) self.callbacks.failed(self)
def request_timed_out(self, packet_receipt): def request_timed_out(self, packet_receipt):
self.status = RequestReceipt.FAILED self.status = RequestReceipt.FAILED
self.concluded_at = time.time() self.concluded_at = time.time()
@ -878,8 +904,18 @@ class RequestReceipt():
if self.callbacks.failed != None: if self.callbacks.failed != None:
self.callbacks.failed(self) self.callbacks.failed(self)
def __resource_response_timeout_job(self):
while self.status == RequestReceipt.DELIVERED:
if time.time() > self.__resource_response_timeout:
self.request_timed_out(None)
time.sleep(0.1)
def response_received(self, response): def response_received(self, response):
self.response = response self.response = response
self.status = RequestReceipt.READY
if self.packet_receipt != None: if self.packet_receipt != None:
self.packet_receipt.status = RNS.PacketReceipt.DELIVERED self.packet_receipt.status = RNS.PacketReceipt.DELIVERED

View File

@ -52,7 +52,6 @@ class Resource:
# TODO: Should be allocated more # TODO: Should be allocated more
# intelligently # intelligently
# TODO: Set higher
MAX_RETRIES = 5 MAX_RETRIES = 5
SENDER_GRACE_TIME = 10 SENDER_GRACE_TIME = 10
RETRY_GRACE_TIME = 0.25 RETRY_GRACE_TIME = 0.25
@ -136,7 +135,7 @@ class Resource:
# Create a resource for transmission to a remote destination # Create a resource for transmission to a remote destination
# The data passed can be either a bytes-array or a file opened # The data passed can be either a bytes-array or a file opened
# in binary read mode. # in binary read mode.
def __init__(self, data, link, advertise=True, auto_compress=True, callback=None, progress_callback=None, segment_index = 1, original_hash = None, request_id = None, is_response = False): def __init__(self, data, link, advertise=True, auto_compress=True, callback=None, progress_callback=None, timeout = None, segment_index = 1, original_hash = None, request_id = None, is_response = False):
data_size = None data_size = None
resource_data = None resource_data = None
if hasattr(data, "read"): if hasattr(data, "read"):
@ -183,7 +182,6 @@ class Resource:
self.link = link self.link = link
self.max_retries = Resource.MAX_RETRIES self.max_retries = Resource.MAX_RETRIES
self.retries_left = self.max_retries self.retries_left = self.max_retries
self.default_timeout = self.link.default_timeout
self.timeout_factor = self.link.timeout_factor self.timeout_factor = self.link.timeout_factor
self.sender_grace_time = Resource.SENDER_GRACE_TIME self.sender_grace_time = Resource.SENDER_GRACE_TIME
self.hmu_retry_ok = False self.hmu_retry_ok = False
@ -196,6 +194,11 @@ class Resource:
self.receiver_min_consecutive_height = 0 self.receiver_min_consecutive_height = 0
if timeout != None:
self.timeout = timeout
else:
self.timeout = self.link.default_timeout
if data != None: if data != None:
self.initiator = True self.initiator = True
self.callback = callback self.callback = callback
@ -370,7 +373,7 @@ class Resource:
sleep_time = None sleep_time = None
if self.status == Resource.ADVERTISED: if self.status == Resource.ADVERTISED:
sleep_time = (self.adv_sent+self.default_timeout)-time.time() sleep_time = (self.adv_sent+self.timeout)-time.time()
if sleep_time < 0: if sleep_time < 0:
if self.retries_left <= 0: if self.retries_left <= 0:
RNS.log("Resource transfer timeout after sending advertisement", RNS.LOG_DEBUG) RNS.log("Resource transfer timeout after sending advertisement", RNS.LOG_DEBUG)