This commit is contained in:
Lucas 2024-05-04 23:26:08 +02:00 committed by GitHub
commit 5cf4869eda
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 348 additions and 369 deletions

View File

@ -37,26 +37,48 @@ SIG_EXT = "rsg"
ENCRYPT_EXT = "rfe" ENCRYPT_EXT = "rfe"
CHUNK_SIZE = 16*1024*1024 CHUNK_SIZE = 16*1024*1024
def spin(until=None, msg=None, timeout=None):
def spin(until=None, msg="", timeout=None):
i = 0 i = 0
syms = "⢄⢂⢁⡁⡈⡐⡠" syms = "⢄⢂⢁⡁⡈⡐⡠"
if timeout != None: if timeout is not None:
timeout = time.time()+timeout timeout = time.time()+timeout
print(msg+" ", end=" ") print(msg, end=" ")
while (timeout == None or time.time()<timeout) and not until(): while (timeout is None or time.time()<timeout) and not until():
time.sleep(0.1) time.sleep(0.1)
print(("\b\b"+syms[i]+" "), end="") print(("\b\b", syms[i]), end=" ", sep="")
sys.stdout.flush() sys.stdout.flush()
i = (i+1)%len(syms) i = (i+1)%len(syms)
print("\r"+" "*len(msg)+" \r", end="") print("\r", *len(msg), "\r", end="", sep="")
if timeout != None and time.time() > timeout: if timeout is not None and time.time() > timeout:
return False return False
else: else:
return True return True
def get_keys(args: argparse.Namespace, identity) -> None:
"""Get public or/and private keys"""
if args.base64:
RNS.log("Public Key : " + base64.urlsafe_b64encode(identity.get_public_key()).decode("utf-8"))
elif args.base32:
RNS.log("Public Key : " + base64.b32encode(identity.get_public_key()).decode("utf-8"))
else:
RNS.log("Public Key : " + RNS.hexrep(identity.get_public_key(), delimit=False))
if identity.prv:
if args.print_private:
if args.base64:
RNS.log("Private Key : " + base64.urlsafe_b64encode(identity.get_private_key()).decode("utf-8"))
elif args.base32:
RNS.log("Private Key : " + base64.b32encode(identity.get_private_key()).decode("utf-8"))
else:
RNS.log("Private Key : " + RNS.hexrep(identity.get_private_key(), delimit=False))
else:
RNS.log("Private Key : Hidden")
def main(): def main():
try: try:
parser = argparse.ArgumentParser(description="Reticulum Identity & Encryption Utility") parser = argparse.ArgumentParser(description="Reticulum Identity & Encryption Utility")
@ -96,11 +118,7 @@ def main():
args = parser.parse_args() args = parser.parse_args()
ops = 0; ops = sum(1 for t in [args.encrypt, args.decrypt, args.validate, args.sign] if t)
for t in [args.encrypt, args.decrypt, args.validate, args.sign]:
if t:
ops += 1
if ops > 1: if ops > 1:
RNS.log("This utility currently only supports one of the encrypt, decrypt, sign or verify operations per invocation", RNS.LOG_ERROR) RNS.log("This utility currently only supports one of the encrypt, decrypt, sign or verify operations per invocation", RNS.LOG_ERROR)
exit(1) exit(1)
@ -134,22 +152,8 @@ def main():
exit(42) exit(42)
RNS.log("Identity imported") RNS.log("Identity imported")
if args.base64:
RNS.log("Public Key : "+base64.urlsafe_b64encode(identity.get_public_key()).decode("utf-8")) get_keys(args, identity)
elif args.base32:
RNS.log("Public Key : "+base64.b32encode(identity.get_public_key()).decode("utf-8"))
else:
RNS.log("Public Key : "+RNS.hexrep(identity.get_public_key(), delimit=False))
if identity.prv:
if args.print_private:
if args.base64:
RNS.log("Private Key : "+base64.urlsafe_b64encode(identity.get_private_key()).decode("utf-8"))
elif args.base32:
RNS.log("Private Key : "+base64.b32encode(identity.get_private_key()).decode("utf-8"))
else:
RNS.log("Private Key : "+RNS.hexrep(identity.get_private_key(), delimit=False))
else:
RNS.log("Private Key : Hidden")
if args.write: if args.write:
try: try:
@ -208,7 +212,7 @@ def main():
destination_hash = bytes.fromhex(identity_str) destination_hash = bytes.fromhex(identity_str)
identity = RNS.Identity.recall(destination_hash) identity = RNS.Identity.recall(destination_hash)
if identity == None: if identity is None:
if not args.request: if not args.request:
RNS.log("Could not recall Identity for "+RNS.prettyhexrep(destination_hash)+".", RNS.LOG_ERROR) RNS.log("Could not recall Identity for "+RNS.prettyhexrep(destination_hash)+".", RNS.LOG_ERROR)
RNS.log("You can query the network for unknown Identities with the -R option.", RNS.LOG_ERROR) RNS.log("You can query the network for unknown Identities with the -R option.", RNS.LOG_ERROR)
@ -216,7 +220,7 @@ def main():
else: else:
RNS.Transport.request_path(destination_hash) RNS.Transport.request_path(destination_hash)
def spincheck(): def spincheck():
return RNS.Identity.recall(destination_hash) != None return RNS.Identity.recall(destination_hash) is not None
spin(spincheck, "Requesting unknown Identity for "+RNS.prettyhexrep(destination_hash), args.t) spin(spincheck, "Requesting unknown Identity for "+RNS.prettyhexrep(destination_hash), args.t)
if not spincheck(): if not spincheck():
@ -224,17 +228,15 @@ def main():
exit(6) exit(6)
else: else:
identity = RNS.Identity.recall(destination_hash) identity = RNS.Identity.recall(destination_hash)
RNS.log("Received Identity "+str(identity)+" for destination "+RNS.prettyhexrep(destination_hash)+" from the network") RNS.log(" ".join(["Received Identity", str(identity), "for destination", RNS.prettyhexrep(destination_hash), "from the network"]))
else: else:
RNS.log("Recalled Identity "+str(identity)+" for destination "+RNS.prettyhexrep(destination_hash)) RNS.log(" ".join(["Recalled Identity", str(identity), "for destination", RNS.prettyhexrep(destination_hash)]))
except Exception as e: except Exception as e:
RNS.log("Invalid hexadecimal hash provided", RNS.LOG_ERROR) RNS.log("Invalid hexadecimal hash provided", RNS.LOG_ERROR)
exit(7) exit(7)
else: else:
# Try loading Identity from file # Try loading Identity from file
if not os.path.isfile(identity_str): if not os.path.isfile(identity_str):
@ -243,13 +245,13 @@ def main():
else: else:
try: try:
identity = RNS.Identity.from_file(identity_str) identity = RNS.Identity.from_file(identity_str)
RNS.log("Loaded Identity "+str(identity)+" from "+str(identity_str)) RNS.log(" ".join(["Loaded Identity", str(identity), "from", str(identity_str)]))
except Exception as e: except Exception as e:
RNS.log("Could not decode Identity from specified file") RNS.log("Could not decode Identity from specified file")
exit(9) exit(9)
if identity != None: if identity is not None:
if args.hash: if args.hash:
try: try:
aspects = args.hash.split(".") aspects = args.hash.split(".")
@ -259,9 +261,9 @@ def main():
else: else:
app_name = aspects[0] app_name = aspects[0]
aspects = aspects[1:] aspects = aspects[1:]
if identity.pub != None: if identity.pub is not None:
destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, app_name, *aspects) destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, app_name, *aspects)
RNS.log("The "+str(args.hash)+" destination for this Identity is "+RNS.prettyhexrep(destination.hash)) RNS.log(" ".join(["The", str(args.hash), "destination for this Identity is", RNS.prettyhexrep(destination.hash)]))
RNS.log("The full destination specifier is "+str(destination)) RNS.log("The full destination specifier is "+str(destination))
time.sleep(0.25) time.sleep(0.25)
exit(0) exit(0)
@ -282,7 +284,7 @@ def main():
else: else:
app_name = aspects[0] app_name = aspects[0]
aspects = aspects[1:] aspects = aspects[1:]
if identity.prv != None: if identity.prv is not None:
destination = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE, app_name, *aspects) destination = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE, app_name, *aspects)
RNS.log("Created destination "+str(destination)) RNS.log("Created destination "+str(destination))
RNS.log("Announcing destination "+RNS.prettyhexrep(destination.hash)) RNS.log("Announcing destination "+RNS.prettyhexrep(destination.hash))
@ -291,7 +293,7 @@ def main():
exit(0) exit(0)
else: else:
destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, app_name, *aspects) destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, app_name, *aspects)
RNS.log("The "+str(args.announce)+" destination for this Identity is "+RNS.prettyhexrep(destination.hash)) RNS.log(" ".join(["The", str(args.announce), "destination for this Identity is", RNS.prettyhexrep(destination.hash)]))
RNS.log("The full destination specifier is "+str(destination)) RNS.log("The full destination specifier is "+str(destination))
RNS.log("Cannot announce this destination, since the private key is not held") RNS.log("Cannot announce this destination, since the private key is not held")
time.sleep(0.25) time.sleep(0.25)
@ -303,22 +305,7 @@ def main():
exit(0) exit(0)
if args.print_identity: if args.print_identity:
if args.base64: get_keys(args, identity)
RNS.log("Public Key : "+base64.urlsafe_b64encode(identity.get_public_key()).decode("utf-8"))
elif args.base32:
RNS.log("Public Key : "+base64.b32encode(identity.get_public_key()).decode("utf-8"))
else:
RNS.log("Public Key : "+RNS.hexrep(identity.get_public_key(), delimit=False))
if identity.prv:
if args.print_private:
if args.base64:
RNS.log("Private Key : "+base64.urlsafe_b64encode(identity.get_private_key()).decode("utf-8"))
elif args.base32:
RNS.log("Private Key : "+base64.b32encode(identity.get_private_key()).decode("utf-8"))
else:
RNS.log("Private Key : "+RNS.hexrep(identity.get_private_key(), delimit=False))
else:
RNS.log("Private Key : Hidden")
exit(0) exit(0)
if args.export: if args.export:
@ -373,7 +360,7 @@ def main():
if args.decrypt and not args.write and not args.stdout and args.read and args.read.lower().endswith("."+ENCRYPT_EXT): if args.decrypt and not args.write and not args.stdout and args.read and args.read.lower().endswith("."+ENCRYPT_EXT):
args.write = str(args.read).replace("."+ENCRYPT_EXT, "") args.write = str(args.read).replace("."+ENCRYPT_EXT, "")
if args.sign and identity.prv == None: if args.sign and identity.prv is None:
RNS.log("Specified Identity does not hold a private key. Cannot sign.", RNS.LOG_ERROR) RNS.log("Specified Identity does not hold a private key. Cannot sign.", RNS.LOG_ERROR)
exit(14) exit(14)
@ -399,7 +386,7 @@ def main():
# data_output = sys.stdout # data_output = sys.stdout
if args.sign: if args.sign:
if identity.prv == None: if identity.prv is None:
RNS.log("Specified Identity does not hold a private key. Cannot sign.", RNS.LOG_ERROR) RNS.log("Specified Identity does not hold a private key. Cannot sign.", RNS.LOG_ERROR)
exit(16) exit(16)
@ -423,7 +410,7 @@ def main():
if not args.stdout: if not args.stdout:
if args.read: if args.read:
RNS.log("File "+str(args.read)+" signed with "+str(identity)+" to "+str(args.write)) RNS.log(" ".join([ "File", str(args.read), "signed with", str(identity), "to", str(args.write) ]))
exit(0) exit(0)
except Exception as e: except Exception as e:
@ -457,18 +444,17 @@ def main():
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
exit(21) exit(21)
validated = identity.validate(sig_input.read(), data_input.read()) validated = identity.validate(sig_input.read(), data_input.read())
sig_input.close() sig_input.close()
data_input.close() data_input.close()
if not validated: if not validated:
if not args.stdout: if not args.stdout:
RNS.log("Signature "+str(args.validate)+" for file "+str(args.read)+" is invalid", RNS.LOG_ERROR) RNS.log(" ".join(["Signature", str(args.validate), "for file", str(args.read), "is invalid"]), RNS.LOG_ERROR)
exit(22) exit(22)
else: else:
if not args.stdout: if not args.stdout:
RNS.log("Signature "+str(args.validate)+" for file "+str(args.read)+" made by Identity "+str(identity)+" is valid") RNS.log(" ".join(["Signature", str(args.validate), "for file", str(args.read), "made by Identity", str(identity), "is valid"]))
exit(0) exit(0)
except Exception as e: except Exception as e:
@ -511,7 +497,7 @@ def main():
data_input.close() data_input.close()
if not args.stdout: if not args.stdout:
if args.read: if args.read:
RNS.log("File "+str(args.read)+" encrypted for "+str(identity)+" to "+str(args.write)) RNS.log(" ".join(["File", str(args.read), "encrypted for", str(identity), "to", str(args.write)]))
exit(0) exit(0)
except Exception as e: except Exception as e:
@ -529,7 +515,7 @@ def main():
exit(26) exit(26)
if args.decrypt: if args.decrypt:
if identity.prv == None: if identity.prv is None:
RNS.log("Specified Identity does not hold a private key. Cannot decrypt.", RNS.LOG_ERROR) RNS.log("Specified Identity does not hold a private key. Cannot decrypt.", RNS.LOG_ERROR)
exit(27) exit(27)
@ -544,7 +530,7 @@ def main():
exit(29) exit(29)
if not args.stdout: if not args.stdout:
RNS.log("Decrypting "+str(args.read)+"...") RNS.log("".join(["Decrypting ", str(args.read), "..."]))
try: try:
more_data = True more_data = True
@ -552,7 +538,7 @@ def main():
chunk = data_input.read(CHUNK_SIZE) chunk = data_input.read(CHUNK_SIZE)
if chunk: if chunk:
plaintext = identity.decrypt(chunk) plaintext = identity.decrypt(chunk)
if plaintext == None: if plaintext is None:
if not args.stdout: if not args.stdout:
RNS.log("Data could not be decrypted with the specified Identity") RNS.log("Data could not be decrypted with the specified Identity")
exit(30) exit(30)
@ -564,7 +550,7 @@ def main():
data_input.close() data_input.close()
if not args.stdout: if not args.stdout:
if args.read: if args.read:
RNS.log("File "+str(args.read)+" decrypted with "+str(identity)+" to "+str(args.write)) RNS.log(" ".join(["File", str(args.read), "decrypted with", str(identity), "to", str(args.write)]))
exit(0) exit(0)
except Exception as e: except Exception as e:
@ -581,20 +567,10 @@ def main():
pass pass
exit(31) exit(31)
if True:
pass
elif False:
pass
else:
print("")
parser.print_help()
print("")
except KeyboardInterrupt: except KeyboardInterrupt:
print("") print("")
exit(255) exit(255)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -24,13 +24,12 @@
import RNS import RNS
import argparse import argparse
import time
from RNS._version import __version__ from RNS._version import __version__
def program_setup(configdir, verbosity = 0, quietness = 0, service = False): def program_setup(configdir, verbosity=0, quietness=0, service=False):
targetverbosity = verbosity-quietness targetverbosity = verbosity - quietness
if service: if service:
targetlogdest = RNS.LOG_FILE targetlogdest = RNS.LOG_FILE
@ -38,9 +37,10 @@ def program_setup(configdir, verbosity = 0, quietness = 0, service = False):
else: else:
targetlogdest = RNS.LOG_STDOUT targetlogdest = RNS.LOG_STDOUT
reticulum = RNS.Reticulum(configdir=configdir, verbosity=targetverbosity, logdest=targetlogdest) RNS.Reticulum(configdir=configdir, verbosity=targetverbosity, logdest=targetlogdest)
exit(0) exit(0)
def main(): def main():
try: try:
parser = argparse.ArgumentParser(description="Reticulum Distributed Identity Resolver") parser = argparse.ArgumentParser(description="Reticulum Distributed Identity Resolver")
@ -49,24 +49,23 @@ def main():
parser.add_argument('-q', '--quiet', action='count', default=0) parser.add_argument('-q', '--quiet', action='count', default=0)
parser.add_argument("--exampleconfig", action='store_true', default=False, help="print verbose configuration example to stdout and exit") parser.add_argument("--exampleconfig", action='store_true', default=False, help="print verbose configuration example to stdout and exit")
parser.add_argument("--version", action="version", version="ir {version}".format(version=__version__)) parser.add_argument("--version", action="version", version="ir {version}".format(version=__version__))
args = parser.parse_args() args = parser.parse_args()
if args.exampleconfig: if args.exampleconfig:
print(__example_rns_config__) print(__example_rns_config__)
exit() exit()
configarg = None
if args.config: if args.config:
configarg = args.config configarg = args.config
else:
configarg = None
program_setup(configdir = configarg, verbosity=args.verbose, quietness=args.quiet) program_setup(configdir=configarg, verbosity=args.verbose, quietness=args.quiet)
except KeyboardInterrupt: except KeyboardInterrupt:
print("") print("")
exit() exit()
__example_rns_config__ = '''# This is an example Identity Resolver file. __example_rns_config__ = '''# This is an example Identity Resolver file.
''' '''

View File

@ -56,6 +56,8 @@ fw_filename = None
fw_url = None fw_url = None
mapped_model = None mapped_model = None
# TODO: Create and use constants for megas, kilos, etc (Avoid using magic numbers)
class KISS(): class KISS():
FEND = 0xC0 FEND = 0xC0
FESC = 0xDB FESC = 0xDB
@ -280,7 +282,7 @@ try:
os.makedirs(ROM_DIR) os.makedirs(ROM_DIR)
except Exception as e: except Exception as e:
print("No access to directory "+str(CNF_DIR)+". This utility needs file system access to store firmware and data files. Cannot continue.") print("".join(["No access to directory ", str(CNF_DIR), ". This utility needs file system access to store firmware and data files. Cannot continue."]))
print("The contained exception was:") print("The contained exception was:")
print(str(e)) print(str(e))
exit(99) exit(99)
@ -411,7 +413,7 @@ class RNode():
command_buffer = command_buffer+bytes([byte]) command_buffer = command_buffer+bytes([byte])
if (len(command_buffer) == 4): if (len(command_buffer) == 4):
self.r_frequency = command_buffer[0] << 24 | command_buffer[1] << 16 | command_buffer[2] << 8 | command_buffer[3] self.r_frequency = command_buffer[0] << 24 | command_buffer[1] << 16 | command_buffer[2] << 8 | command_buffer[3]
RNS.log("Radio reporting frequency is "+str(self.r_frequency/1000000.0)+" MHz") RNS.log("".join(["Radio reporting frequency is ", str(self.r_frequency/1000000.0), " MHz"]))
self.updateBitrate() self.updateBitrate()
elif (command == KISS.CMD_BANDWIDTH): elif (command == KISS.CMD_BANDWIDTH):
@ -427,7 +429,7 @@ class RNode():
command_buffer = command_buffer+bytes([byte]) command_buffer = command_buffer+bytes([byte])
if (len(command_buffer) == 4): if (len(command_buffer) == 4):
self.r_bandwidth = command_buffer[0] << 24 | command_buffer[1] << 16 | command_buffer[2] << 8 | command_buffer[3] self.r_bandwidth = command_buffer[0] << 24 | command_buffer[1] << 16 | command_buffer[2] << 8 | command_buffer[3]
RNS.log("Radio reporting bandwidth is "+str(self.r_bandwidth/1000.0)+" KHz") RNS.log("".join(["Radio reporting bandwidth is ", str(self.r_bandwidth/1000.0), " KHz"]))
self.updateBitrate() self.updateBitrate()
elif (command == KISS.CMD_BT_PIN): elif (command == KISS.CMD_BT_PIN):
@ -501,7 +503,7 @@ class RNode():
elif (command == KISS.CMD_TXPOWER): elif (command == KISS.CMD_TXPOWER):
self.r_txpower = byte self.r_txpower = byte
RNS.log("Radio reporting TX power is "+str(self.r_txpower)+" dBm") RNS.log("".join(["Radio reporting TX power is ", str(self.r_txpower), " dBm"]))
elif (command == KISS.CMD_SF): elif (command == KISS.CMD_SF):
self.r_sf = byte self.r_sf = byte
RNS.log("Radio reporting spreading factor is "+str(self.r_sf)) RNS.log("Radio reporting spreading factor is "+str(self.r_sf))
@ -549,11 +551,11 @@ class RNode():
self.r_random = byte self.r_random = byte
elif (command == KISS.CMD_ERROR): elif (command == KISS.CMD_ERROR):
if (byte == KISS.ERROR_INITRADIO): if (byte == KISS.ERROR_INITRADIO):
RNS.log(str(self)+" hardware initialisation error (code "+RNS.hexrep(byte)+")") RNS.log("".join([str(self), " hardware initialisation error (code ", RNS.hexrep(byte), ")"]))
elif (byte == KISS.ERROR_TXFAILED): elif (byte == KISS.ERROR_TXFAILED):
RNS.log(str(self)+" hardware TX error (code "+RNS.hexrep(byte)+")") RNS.log("".join([str(self), " hardware TX error (code ", RNS.hexrep(byte), ")"]))
else: else:
RNS.log(str(self)+" hardware error (code "+RNS.hexrep(byte)+")") RNS.log("".join([str(self), " hardware error (code ", RNS.hexrep(byte), ")"]))
elif (command == KISS.CMD_DETECT): elif (command == KISS.CMD_DETECT):
if byte == KISS.DETECT_RESP: if byte == KISS.DETECT_RESP:
self.detected = True self.detected = True
@ -930,7 +932,7 @@ class RNode():
print(" ") print(" ")
print(" To initialise this device to a verifiable state, please run:") print(" To initialise this device to a verifiable state, please run:")
print(" ") print(" ")
print(" rnodeconf "+str(self.serial.name)+" --autoinstall") print("".join([" rnodeconf ", str(self.serial.name), " --autoinstall"]))
print("") print("")
@ -965,7 +967,7 @@ class RNode():
selected_version = None selected_version = None
selected_hash = None selected_hash = None
firmware_version_url = "https://unsigned.io/firmware/latest/?v="+program_version+"&variant=" firmware_version_url = "".join(["https://unsigned.io/firmware/latest/?v=", program_version, "&variant="])
fallback_firmware_version_url = "https://github.com/markqvist/rnode_firmware/releases/latest/download/release.json" fallback_firmware_version_url = "https://github.com/markqvist/rnode_firmware/releases/latest/download/release.json"
def ensure_firmware_file(fw_filename): def ensure_firmware_file(fw_filename):
global selected_version, selected_hash, upd_nocheck global selected_version, selected_hash, upd_nocheck
@ -981,7 +983,7 @@ def ensure_firmware_file(fw_filename):
] ]
parts_missing = False parts_missing = False
for rf in required_files: for rf in required_files:
if not os.path.isfile(EXT_DIR+"/"+rf): if not os.path.isfile("".join([EXT_DIR, "/", rf])):
parts_missing = True parts_missing = True
if parts_missing: if parts_missing:
@ -994,7 +996,7 @@ def ensure_firmware_file(fw_filename):
release_info = vf.read().decode("utf-8").strip() release_info = vf.read().decode("utf-8").strip()
selected_version = release_info.split()[0] selected_version = release_info.split()[0]
selected_hash = release_info.split()[1] selected_hash = release_info.split()[1]
RNS.log("Using existing firmware file: "+fw_filename+" for version "+selected_version) RNS.log("".join(["Using existing firmware file: ", fw_filename, " for version ", selected_version]))
else: else:
RNS.log("No extracted firmware is available, cannot continue.") RNS.log("No extracted firmware is available, cannot continue.")
RNS.log("Extract a firmware from an existing RNode first, using the --extract-firmware option.") RNS.log("Extract a firmware from an existing RNode first, using the --extract-firmware option.")
@ -1006,17 +1008,19 @@ def ensure_firmware_file(fw_filename):
try: try:
# if custom firmware url, download latest release # if custom firmware url, download latest release
if selected_version == None and fw_url == None: if selected_version == None and fw_url == None:
version_url = firmware_version_url+fw_filename version_url = firmware_version_url + fw_filename
RNS.log("Retrieving latest version info from "+version_url) RNS.log("Retrieving latest version info from "+version_url)
urlretrieve(firmware_version_url+fw_filename, UPD_DIR+"/"+fw_filename+".version.latest") urlretrieve(version_url, "".join([UPD_DIR, "/", fw_filename, ".version.latest"]))
else: else:
if fw_url != None: if fw_url != None:
if selected_version == None: if selected_version == None:
version_url = fw_url+"latest/download/release.json" version_url = fw_url+"latest/download/release.json"
else: else:
version_url = fw_url+"download/"+selected_version+"/release.json" version_url = "".join([fw_url, "download/", selected_version, "/release.json"])
else: else:
version_url = firmware_update_url+selected_version+"/release.json" if selected_version is None:
selected_version = ""
version_url = "".join([firmware_update_url, selected_version, "/release.json"])
try: try:
RNS.log("Retrieving specified version info from "+version_url) RNS.log("Retrieving specified version info from "+version_url)
urlretrieve(version_url, UPD_DIR+"/version_release_info.json") urlretrieve(version_url, UPD_DIR+"/version_release_info.json")
@ -1024,8 +1028,9 @@ def ensure_firmware_file(fw_filename):
with open(UPD_DIR+"/version_release_info.json", "rb") as rif: with open(UPD_DIR+"/version_release_info.json", "rb") as rif:
rdat = json.loads(rif.read()) rdat = json.loads(rif.read())
variant = rdat[fw_filename] variant = rdat[fw_filename]
with open(UPD_DIR+"/"+fw_filename+".version.latest", "wb") as verf: with open("".join([UPD_DIR, "/", fw_filename, ".version.latest"]), "wb") as verf:
inf_str = str(variant["version"])+" "+str(variant["hash"]) inf_str = str(variant["version"])+" "+str(variant["hash"])
inf_str = "".join([str(variant["version"]), " ", str(variant["hash"])])
verf.write(inf_str.encode("utf-8")) verf.write(inf_str.encode("utf-8"))
except Exception as e: except Exception as e:
RNS.log("Failed to retrive version information for your board.") RNS.log("Failed to retrive version information for your board.")
@ -1055,7 +1060,7 @@ def ensure_firmware_file(fw_filename):
with open(UPD_DIR+"/fallback_release_info.json", "rb") as rif: with open(UPD_DIR+"/fallback_release_info.json", "rb") as rif:
rdat = json.loads(rif.read()) rdat = json.loads(rif.read())
variant = rdat[fw_filename] variant = rdat[fw_filename]
with open(UPD_DIR+"/"+fw_filename+".version.latest", "wb") as verf: with open("".join([UPD_DIR, "/", fw_filename, ".version.latest"]), "wb") as verf:
inf_str = str(variant["version"])+" "+str(variant["hash"]) inf_str = str(variant["version"])+" "+str(variant["hash"])
verf.write(inf_str.encode("utf-8")) verf.write(inf_str.encode("utf-8"))
@ -1064,7 +1069,7 @@ def ensure_firmware_file(fw_filename):
raise e raise e
import shutil import shutil
file = open(UPD_DIR+"/"+fw_filename+".version.latest", "rb") file = open("".join([UPD_DIR, "/", fw_filename, ".version.latest"]), "rb")
release_info = file.read().decode("utf-8").strip() release_info = file.read().decode("utf-8").strip()
selected_version = release_info.split()[0] selected_version = release_info.split()[0]
if selected_version == "not": if selected_version == "not":
@ -1074,7 +1079,7 @@ def ensure_firmware_file(fw_filename):
selected_hash = release_info.split()[1] selected_hash = release_info.split()[1]
if not os.path.isdir(UPD_DIR+"/"+selected_version): if not os.path.isdir(UPD_DIR+"/"+selected_version):
os.makedirs(UPD_DIR+"/"+selected_version) os.makedirs(UPD_DIR+"/"+selected_version)
shutil.copy(UPD_DIR+"/"+fw_filename+".version.latest", UPD_DIR+"/"+selected_version+"/"+fw_filename+".version") shutil.copy("".join([UPD_DIR, "/", fw_filename, ".version.latest"]), "".join([UPD_DIR, "/", selected_version, "/", fw_filename, ".version"]))
RNS.log("The selected firmware for this board is version "+selected_version) RNS.log("The selected firmware for this board is version "+selected_version)
else: else:
@ -1084,32 +1089,32 @@ def ensure_firmware_file(fw_filename):
# if custom firmware url, use it # if custom firmware url, use it
if fw_url != None: if fw_url != None:
update_target_url = fw_url+"download/"+selected_version+"/"+fw_filename update_target_url = "".join([fw_url, "download/", selected_version, "/", fw_filename])
RNS.log("Retrieving firmware from custom url "+update_target_url) RNS.log("Retrieving firmware from custom url "+update_target_url)
else: else:
update_target_url = firmware_update_url+selected_version+"/"+fw_filename update_target_url = "".join([firmware_update_url, selected_version, "/", fw_filename])
try: try:
if not os.path.isdir(UPD_DIR+"/"+selected_version): if not os.path.isdir(UPD_DIR+"/"+selected_version):
os.makedirs(UPD_DIR+"/"+selected_version) os.makedirs(UPD_DIR+"/"+selected_version)
if not os.path.isfile(UPD_DIR+"/"+selected_version+"/"+fw_filename): if not os.path.isfile("".join([UPD_DIR, "/", selected_version, "/", fw_filename])):
RNS.log("Firmware "+UPD_DIR+"/"+selected_version+"/"+fw_filename+" not found.") RNS.log("".join(["Firmware ", UPD_DIR, "/", selected_version, "/", fw_filename, " not found."]))
RNS.log("Downloading missing firmware file: "+fw_filename+" for version "+selected_version) RNS.log("".join(["Downloading missing firmware file: ", fw_filename, " for version ", selected_version]))
urlretrieve(update_target_url, UPD_DIR+"/"+selected_version+"/"+fw_filename) urlretrieve(update_target_url, "".join([UPD_DIR, "/", selected_version, "/", fw_filename]))
RNS.log("Firmware file downloaded") RNS.log("Firmware file downloaded")
else: else:
RNS.log("Using existing firmware file: "+fw_filename+" for version "+selected_version) RNS.log("".join(["Using existing firmware file: ", fw_filename, " for version ", selected_version]))
try: try:
if selected_hash == None: if selected_hash == None:
try: try:
file = open(UPD_DIR+"/"+selected_version+"/"+fw_filename+".version", "rb") file = open("".join([UPD_DIR, "/", selected_version, "/", fw_filename, ".version"]), "rb")
release_info = file.read().decode("utf-8").strip() release_info = file.read().decode("utf-8").strip()
selected_hash = release_info.split()[1] selected_hash = release_info.split()[1]
except Exception as e: except Exception as e:
RNS.log("Could not read locally cached release information.") RNS.log("Could not read locally cached release information.")
RNS.log("Ensure "+UPD_DIR+"/"+selected_version+"/"+fw_filename+".version exists and has the correct format and hash.") RNS.log("".join(["Ensure ", UPD_DIR, "/", selected_version, "/", fw_filename, ".version exists and has the correct format and hash."]))
RNS.log("You can clear the cache with the --clear-cache option and try again.") RNS.log("You can clear the cache with the --clear-cache option and try again.")
if selected_hash == None: if selected_hash == None:
@ -1117,7 +1122,8 @@ def ensure_firmware_file(fw_filename):
exit(97) exit(97)
RNS.log("Verifying firmware integrity...") RNS.log("Verifying firmware integrity...")
fw_file = open(UPD_DIR+"/"+selected_version+"/"+fw_filename, "rb") fw_file = open("".join([UPD_DIR, "/", selected_version, "/", fw_filename]), "rb")
# TODO: Remove unused variable
expected_hash = bytes.fromhex(selected_hash) expected_hash = bytes.fromhex(selected_hash)
file_hash = hashlib.sha256(fw_file.read()).hexdigest() file_hash = hashlib.sha256(fw_file.read()).hexdigest()
if file_hash == selected_hash: if file_hash == selected_hash:
@ -1296,7 +1302,7 @@ def main():
public_key = load_der_public_key(public_bytes, backend=default_backend()) public_key = load_der_public_key(public_bytes, backend=default_backend())
key_hash = hashlib.sha256(public_bytes).hexdigest() key_hash = hashlib.sha256(public_bytes).hexdigest()
RNS.log("Trusting key: "+str(key_hash)) RNS.log("Trusting key: "+str(key_hash))
f = open(TK_DIR+"/"+str(key_hash)+".pubkey", "wb") f = open("".join([TK_DIR, "/", str(key_hash), ".pubkey"]), "wb")
f.write(public_bytes) f.write(public_bytes)
f.close() f.close()
@ -1333,7 +1339,7 @@ def main():
pi = 1 pi = 1
print("Detected serial ports:") print("Detected serial ports:")
for port in portlist: for port in portlist:
print(" ["+str(pi)+"] "+str(port.device)+" ("+str(port.product)+", "+str(port.serial_number)+")") print("".join([" [", str(pi), "] ", str(port.device), " (", str(port.product), ", ", str(port.serial_number), ")"]))
pi += 1 pi += 1
print("\nEnter the number of the serial port your device is connected to:\n? ", end="") print("\nEnter the number of the serial port your device is connected to:\n? ", end="")
@ -1355,7 +1361,7 @@ def main():
port_product = selected_port.product port_product = selected_port.product
port_serialno = selected_port.serial_number port_serialno = selected_port.serial_number
print("\nOk, using device on "+str(port_path)+" ("+str(port_product)+", "+str(port_serialno)+")") print("".join(["\nOk, using device on ", str(port_path), " (", str(port_product), ", ", str(port_serialno), ")"]))
else: else:
ports = list_ports.comports() ports = list_ports.comports()
@ -1423,11 +1429,11 @@ def main():
hash_f.close() hash_f.close()
extraction_parts = [ extraction_parts = [
("bootloader", "python \""+CNF_DIR+"/recovery_esptool.py\" --chip esp32 --port "+port_path+" --baud "+args.baud_flash+" --before default_reset --after hard_reset read_flash 0x1000 0x4650 \""+EXT_DIR+"/extracted_rnode_firmware.bootloader\""), ("bootloader", "".join(["python \"", CNF_DIR, "/recovery_esptool.py\" --chip esp32 --port ", port_path, " --baud ", args.baud_flash, " --before default_reset --after hard_reset read_flash 0x1000 0x4650 \"", EXT_DIR, "/extracted_rnode_firmware.bootloader\""])),
("partition table", "python \""+CNF_DIR+"/recovery_esptool.py\" --chip esp32 --port "+port_path+" --baud "+args.baud_flash+" --before default_reset --after hard_reset read_flash 0x8000 0xC00 \""+EXT_DIR+"/extracted_rnode_firmware.partitions\""), ("partition table", "".join(["python \"", CNF_DIR, "/recovery_esptool.py\" --chip esp32 --port ", port_path, " --baud ", args.baud_flash, " --before default_reset --after hard_reset read_flash 0x8000 0xC00 \"", EXT_DIR, "/extracted_rnode_firmware.partitions\""])),
("app boot", "python \""+CNF_DIR+"/recovery_esptool.py\" --chip esp32 --port "+port_path+" --baud "+args.baud_flash+" --before default_reset --after hard_reset read_flash 0xe000 0x2000 \""+EXT_DIR+"/extracted_rnode_firmware.boot_app0\""), ("app boot", "".join(["python \"", CNF_DIR, "/recovery_esptool.py\" --chip esp32 --port ", port_path, " --baud ", args.baud_flash, " --before default_reset --after hard_reset read_flash 0xe000 0x2000 \"", EXT_DIR, "/extracted_rnode_firmware.boot_app0\""])),
("application image", "python \""+CNF_DIR+"/recovery_esptool.py\" --chip esp32 --port "+port_path+" --baud "+args.baud_flash+" --before default_reset --after hard_reset read_flash 0x10000 0x200000 \""+EXT_DIR+"/extracted_rnode_firmware.bin\""), ("application image", "".join(["python \"", CNF_DIR, "/recovery_esptool.py\" --chip esp32 --port ", port_path, " --baud ", args.baud_flash, " --before default_reset --after hard_reset read_flash 0x10000 0x200000 \"", EXT_DIR, "/extracted_rnode_firmware.bin\""])),
("console image", "python \""+CNF_DIR+"/recovery_esptool.py\" --chip esp32 --port "+port_path+" --baud "+args.baud_flash+" --before default_reset --after hard_reset read_flash 0x210000 0x1F0000 \""+EXT_DIR+"/extracted_console_image.bin\""), ("console image", "".join(["python \"", CNF_DIR, "/recovery_esptool.py\" --chip esp32 --port ", port_path, " --baud ", args.baud_flash, " --before default_reset --after hard_reset read_flash 0x210000 0x1F0000 \"", EXT_DIR, "/extracted_console_image.bin\""])),
] ]
import subprocess, shlex import subprocess, shlex
for part, command in extraction_parts: for part, command in extraction_parts:
@ -1465,7 +1471,7 @@ def main():
pi = 1 pi = 1
print("Detected serial ports:") print("Detected serial ports:")
for port in portlist: for port in portlist:
print(" ["+str(pi)+"] "+str(port.device)+" ("+str(port.product)+", "+str(port.serial_number)+")") print("".join([" [", str(pi), "] ", str(port.device), " (", str(port.product), ", ", str(port.serial_number), ")"]))
pi += 1 pi += 1
print("\nEnter the number of the serial port your device is connected to:\n? ", end="") print("\nEnter the number of the serial port your device is connected to:\n? ", end="")
@ -1488,7 +1494,7 @@ def main():
port_serialno = selected_port.serial_number port_serialno = selected_port.serial_number
clear() clear()
print("\nOk, using device on "+str(port_path)+" ("+str(port_product)+", "+str(port_serialno)+")") print("".join(["\nOk, using device on ", str(port_path), " (", str(port_product), ", ", str(port_serialno), ")"]))
else: else:
ports = list_ports.comports() ports = list_ports.comports()
@ -2177,7 +2183,7 @@ def main():
if platform == "unzip": if platform == "unzip":
flasher = "unzip" flasher = "unzip"
if which(flasher) is not None: if which(flasher) is not None:
return [flasher, "-o", UPD_DIR+"/"+selected_version+"/"+fw_filename, "-d", UPD_DIR+"/"+selected_version] return [flasher, "-o", "".join([UPD_DIR, "/", selected_version, "/", fw_filename]), "-d", UPD_DIR+"/"+selected_version]
else: else:
RNS.log("") RNS.log("")
RNS.log("You do not currently have the \""+flasher+"\" program installed on your system.") RNS.log("You do not currently have the \""+flasher+"\" program installed on your system.")
@ -2194,9 +2200,9 @@ def main():
# avrdude -C/home/markqvist/.arduino15/packages/arduino/tools/avrdude/6.3.0-arduino17/etc/avrdude.conf -q -q -V -patmega2560 -cwiring -P/dev/ttyACM0 -b115200 -D -Uflash:w:/tmp/arduino-sketch-0E260F46C421A84A7CBAD48E859C8E64/RNode_Firmware.ino.hex:i # avrdude -C/home/markqvist/.arduino15/packages/arduino/tools/avrdude/6.3.0-arduino17/etc/avrdude.conf -q -q -V -patmega2560 -cwiring -P/dev/ttyACM0 -b115200 -D -Uflash:w:/tmp/arduino-sketch-0E260F46C421A84A7CBAD48E859C8E64/RNode_Firmware.ino.hex:i
# avrdude -q -q -V -patmega2560 -cwiring -P/dev/ttyACM0 -b115200 -D -Uflash:w:/tmp/arduino-sketch-0E260F46C421A84A7CBAD48E859C8E64/RNode_Firmware.ino.hex:i # avrdude -q -q -V -patmega2560 -cwiring -P/dev/ttyACM0 -b115200 -D -Uflash:w:/tmp/arduino-sketch-0E260F46C421A84A7CBAD48E859C8E64/RNode_Firmware.ino.hex:i
if fw_filename == "rnode_firmware.hex": if fw_filename == "rnode_firmware.hex":
return [flasher, "-P", args.port, "-p", "m1284p", "-c", "arduino", "-b", "115200", "-U", "flash:w:"+UPD_DIR+"/"+selected_version+"/"+fw_filename+":i"] return [flasher, "-P", args.port, "-p", "m1284p", "-c", "arduino", "-b", "115200", "-U", "".join(["flash:w:", UPD_DIR, "/", selected_version, "/", fw_filename, ":i"])]
elif fw_filename == "rnode_firmware_m2560.hex": elif fw_filename == "rnode_firmware_m2560.hex":
return [flasher, "-P", args.port, "-p", "atmega2560", "-c", "wiring", "-D", "-b", "115200", "-U", "flash:w:"+UPD_DIR+"/"+selected_version+"/"+fw_filename] return [flasher, "-P", args.port, "-p", "atmega2560", "-c", "wiring", "-D", "-b", "115200", "-U", "".join(["flash:w:", UPD_DIR, "/", selected_version, "/", fw_filename])]
else: else:
RNS.log("") RNS.log("")
RNS.log("You do not currently have the \""+flasher+"\" program installed on your system.") RNS.log("You do not currently have the \""+flasher+"\" program installed on your system.")
@ -2224,6 +2230,7 @@ def main():
os.chmod(flasher, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP) os.chmod(flasher, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP)
if which(flasher) is not None: if which(flasher) is not None:
# TODO: Simplify this returns with a list and add or remove arguments
if fw_filename == "rnode_firmware_tbeam.zip": if fw_filename == "rnode_firmware_tbeam.zip":
if numeric_version >= 1.55: if numeric_version >= 1.55:
return [ return [
@ -2237,11 +2244,11 @@ def main():
"--flash_mode", "dio", "--flash_mode", "dio",
"--flash_freq", "80m", "--flash_freq", "80m",
"--flash_size", "4MB", "--flash_size", "4MB",
"0xe000", UPD_DIR+"/"+selected_version+"/rnode_firmware_tbeam.boot_app0", "0xe000", "".join([UPD_DIR, "/", selected_version, "/rnode_firmware_tbeam.boot_app0"]),
"0x1000", UPD_DIR+"/"+selected_version+"/rnode_firmware_tbeam.bootloader", "0x1000", "".join([UPD_DIR, "/", selected_version, "/rnode_firmware_tbeam.bootloader"]),
"0x10000", UPD_DIR+"/"+selected_version+"/rnode_firmware_tbeam.bin", "0x10000", "".join([UPD_DIR, "/", selected_version, "/rnode_firmware_tbeam.bin"]),
"0x210000",UPD_DIR+"/"+selected_version+"/console_image.bin", "0x210000","".join([UPD_DIR, "/", selected_version, "/console_image.bin"]),
"0x8000", UPD_DIR+"/"+selected_version+"/rnode_firmware_tbeam.partitions", "0x8000", "".join([UPD_DIR, "/", selected_version, "/rnode_firmware_tbeam.partitions"]),
] ]
else: else:
return [ return [
@ -2705,7 +2712,7 @@ def main():
exit(1) exit(1)
else: else:
fw_src = UPD_DIR+"/"+selected_version+"/" fw_src = "".join([UPD_DIR, "/", selected_version, "/"])
if os.path.isfile(fw_src+fw_filename): if os.path.isfile(fw_src+fw_filename):
try: try:
if fw_filename.endswith(".zip"): if fw_filename.endswith(".zip"):
@ -3017,13 +3024,13 @@ def main():
RNS.log("") RNS.log("")
RNS.log("Device info:") RNS.log("Device info:")
RNS.log("\tProduct : "+products[rnode.product]+" "+models[rnode.model][3]+" ("+bytes([rnode.product]).hex()+":"+bytes([rnode.model]).hex()+board_string+")") RNS.log("".join(["\tProduct : ", products[rnode.product], " ", models[rnode.model][3], " (", bytes([rnode.product]).hex(), ":", bytes([rnode.model]).hex(), board_string, ")"]))
RNS.log("\tDevice signature : "+sigstring) RNS.log("\tDevice signature : "+sigstring)
RNS.log("\tFirmware version : "+rnode.version) RNS.log("\tFirmware version : "+rnode.version)
RNS.log("\tHardware revision : "+str(int(rnode.hw_rev))) RNS.log("\tHardware revision : "+str(int(rnode.hw_rev)))
RNS.log("\tSerial number : "+RNS.hexrep(rnode.serialno)) RNS.log("\tSerial number : "+RNS.hexrep(rnode.serialno))
RNS.log("\tModem chip : "+str(models[rnode.model][5])) RNS.log("\tModem chip : "+str(models[rnode.model][5]))
RNS.log("\tFrequency range : "+str(rnode.min_freq/1e6)+" MHz - "+str(rnode.max_freq/1e6)+" MHz") RNS.log("".join(["\tFrequency range : ", str(rnode.min_freq/1e6), " MHz - ", str(rnode.max_freq/1e6), " MHz"]))
RNS.log("\tMax TX power : "+str(rnode.max_output)+" dBm") RNS.log("\tMax TX power : "+str(rnode.max_output)+" dBm")
RNS.log("\tManufactured : "+timestring) RNS.log("\tManufactured : "+timestring)
@ -3040,7 +3047,7 @@ def main():
RNS.log("\tDevice mode : TNC") RNS.log("\tDevice mode : TNC")
RNS.log("\t Frequency : "+str((rnode.conf_frequency/1000000.0))+" MHz") RNS.log("\t Frequency : "+str((rnode.conf_frequency/1000000.0))+" MHz")
RNS.log("\t Bandwidth : "+str(rnode.conf_bandwidth/1000.0)+" KHz") RNS.log("\t Bandwidth : "+str(rnode.conf_bandwidth/1000.0)+" KHz")
RNS.log("\t TX power : "+str(rnode.conf_txpower)+" dBm ("+str(txp_mw)+" mW)") RNS.log("".join(["\t TX power : ", str(rnode.conf_txpower), " dBm (", str(txp_mw), " mW)"]))
RNS.log("\t Spreading factor : "+str(rnode.conf_sf)) RNS.log("\t Spreading factor : "+str(rnode.conf_sf))
RNS.log("\t Coding rate : "+str(rnode.conf_cr)) RNS.log("\t Coding rate : "+str(rnode.conf_cr))
RNS.log("\t On-air bitrate : "+str(rnode.bitrate_kbps)+" kbps") RNS.log("\t On-air bitrate : "+str(rnode.bitrate_kbps)+" kbps")
@ -3241,7 +3248,7 @@ def main():
vf.close() vf.close()
else: else:
partition_filename = fw_filename.replace(".zip", ".bin") partition_filename = fw_filename.replace(".zip", ".bin")
partition_hash = get_partition_hash(UPD_DIR+"/"+selected_version+"/"+partition_filename) partition_hash = get_partition_hash("".join([UPD_DIR, "/", selected_version, "/", partition_filename]))
if partition_hash != None: if partition_hash != None:
time.sleep(0.75) time.sleep(0.75)

View File

@ -27,13 +27,14 @@ import argparse
from RNS._version import __version__ from RNS._version import __version__
def size_str(num, suffix='B'): def size_str(num, suffix='B'):
units = ['','K','M','G','T','P','E','Z'] units = ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']
last_unit = 'Y' last_unit = 'Y'
if suffix == 'b': if suffix == 'b':
num *= 8 num *= 8
units = ['','K','M','G','T','P','E','Z'] units = ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']
last_unit = 'Y' last_unit = 'Y'
for unit in units: for unit in units:
@ -46,6 +47,14 @@ def size_str(num, suffix='B'):
return "%.2f%s%s" % (num, last_unit, suffix) return "%.2f%s%s" % (num, last_unit, suffix)
def convert_bytes_to_hex(value_obj):
if isinstance(value_obj, bytes):
return RNS.hexrep(value_obj, delimit=False)
elif isinstance(value_obj, dict):
return {key: convert_bytes_to_hex(value) for key, value in value_obj.items()}
return value_obj
def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json=False, astats=False, sorting=None, sort_reverse=False): def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json=False, astats=False, sorting=None, sort_reverse=False):
reticulum = RNS.Reticulum(configdir = configdir, loglevel = 3+verbosity) reticulum = RNS.Reticulum(configdir = configdir, loglevel = 3+verbosity)
@ -55,44 +64,37 @@ def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json=
except Exception as e: except Exception as e:
pass pass
if stats != None: if stats is None:
print("Could not get RNS status")
return
if json: if json:
import json import json
for s in stats: for s, value in stats.items():
if isinstance(stats[s], bytes): stats[s] = convert_bytes_to_hex(value)
stats[s] = RNS.hexrep(stats[s], delimit=False)
if isinstance(stats[s], dict):
for i in stats[s]:
if isinstance(i, dict):
for k in i:
if isinstance(i[k], bytes):
i[k] = RNS.hexrep(i[k], delimit=False)
print(json.dumps(stats)) print(json.dumps(stats))
exit() exit()
interfaces = stats["interfaces"] interfaces = stats["interfaces"]
if sorting != None and isinstance(sorting, str): if sorting is not None and isinstance(sorting, str):
sorting = sorting.lower() sorting = sorting.lower()
if sorting == "rate" or sorting == "bitrate": if sorting == "rate" or sorting == "bitrate":
interfaces.sort(key=lambda i: i["bitrate"], reverse=not sort_reverse) interfaces.sort(key=lambda i: i["bitrate"], reverse=not sort_reverse)
if sorting == "rx": elif sorting == "rx":
interfaces.sort(key=lambda i: i["rxb"], reverse=not sort_reverse) interfaces.sort(key=lambda i: i["rxb"], reverse=not sort_reverse)
if sorting == "tx": elif sorting == "tx":
interfaces.sort(key=lambda i: i["txb"], reverse=not sort_reverse) interfaces.sort(key=lambda i: i["txb"], reverse=not sort_reverse)
if sorting == "traffic": elif sorting == "traffic":
interfaces.sort(key=lambda i: i["rxb"]+i["txb"], reverse=not sort_reverse) interfaces.sort(key=lambda i: i["rxb"]+i["txb"], reverse=not sort_reverse)
if sorting == "announces" or sorting == "announce": elif sorting == "announces" or sorting == "announce":
interfaces.sort(key=lambda i: i["incoming_announce_frequency"]+i["outgoing_announce_frequency"], reverse=not sort_reverse) interfaces.sort(key=lambda i: i["incoming_announce_frequency"]+i["outgoing_announce_frequency"], reverse=not sort_reverse)
if sorting == "arx": elif sorting == "arx":
interfaces.sort(key=lambda i: i["incoming_announce_frequency"], reverse=not sort_reverse) interfaces.sort(key=lambda i: i["incoming_announce_frequency"], reverse=not sort_reverse)
if sorting == "atx": elif sorting == "atx":
interfaces.sort(key=lambda i: i["outgoing_announce_frequency"], reverse=not sort_reverse) interfaces.sort(key=lambda i: i["outgoing_announce_frequency"], reverse=not sort_reverse)
if sorting == "held": elif sorting == "held":
interfaces.sort(key=lambda i: i["held_announces"], reverse=not sort_reverse) interfaces.sort(key=lambda i: i["held_announces"], reverse=not sort_reverse)
for ifstat in interfaces: for ifstat in interfaces:
name = ifstat["name"] name = ifstat["name"]
@ -125,8 +127,7 @@ def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json=
else: else:
modestr = "Full" modestr = "Full"
if ifstat["clients"] is not None:
if ifstat["clients"] != None:
clients = ifstat["clients"] clients = ifstat["clients"]
if name.startswith("Shared Instance["): if name.startswith("Shared Instance["):
cnum = max(clients-1,0) cnum = max(clients-1,0)
@ -216,8 +217,6 @@ def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json=
print("") print("")
else:
print("Could not get RNS status")
def main(): def main():
try: try:
@ -292,6 +291,7 @@ def main():
print("") print("")
exit() exit()
def speed_str(num, suffix='bps'): def speed_str(num, suffix='bps'):
units = ['','k','M','G','T','P','E','Z'] units = ['','k','M','G','T','P','E','Z']
last_unit = 'Y' last_unit = 'Y'
@ -308,5 +308,6 @@ def speed_str(num, suffix='bps'):
return "%.2f %s%s" % (num, last_unit, suffix) return "%.2f %s%s" % (num, last_unit, suffix)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -28,7 +28,7 @@ import argparse
import shlex import shlex
import time import time
import sys import sys
import tty # import tty # Commented this because commented lines below use it
import os import os
from RNS._version import __version__ from RNS._version import __version__
@ -39,9 +39,10 @@ reticulum = None
allow_all = False allow_all = False
allowed_identity_hashes = [] allowed_identity_hashes = []
def prepare_identity(identity_path): def prepare_identity(identity_path):
global identity global identity
if identity_path == None: if identity_path is None:
identity_path = RNS.Reticulum.identitypath+"/"+APP_NAME identity_path = RNS.Reticulum.identitypath+"/"+APP_NAME
if os.path.isfile(identity_path): if os.path.isfile(identity_path):
@ -52,6 +53,7 @@ def prepare_identity(identity_path):
identity = RNS.Identity() identity = RNS.Identity()
identity.to_file(identity_path) identity.to_file(identity_path)
def listen(configdir, identitypath = None, verbosity = 0, quietness = 0, allowed = [], print_identity = False, disable_auth = None, disable_announce=False): def listen(configdir, identitypath = None, verbosity = 0, quietness = 0, allowed = [], print_identity = False, disable_auth = None, disable_announce=False):
global identity, allow_all, allowed_identity_hashes, reticulum global identity, allow_all, allowed_identity_hashes, reticulum
@ -69,7 +71,7 @@ def listen(configdir, identitypath = None, verbosity = 0, quietness = 0, allowed
if disable_auth: if disable_auth:
allow_all = True allow_all = True
else: else:
if allowed != None: if allowed is not None:
for a in allowed: for a in allowed:
try: try:
dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2 dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2
@ -297,6 +299,7 @@ def remote_execution_progress(request_receipt):
link = None link = None
listener_destination = None listener_destination = None
remote_exec_grace = 2.0 remote_exec_grace = 2.0
def execute(configdir, identitypath = None, verbosity = 0, quietness = 0, detailed = False, mirror = False, noid = False, destination = None, command = None, stdin = None, stdoutl = None, stderrl = None, timeout = RNS.Transport.PATH_REQUEST_TIMEOUT, result_timeout = None, interactive = False): def execute(configdir, identitypath = None, verbosity = 0, quietness = 0, detailed = False, mirror = False, noid = False, destination = None, command = None, stdin = None, stdoutl = None, stderrl = None, timeout = RNS.Transport.PATH_REQUEST_TIMEOUT, result_timeout = None, interactive = False):
global identity, reticulum, link, listener_destination, remote_exec_grace global identity, reticulum, link, listener_destination, remote_exec_grace
@ -548,7 +551,6 @@ def main():
parser.add_argument("--stdout", action='store', default=None, help="max size in bytes of returned stdout", type=int) parser.add_argument("--stdout", action='store', default=None, help="max size in bytes of returned stdout", type=int)
parser.add_argument("--stderr", action='store', default=None, help="max size in bytes of returned stderr", type=int) parser.add_argument("--stderr", action='store', default=None, help="max size in bytes of returned stderr", type=int)
parser.add_argument("--version", action="version", version="rnx {version}".format(version=__version__)) parser.add_argument("--version", action="version", version="rnx {version}".format(version=__version__))
args = parser.parse_args() args = parser.parse_args()
if args.listen or args.print_identity: if args.listen or args.print_identity:
@ -668,6 +670,7 @@ def size_str(num, suffix='B'):
return "%.2f%s%s" % (num, last_unit, suffix) return "%.2f%s%s" % (num, last_unit, suffix)
def pretty_time(time, verbose=False): def pretty_time(time, verbose=False):
days = int(time // (24 * 3600)) days = int(time // (24 * 3600))
time = time % (24 * 3600) time = time % (24 * 3600)
@ -684,31 +687,24 @@ def pretty_time(time, verbose=False):
components = [] components = []
if days > 0: if days > 0:
components.append(str(days)+" day"+sd if verbose else str(days)+"d") components.append("".join([str(days), " day", sd if verbose else str(days), "d"]))
if hours > 0: if hours > 0:
components.append(str(hours)+" hour"+sh if verbose else str(hours)+"h") components.append("".join([str(hours), " hour", sh if verbose else str(hours), "h"]))
if minutes > 0: if minutes > 0:
components.append(str(minutes)+" minute"+sm if verbose else str(minutes)+"m") components.append("".join([str(minutes), " minute", sm if verbose else str(minutes), "m"]))
if seconds > 0: if seconds > 0:
components.append(str(seconds)+" second"+ss if verbose else str(seconds)+"s") components.append("".join([str(seconds), " second", ss if verbose else str(seconds), "s"]))
i = 0 if len(components) > 1:
tstr = "" time_string = "".join([", ".join(components[:-1]), " and ", components[-1]])
for c in components: else:
i += 1 time_string = components[0] if components else ""
if i == 1:
pass
elif i < len(components):
tstr += ", "
elif i == len(components):
tstr += " and "
tstr += c return time_string
return tstr
if __name__ == "__main__": if __name__ == "__main__":
main() main()