mirror of
https://github.com/markqvist/Reticulum.git
synced 2024-11-22 21:50:18 +00:00
Work on bundles
This commit is contained in:
parent
90f2a84243
commit
4a3ee622ec
144
RNS/Bundle.py
144
RNS/Bundle.py
@ -5,30 +5,41 @@ import os.path
|
|||||||
from .vendor import umsgpack as umsgpack
|
from .vendor import umsgpack as umsgpack
|
||||||
|
|
||||||
class Bundle:
|
class Bundle:
|
||||||
|
APP_NAME = "rnsbundle"
|
||||||
|
|
||||||
NO_CUSTODY = 0x00;
|
NO_CUSTODY = 0x00;
|
||||||
TAKING_CUSTODY = 0x01;
|
TAKING_CUSTODY = 0x01;
|
||||||
FULL_CUSTODY = 0x02;
|
FULL_CUSTODY = 0x02;
|
||||||
|
REMOVED = 0xFF;
|
||||||
|
|
||||||
CHUNK_SIZE = RNS.Resource.MAX_EFFICIENT_SIZE / 4
|
CHUNK_SIZE = RNS.Resource.MAX_EFFICIENT_SIZE / 4
|
||||||
|
|
||||||
def __init__(self, destination = None, data = None, filepath = None, advertised_id = None):
|
def __init__(self, destination_hash = None, data = None, filepath = None, advertisement_data = None):
|
||||||
self.destination = destination
|
self.destination_hash = None
|
||||||
|
self.is_originator = False
|
||||||
self.state = None
|
self.state = None
|
||||||
self.data_file = None
|
self.data_file = None
|
||||||
self.meta_file = None
|
self.meta_file = None
|
||||||
|
self.data_hash = None
|
||||||
self.id = None
|
self.id = None
|
||||||
self.storagepath = None
|
self.storagepath = None
|
||||||
self.size = None
|
self.size = None
|
||||||
self.chunks = 0
|
self.chunks = 0
|
||||||
self.heartbeat = time.time()
|
self.created = time.time()
|
||||||
self.resources = []
|
self.heartbeat = created
|
||||||
|
self.transferring = False
|
||||||
|
|
||||||
|
self.chunk_request_destination = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if data != None or filepath != None:
|
if data != None or filepath != None:
|
||||||
|
self.destination_hash = destination_hash
|
||||||
|
self.is_originator = True
|
||||||
|
self.id = RNS.Identity.getRandomHash()
|
||||||
|
|
||||||
if filepath == None and data != None:
|
if filepath == None and data != None:
|
||||||
try:
|
try:
|
||||||
self.id = RNS.Identity.fullHash(data)
|
self.data_hash = RNS.Identity.fullHash(data)
|
||||||
self.storagepath = Reticulum.bundlepath+"/"+self.id.hex()
|
self.storagepath = Reticulum.bundlepath+"/"+self.id.hex()
|
||||||
self.datapath = self.storagepath+"/data"
|
self.datapath = self.storagepath+"/data"
|
||||||
self.metadatapath = self.storagepath+"/metadata"
|
self.metadatapath = self.storagepath+"/metadata"
|
||||||
@ -50,7 +61,7 @@ class Bundle:
|
|||||||
elif data == None and filepath != None:
|
elif data == None and filepath != None:
|
||||||
try:
|
try:
|
||||||
input_file = open(filepath, "rb")
|
input_file = open(filepath, "rb")
|
||||||
self.id = RNS.Identity.fullHash(input_file.read())
|
self.data_hash = RNS.Identity.fullHash(input_file.read())
|
||||||
input_file.seek(0)
|
input_file.seek(0)
|
||||||
|
|
||||||
self.storagepath = RNS.Reticulum.bundlepath+"/"+self.id.hex()
|
self.storagepath = RNS.Reticulum.bundlepath+"/"+self.id.hex()
|
||||||
@ -76,42 +87,141 @@ class Bundle:
|
|||||||
else:
|
else:
|
||||||
raise ValueError("Bundle cannot be created from data and file path at the same time")
|
raise ValueError("Bundle cannot be created from data and file path at the same time")
|
||||||
|
|
||||||
elif advertised_id != None:
|
|
||||||
# Incoming bundle transfer
|
|
||||||
self.state = Bundle.TAKING_CUSTODY
|
|
||||||
else:
|
|
||||||
raise ValueError("No source of data specified for bundle initialisation")
|
|
||||||
|
|
||||||
# Prepare file handles and metadata
|
# Prepare file handles and metadata
|
||||||
self.size = os.stat(self.datapath).st_size
|
self.size = os.stat(self.datapath).st_size
|
||||||
if self.size < 1:
|
if self.size < 1:
|
||||||
raise IOError("Bundle data is empty")
|
raise IOError("Bundle data is empty")
|
||||||
|
self.data_file = open(self.datapath, "rb")
|
||||||
|
|
||||||
|
elif advertisement_data != None:
|
||||||
|
# Incoming bundle transfer
|
||||||
|
self.id = advertisement_data[1]
|
||||||
|
self.destination_hash = advertisement_data[0]
|
||||||
|
self.state = Bundle.TAKING_CUSTODY
|
||||||
|
|
||||||
|
self.storagepath = Reticulum.bundlepath+"/"+self.id.hex()
|
||||||
|
self.datapath = self.storagepath+"/data"
|
||||||
|
self.metadatapath = self.storagepath+"/metadata"
|
||||||
|
|
||||||
|
if not os.path.isdir(self.storagepath):
|
||||||
|
os.makedirs(self.storagepath)
|
||||||
|
else:
|
||||||
|
RNS.log("Warning, bundle already exists in storage location, recreating", RNS.LOG_DEBUG)
|
||||||
|
|
||||||
|
self.data_file = open(self.datapath, "wb")
|
||||||
|
self.data_file.close()
|
||||||
|
|
||||||
|
self.size = advertisement_data[2]
|
||||||
|
self.data_file = open(self.datapath, "wb")
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise ValueError("No source of data specified for bundle initialisation")
|
||||||
|
|
||||||
self.chunks = ((self.size-1)//Bundle.CHUNK_SIZE)+1
|
self.chunks = ((self.size-1)//Bundle.CHUNK_SIZE)+1
|
||||||
self.data_file = open(self.datapath, "rb")
|
|
||||||
self.flush_metadata()
|
self.flush_metadata()
|
||||||
|
|
||||||
|
RNS.Transport.register_bundle(self)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
RNS.log("Error while initialising bundle. The contained exception was:", RNS.LOG_ERROR)
|
RNS.log("Error while initialising bundle. The contained exception was:", RNS.LOG_ERROR)
|
||||||
RNS.log(str(e), RNS.LOG_ERROR)
|
RNS.log(str(e), RNS.LOG_ERROR)
|
||||||
|
# TODO: Remove
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
def flush_metadata(self):
|
def get_packed_metadata(self):
|
||||||
try:
|
|
||||||
metadata = {
|
metadata = {
|
||||||
"destination": self.destination,
|
"destination": self.destination,
|
||||||
"heartbeat": self.heartbeat,
|
"heartbeat": self.heartbeat,
|
||||||
"size": self.size,
|
"size": self.size,
|
||||||
"chunks": self.chunks,
|
"is_originator": self.is_originator
|
||||||
"state": self.state}
|
"state": self.state}
|
||||||
|
|
||||||
|
return umsgpack.packb(metadata)
|
||||||
|
|
||||||
|
def flush_metadata(self):
|
||||||
|
try:
|
||||||
self.meta_file = open(self.metadatapath, "wb")
|
self.meta_file = open(self.metadatapath, "wb")
|
||||||
self.meta_file.write(umsgpack.packb(metadata))
|
self.meta_file.write(self.get_packed_metadata())
|
||||||
self.meta_file.close()
|
self.meta_file.close()
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
RNS.log("Error while flushing metadata for bundle "+RNS.prettyhexrep(self.id), RNS.LOG_ERROR)
|
RNS.log("Error while flushing metadata for bundle "+RNS.prettyhexrep(self.id), RNS.LOG_ERROR)
|
||||||
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
|
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||||
|
|
||||||
|
def register_destinations(self, destination):
|
||||||
|
self.chunk_request_destination = RNS.Destination(None, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "chunk", "request")
|
||||||
|
self.chunk_request_destination.link_established_callback(requester_connected)
|
||||||
|
|
||||||
|
def advertise(self, advertise_to):
|
||||||
|
advertisement = [
|
||||||
|
self.destination,
|
||||||
|
self.id,
|
||||||
|
self.size,
|
||||||
|
self.chunks]
|
||||||
|
|
||||||
|
advertisement_data = umsgpack.packb(advertisement)
|
||||||
|
advertisement_packet = RNS.Packet(advertise_to, advertisement_data)
|
||||||
|
advertisement.packet.send()
|
||||||
|
|
||||||
|
def requester_connected(self, link):
|
||||||
|
RNS.log("Requester connected to bundle "+RNS.prettyhexrep(self.id), RNS.LOG_DEBUG)
|
||||||
|
link.packet_callback(chunk_request)
|
||||||
|
|
||||||
|
def chunk_request(self, data, packet):
|
||||||
|
chunk_index = data[0]
|
||||||
|
RNS.log("Request for chunk "+str(chunk_index)+"/"+str(self.chunks)+" of bundle "+RNS.prettyhexrep(self.id), RNS.LOG_DEBUG)
|
||||||
|
if chunk_index < self.chunks:
|
||||||
|
self.emit_resource(packet.link, chunk_index)
|
||||||
|
else:
|
||||||
|
RNS.log("Bundle transfer client requested chunk index out of range, tearing down link.", RNS.LOG_ERROR)
|
||||||
|
packet.link.teardown()
|
||||||
|
|
||||||
|
def emit_resource(self, link, chunk_index):
|
||||||
|
if not self.transferring:
|
||||||
|
chunk_max = self.size-1
|
||||||
|
chunk_start = chunk_index*CHUNK_SIZE
|
||||||
|
chunk_end = (chunk_index+1)*CHUNK_SIZE-1
|
||||||
|
if chunk_end > chunk_max:
|
||||||
|
chunk_end = chunk_max
|
||||||
|
read_size = chunk_end - chunk_start
|
||||||
|
|
||||||
|
try:
|
||||||
|
file = open(self.datapath, "rb")
|
||||||
|
file.seek(chunk_start)
|
||||||
|
data = file.read(read_size)
|
||||||
|
chunk_resource = RNS.Resource(data, link, callback=resource_concluded)
|
||||||
|
chunk_resource.chunk_index = chunk_index
|
||||||
|
except Exception as e:
|
||||||
|
RNS.log("Could not read bundle data from storage, the contained exception was:", RNS.LOG_ERROR)
|
||||||
|
RNS.log(str(e))
|
||||||
|
link.teardown()
|
||||||
|
else:
|
||||||
|
RNS.log("Bundle chunk "+str(chunk_index)+" for "+RNS.prettyhexrep(self.id)+" was requested while a transfer was already in progress", RNS.LOG_ERROR)
|
||||||
|
|
||||||
|
def resource_concluded(self, resource):
|
||||||
|
RNS.log("Concluded transferring chunk "+str(resource.chunk_index)+"/"+str(self.chunks)+" of bundle "+RNS.prettyhexrep(self.id), RNS.LOG_DEBUG)
|
||||||
|
self.transferring = False
|
||||||
|
|
||||||
|
def resign_custody(self):
|
||||||
|
self.state = Bundle.NO_CUSTODY
|
||||||
|
self.heartbeat = time.time()
|
||||||
|
|
||||||
|
def custody_proof(self, proof):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def remove(self):
|
||||||
|
try:
|
||||||
|
self.state = Bundle.REMOVED
|
||||||
|
RNS.Transport.deregister_destination(self.chunk_request_destination)
|
||||||
|
os.unlink(self.datapath)
|
||||||
|
os.unlink(self.metadatapath)
|
||||||
|
os.rmdir(self.storagepath)
|
||||||
|
except Exception as e:
|
||||||
|
RNS.log("Error while removing bundle from storage, the contained exception was:", RNS.LOG_ERROR)
|
||||||
|
RNS.log(str(e), RNS.LOG_ERROR)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class BundleAdvertisement:
|
class BundleAdvertisement:
|
||||||
pass
|
pass
|
@ -43,6 +43,9 @@ class Transport:
|
|||||||
DESTINATION_TIMEOUT = 60*60*24*7 # Destination table entries are removed if unused for one week
|
DESTINATION_TIMEOUT = 60*60*24*7 # Destination table entries are removed if unused for one week
|
||||||
MAX_RECEIPTS = 1024 # Maximum number of receipts to keep track of
|
MAX_RECEIPTS = 1024 # Maximum number of receipts to keep track of
|
||||||
|
|
||||||
|
BUNDLE_TIMEOUT = 60*60*24*7 # Bundles time out after 7 days
|
||||||
|
BUNDLE_INTERVAL = 180 # How often we should attempt to transfer bundles to their next hop
|
||||||
|
|
||||||
interfaces = [] # All active interfaces
|
interfaces = [] # All active interfaces
|
||||||
destinations = [] # All active destinations
|
destinations = [] # All active destinations
|
||||||
pending_links = [] # Links that are being established
|
pending_links = [] # Links that are being established
|
||||||
@ -55,6 +58,7 @@ class Transport:
|
|||||||
destination_table = {} # A lookup table containing the next hop to a given destination
|
destination_table = {} # A lookup table containing the next hop to a given destination
|
||||||
reverse_table = {} # A lookup table for storing packet hashes used to return proofs and replies
|
reverse_table = {} # A lookup table for storing packet hashes used to return proofs and replies
|
||||||
link_table = {} # A lookup table containing hops for links
|
link_table = {} # A lookup table containing hops for links
|
||||||
|
bundle_table = {} # A table for holding references to bundles in transport
|
||||||
held_announces = {} # A table containing temporarily held announce-table entries
|
held_announces = {} # A table containing temporarily held announce-table entries
|
||||||
|
|
||||||
# Transport control destinations are used
|
# Transport control destinations are used
|
||||||
@ -107,10 +111,17 @@ class Transport:
|
|||||||
|
|
||||||
# Create transport-specific destinations
|
# Create transport-specific destinations
|
||||||
Transport.path_request_destination = RNS.Destination(None, RNS.Destination.IN, RNS.Destination.PLAIN, Transport.APP_NAME, "path", "request")
|
Transport.path_request_destination = RNS.Destination(None, RNS.Destination.IN, RNS.Destination.PLAIN, Transport.APP_NAME, "path", "request")
|
||||||
Transport.path_request_destination.packet_callback(Transport.pathRequestHandler)
|
Transport.path_request_destination.packet_callback(Transport.path_request_handler)
|
||||||
|
|
||||||
|
Transport.bundle_advertisement_destination = RNS.Destination(None, RNS.Destination.IN, RNS.Destination.PLAIN, Transport.APP_NAME, "bundle", "advertisement", )
|
||||||
|
Transport.bundle_advertisement_destination.packet_callback(Transport.bundle_advertisement_handler)
|
||||||
|
|
||||||
Transport.control_destinations.append(Transport.path_request_destination)
|
Transport.control_destinations.append(Transport.path_request_destination)
|
||||||
Transport.control_hashes.append(Transport.path_request_destination.hash)
|
Transport.control_hashes.append(Transport.path_request_destination.hash)
|
||||||
|
|
||||||
|
Transport.control_destinations.append(Transport.bundle_advertisement_destination)
|
||||||
|
Transport.control_hashes.append(Transport.bundle_advertisement_destination.hash)
|
||||||
|
|
||||||
thread = threading.Thread(target=Transport.jobloop)
|
thread = threading.Thread(target=Transport.jobloop)
|
||||||
thread.setDaemon(True)
|
thread.setDaemon(True)
|
||||||
thread.start()
|
thread.start()
|
||||||
@ -300,6 +311,8 @@ class Transport:
|
|||||||
|
|
||||||
Transport.tables_last_culled = time.time()
|
Transport.tables_last_culled = time.time()
|
||||||
|
|
||||||
|
Transport.bundle_jobs()
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
RNS.log("An exception occurred while running Transport jobs.", RNS.LOG_ERROR)
|
RNS.log("An exception occurred while running Transport jobs.", RNS.LOG_ERROR)
|
||||||
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
|
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||||
@ -310,6 +323,35 @@ class Transport:
|
|||||||
for packet in outgoing:
|
for packet in outgoing:
|
||||||
packet.send()
|
packet.send()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def bundle_jobs():
|
||||||
|
removed_bundles = []
|
||||||
|
for bundle in Transport.bundle_table:
|
||||||
|
# The bundle could not be passed on within the allowed
|
||||||
|
# time, and should be removed from storage
|
||||||
|
if bundle.heartbeat+Transport.BUNDLE_TIMEOUT < time.time():
|
||||||
|
RNS.log("Removing stale bundle "+RNS.prettyhexrep(bundle.id)+" from storage", RNS.LOG_VERBOSE)
|
||||||
|
removed_bundles.append(bundle)
|
||||||
|
bundle.remove()
|
||||||
|
|
||||||
|
# Custody was transferred to another node, we'll remove the bundle
|
||||||
|
if bundle.state == RNS.Bundle.NO_CUSTODY:
|
||||||
|
RNS.log("Removing bundle "+RNS.prettyhexrep(bundle.id)+" from storage since custody was transferred", RNS.LOG_VERBOSE)
|
||||||
|
removed_bundles.append(bundle)
|
||||||
|
bundle.remove()
|
||||||
|
|
||||||
|
# This is an incoming bundle, attempt to retrieve it
|
||||||
|
if bundle.state == RNS.Bundle.TAKING_CUSTODY:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# We have custody over this bundle, and we should attempt
|
||||||
|
# to deliver it to it's next hop.
|
||||||
|
if bundle.state == RNS.Bundle.FULL_CUSTODY:
|
||||||
|
pass
|
||||||
|
|
||||||
|
for bundle in removed_bundles:
|
||||||
|
Transport.bundle_table.remove(bundle)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def outbound(packet):
|
def outbound(packet):
|
||||||
while (Transport.jobs_running):
|
while (Transport.jobs_running):
|
||||||
@ -852,6 +894,11 @@ class Transport:
|
|||||||
if destination.direction == RNS.Destination.IN:
|
if destination.direction == RNS.Destination.IN:
|
||||||
Transport.destinations.append(destination)
|
Transport.destinations.append(destination)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def deregister_destination(destination):
|
||||||
|
if destination in Transport.destinations:
|
||||||
|
Transport.destinations.remove(destination)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def registerLink(link):
|
def registerLink(link):
|
||||||
RNS.log("Registering link "+str(link), RNS.LOG_DEBUG)
|
RNS.log("Registering link "+str(link), RNS.LOG_DEBUG)
|
||||||
@ -870,6 +917,11 @@ class Transport:
|
|||||||
else:
|
else:
|
||||||
RNS.log("Attempted to activate a link that was not in the pending table", RNS.LOG_ERROR)
|
RNS.log("Attempted to activate a link that was not in the pending table", RNS.LOG_ERROR)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def register_bundle(bundle):
|
||||||
|
RNS.log("Transport instance registered bundle "+RNS.prettyhexrep(bundle.id), RNS.LOG_DEBUG)
|
||||||
|
self.bundle_table.append(bundle)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def find_interface_from_hash(interface_hash):
|
def find_interface_from_hash(interface_hash):
|
||||||
for interface in Transport.interfaces:
|
for interface in Transport.interfaces:
|
||||||
@ -983,7 +1035,7 @@ class Transport:
|
|||||||
packet.send()
|
packet.send()
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def pathRequestHandler(data, packet):
|
def path_request_handler(data, packet):
|
||||||
if len(data) >= RNS.Identity.TRUNCATED_HASHLENGTH//8:
|
if len(data) >= RNS.Identity.TRUNCATED_HASHLENGTH//8:
|
||||||
Transport.pathRequest(
|
Transport.pathRequest(
|
||||||
data[:RNS.Identity.TRUNCATED_HASHLENGTH//8],
|
data[:RNS.Identity.TRUNCATED_HASHLENGTH//8],
|
||||||
@ -991,6 +1043,10 @@ class Transport:
|
|||||||
packet.receiving_interface
|
packet.receiving_interface
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def bundle_advertisement_handler(data, packet):
|
||||||
|
pass
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def pathRequest(destination_hash, is_from_local_client, attached_interface):
|
def pathRequest(destination_hash, is_from_local_client, attached_interface):
|
||||||
RNS.log("Path request for "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG)
|
RNS.log("Path request for "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG)
|
||||||
|
Loading…
Reference in New Issue
Block a user