From cf41187e2f2b9923f0c2ab1c7ea88efab881978a Mon Sep 17 00:00:00 2001 From: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> Date: Wed, 24 Apr 2024 00:24:22 -0300 Subject: [PATCH 01/11] refactor: Change for loop to list comprehensions Not only more concise and shorter, it's notably faster Signed-off-by: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> --- RNS/Utilities/rnid.py | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/RNS/Utilities/rnid.py b/RNS/Utilities/rnid.py index 97de3e0..3640b82 100644 --- a/RNS/Utilities/rnid.py +++ b/RNS/Utilities/rnid.py @@ -82,7 +82,7 @@ def main(): parser.add_argument("-w", "--write", metavar="file", action="store", default=None, help="output file path", type=str) parser.add_argument("-f", "--force", action="store_true", default=None, help="write output even if it overwrites existing files") parser.add_argument("-I", "--stdin", action="store_true", default=False, help=argparse.SUPPRESS) # "read input from STDIN instead of file" - parser.add_argument("-O", "--stdout", action="store_true", default=False, help=argparse.SUPPRESS) # help="write output to STDOUT instead of file", + parser.add_argument("-O", "--stdout", action="store_true", default=False, help=argparse.SUPPRESS) # help="write output to STDOUT instead of file", parser.add_argument("-R", "--request", action="store_true", default=False, help="request unknown Identities from the network") parser.add_argument("-t", action="store", metavar="seconds", type=float, help="identity request timeout before giving up", default=RNS.Transport.PATH_REQUEST_TIMEOUT) @@ -93,14 +93,10 @@ def main(): parser.add_argument("-B", "--base32", action="store_true", default=False, help="Use base32-encoded input and output") parser.add_argument("--version", action="version", version="rnid {version}".format(version=__version__)) - + args = parser.parse_args() - ops = 0; - for t in [args.encrypt, args.decrypt, args.validate, args.sign]: - if t: - ops += 1 - + ops = sum(1 for t in [args.encrypt, args.decrypt, args.validate, args.sign] if t) if ops > 1: RNS.log("This utility currently only supports one of the encrypt, decrypt, sign or verify operations per invocation", RNS.LOG_ERROR) exit(1) @@ -179,7 +175,7 @@ def main(): quietness = args.quiet if verbosity != 0 or quietness != 0: targetloglevel = targetloglevel+verbosity-quietness - + # Start Reticulum reticulum = RNS.Reticulum(configdir=args.config, loglevel=targetloglevel) RNS.compact_log_fmt = True @@ -234,7 +230,7 @@ def main(): RNS.log("Invalid hexadecimal hash provided", RNS.LOG_ERROR) exit(7) - + else: # Try loading Identity from file if not os.path.isfile(identity_str): @@ -391,7 +387,7 @@ def main(): RNS.log("Could not open output file for writing", RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) exit(15) - + # TODO: Actually expand this to a good solution # probably need to create a wrapper that takes # into account not closing stdout when done @@ -415,12 +411,12 @@ def main(): if not args.stdout: RNS.log("Signing "+str(args.read)) - + try: data_output.write(identity.sign(data_input.read())) data_output.close() data_input.close() - + if not args.stdout: if args.read: RNS.log("File "+str(args.read)+" signed with "+str(identity)+" to "+str(args.write)) @@ -448,7 +444,7 @@ def main(): else: # if not args.stdout: # RNS.log("Verifying "+str(args.validate)+" for "+str(args.read)) - + try: try: sig_input = open(args.validate, "rb") @@ -498,7 +494,7 @@ def main(): if not args.stdout: RNS.log("Encrypting "+str(args.read)) - + try: more_data = True while more_data: @@ -545,7 +541,7 @@ def main(): if not args.stdout: RNS.log("Decrypting "+str(args.read)+"...") - + try: more_data = True while more_data: @@ -597,4 +593,4 @@ def main(): exit(255) if __name__ == "__main__": - main() \ No newline at end of file + main() From 7159e6a52336309e6590b020977a37fefd968f19 Mon Sep 17 00:00:00 2001 From: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> Date: Wed, 24 Apr 2024 00:32:11 -0300 Subject: [PATCH 02/11] style: Change comparations according PEP8 According PEP8 this is the recommended way to do it. I'm 99% sure that this does not speed up the code, but we are comparing identity, not equality, so "is" and "is not" must be used. Signed-off-by: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> --- RNS/Utilities/rnid.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/RNS/Utilities/rnid.py b/RNS/Utilities/rnid.py index 3640b82..ec4c4f0 100644 --- a/RNS/Utilities/rnid.py +++ b/RNS/Utilities/rnid.py @@ -40,11 +40,11 @@ CHUNK_SIZE = 16*1024*1024 def spin(until=None, msg=None, timeout=None): i = 0 syms = "⢄⢂⢁⡁⡈⡐⡠" - if timeout != None: + if timeout is not None: timeout = time.time()+timeout print(msg+" ", end=" ") - while (timeout == None or time.time() timeout: + if timeout is not None and time.time() > timeout: return False else: return True @@ -204,7 +204,7 @@ def main(): destination_hash = bytes.fromhex(identity_str) identity = RNS.Identity.recall(destination_hash) - if identity == None: + if identity is None: if not args.request: RNS.log("Could not recall Identity for "+RNS.prettyhexrep(destination_hash)+".", RNS.LOG_ERROR) RNS.log("You can query the network for unknown Identities with the -R option.", RNS.LOG_ERROR) @@ -212,7 +212,7 @@ def main(): else: RNS.Transport.request_path(destination_hash) def spincheck(): - return RNS.Identity.recall(destination_hash) != None + return RNS.Identity.recall(destination_hash) is not None spin(spincheck, "Requesting unknown Identity for "+RNS.prettyhexrep(destination_hash), args.t) if not spincheck(): @@ -245,7 +245,7 @@ def main(): RNS.log("Could not decode Identity from specified file") exit(9) - if identity != None: + if identity is not None: if args.hash: try: aspects = args.hash.split(".") @@ -255,7 +255,7 @@ def main(): else: app_name = aspects[0] aspects = aspects[1:] - if identity.pub != None: + if identity.pub is not None: destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, app_name, *aspects) RNS.log("The "+str(args.hash)+" destination for this Identity is "+RNS.prettyhexrep(destination.hash)) RNS.log("The full destination specifier is "+str(destination)) @@ -278,7 +278,7 @@ def main(): else: app_name = aspects[0] aspects = aspects[1:] - if identity.prv != None: + if identity.prv is not None: destination = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE, app_name, *aspects) RNS.log("Created destination "+str(destination)) RNS.log("Announcing destination "+RNS.prettyhexrep(destination.hash)) @@ -369,7 +369,7 @@ def main(): if args.decrypt and not args.write and not args.stdout and args.read and args.read.lower().endswith("."+ENCRYPT_EXT): args.write = str(args.read).replace("."+ENCRYPT_EXT, "") - if args.sign and identity.prv == None: + if args.sign and identity.prv is None: RNS.log("Specified Identity does not hold a private key. Cannot sign.", RNS.LOG_ERROR) exit(14) @@ -395,7 +395,7 @@ def main(): # data_output = sys.stdout if args.sign: - if identity.prv == None: + if identity.prv is None: RNS.log("Specified Identity does not hold a private key. Cannot sign.", RNS.LOG_ERROR) exit(16) @@ -525,7 +525,7 @@ def main(): exit(26) if args.decrypt: - if identity.prv == None: + if identity.prv is None: RNS.log("Specified Identity does not hold a private key. Cannot decrypt.", RNS.LOG_ERROR) exit(27) @@ -548,7 +548,7 @@ def main(): chunk = data_input.read(CHUNK_SIZE) if chunk: plaintext = identity.decrypt(chunk) - if plaintext == None: + if plaintext is None: if not args.stdout: RNS.log("Data could not be decrypted with the specified Identity") exit(30) From 794a5f4401919d5be10043f123824d7b1fef25b8 Mon Sep 17 00:00:00 2001 From: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> Date: Wed, 24 Apr 2024 00:34:08 -0300 Subject: [PATCH 03/11] refactor: Remove unused code Signed-off-by: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> --- RNS/Utilities/rnid.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/RNS/Utilities/rnid.py b/RNS/Utilities/rnid.py index ec4c4f0..e779b81 100644 --- a/RNS/Utilities/rnid.py +++ b/RNS/Utilities/rnid.py @@ -577,17 +577,6 @@ def main(): pass exit(31) - if True: - pass - - elif False: - pass - - else: - print("") - parser.print_help() - print("") - except KeyboardInterrupt: print("") exit(255) From 8d1a0003788477b2ba3a71e5dfed45b4ed0f2405 Mon Sep 17 00:00:00 2001 From: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> Date: Wed, 24 Apr 2024 00:43:33 -0300 Subject: [PATCH 04/11] refactor: Change method for string concatenations Strings in Python are immutable so, the "+" operation means that we need to create the string again and attaching the old content. Per "+". We can replace this by using join() or built-in functions or arguments of functions, like "sep" in print() Signed-off-by: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> --- RNS/Utilities/rnid.py | 35 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/RNS/Utilities/rnid.py b/RNS/Utilities/rnid.py index e779b81..71cb0ee 100644 --- a/RNS/Utilities/rnid.py +++ b/RNS/Utilities/rnid.py @@ -37,20 +37,21 @@ SIG_EXT = "rsg" ENCRYPT_EXT = "rfe" CHUNK_SIZE = 16*1024*1024 -def spin(until=None, msg=None, timeout=None): + +def spin(until=None, msg="", timeout=None): i = 0 syms = "⢄⢂⢁⡁⡈⡐⡠" if timeout is not None: timeout = time.time()+timeout - print(msg+" ", end=" ") + print(msg, end=" ") while (timeout is None or time.time() timeout: return False @@ -220,17 +221,15 @@ def main(): exit(6) else: identity = RNS.Identity.recall(destination_hash) - RNS.log("Received Identity "+str(identity)+" for destination "+RNS.prettyhexrep(destination_hash)+" from the network") + RNS.log(" ".join(["Received Identity", str(identity), "for destination", RNS.prettyhexrep(destination_hash), "from the network"])) else: - RNS.log("Recalled Identity "+str(identity)+" for destination "+RNS.prettyhexrep(destination_hash)) - + RNS.log(" ".join(["Recalled Identity", str(identity), "for destination", RNS.prettyhexrep(destination_hash)])) except Exception as e: RNS.log("Invalid hexadecimal hash provided", RNS.LOG_ERROR) exit(7) - else: # Try loading Identity from file if not os.path.isfile(identity_str): @@ -239,7 +238,7 @@ def main(): else: try: identity = RNS.Identity.from_file(identity_str) - RNS.log("Loaded Identity "+str(identity)+" from "+str(identity_str)) + RNS.log(" ".join(["Loaded Identity", str(identity), "from", str(identity_str)])) except Exception as e: RNS.log("Could not decode Identity from specified file") @@ -257,7 +256,7 @@ def main(): aspects = aspects[1:] if identity.pub is not None: destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, app_name, *aspects) - RNS.log("The "+str(args.hash)+" destination for this Identity is "+RNS.prettyhexrep(destination.hash)) + RNS.log(" ".join(["The", str(args.hash), "destination for this Identity is", RNS.prettyhexrep(destination.hash)])) RNS.log("The full destination specifier is "+str(destination)) time.sleep(0.25) exit(0) @@ -287,7 +286,7 @@ def main(): exit(0) else: destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, app_name, *aspects) - RNS.log("The "+str(args.announce)+" destination for this Identity is "+RNS.prettyhexrep(destination.hash)) + RNS.log(" ".join(["The", str(args.announce), "destination for this Identity is", RNS.prettyhexrep(destination.hash)])) RNS.log("The full destination specifier is "+str(destination)) RNS.log("Cannot announce this destination, since the private key is not held") time.sleep(0.25) @@ -419,7 +418,7 @@ def main(): if not args.stdout: if args.read: - RNS.log("File "+str(args.read)+" signed with "+str(identity)+" to "+str(args.write)) + RNS.log(" ".join([ "File", str(args.read), "signed with", str(identity), "to", str(args.write) ])) exit(0) except Exception as e: @@ -453,18 +452,17 @@ def main(): RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) exit(21) - validated = identity.validate(sig_input.read(), data_input.read()) sig_input.close() data_input.close() if not validated: if not args.stdout: - RNS.log("Signature "+str(args.validate)+" for file "+str(args.read)+" is invalid", RNS.LOG_ERROR) + RNS.log(" ".join(["Signature", str(args.validate), "for file", str(args.read), "is invalid"]), RNS.LOG_ERROR) exit(22) else: if not args.stdout: - RNS.log("Signature "+str(args.validate)+" for file "+str(args.read)+" made by Identity "+str(identity)+" is valid") + RNS.log(" ".join(["Signature", str(args.validate), "for file", str(args.read), "made by Identity", str(identity), "is valid"])) exit(0) except Exception as e: @@ -507,7 +505,7 @@ def main(): data_input.close() if not args.stdout: if args.read: - RNS.log("File "+str(args.read)+" encrypted for "+str(identity)+" to "+str(args.write)) + RNS.log(" ".join(["File", str(args.read), "encrypted for", str(identity), "to", str(args.write)])) exit(0) except Exception as e: @@ -540,7 +538,7 @@ def main(): exit(29) if not args.stdout: - RNS.log("Decrypting "+str(args.read)+"...") + RNS.log("".join(["Decrypting ", str(args.read), "..."])) try: more_data = True @@ -560,7 +558,7 @@ def main(): data_input.close() if not args.stdout: if args.read: - RNS.log("File "+str(args.read)+" decrypted with "+str(identity)+" to "+str(args.write)) + RNS.log(" ".join(["File", str(args.read), "decrypted with", str(identity), "to", str(args.write)])) exit(0) except Exception as e: @@ -581,5 +579,6 @@ def main(): print("") exit(255) + if __name__ == "__main__": main() From 87ff1808a2f4d7b378283d24f269ab64a79a7fb2 Mon Sep 17 00:00:00 2001 From: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> Date: Wed, 24 Apr 2024 11:04:26 -0300 Subject: [PATCH 05/11] refactor(main): Reduce complexity of main function This will make it easier for us to maintain and understand the main function. Also deleting duplicated code Signed-off-by: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> --- RNS/Utilities/rnid.py | 56 +++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 32 deletions(-) diff --git a/RNS/Utilities/rnid.py b/RNS/Utilities/rnid.py index 71cb0ee..02c5eef 100644 --- a/RNS/Utilities/rnid.py +++ b/RNS/Utilities/rnid.py @@ -58,6 +58,27 @@ def spin(until=None, msg="", timeout=None): else: return True + +def get_keys(args: argparse.Namespace, identity) -> None: + """Get public or/and private keys""" + + if args.base64: + RNS.log("Public Key : " + base64.urlsafe_b64encode(identity.get_public_key()).decode("utf-8")) + elif args.base32: + RNS.log("Public Key : " + base64.b32encode(identity.get_public_key()).decode("utf-8")) + else: + RNS.log("Public Key : " + RNS.hexrep(identity.get_public_key(), delimit=False)) + if identity.prv: + if args.print_private: + if args.base64: + RNS.log("Private Key : " + base64.urlsafe_b64encode(identity.get_private_key()).decode("utf-8")) + elif args.base32: + RNS.log("Private Key : " + base64.b32encode(identity.get_private_key()).decode("utf-8")) + else: + RNS.log("Private Key : " + RNS.hexrep(identity.get_private_key(), delimit=False)) + else: + RNS.log("Private Key : Hidden") + def main(): try: parser = argparse.ArgumentParser(description="Reticulum Identity & Encryption Utility") @@ -131,22 +152,8 @@ def main(): exit(42) RNS.log("Identity imported") - if args.base64: - RNS.log("Public Key : "+base64.urlsafe_b64encode(identity.get_public_key()).decode("utf-8")) - elif args.base32: - RNS.log("Public Key : "+base64.b32encode(identity.get_public_key()).decode("utf-8")) - else: - RNS.log("Public Key : "+RNS.hexrep(identity.get_public_key(), delimit=False)) - if identity.prv: - if args.print_private: - if args.base64: - RNS.log("Private Key : "+base64.urlsafe_b64encode(identity.get_private_key()).decode("utf-8")) - elif args.base32: - RNS.log("Private Key : "+base64.b32encode(identity.get_private_key()).decode("utf-8")) - else: - RNS.log("Private Key : "+RNS.hexrep(identity.get_private_key(), delimit=False)) - else: - RNS.log("Private Key : Hidden") + + get_keys(args, identity) if args.write: try: @@ -298,22 +305,7 @@ def main(): exit(0) if args.print_identity: - if args.base64: - RNS.log("Public Key : "+base64.urlsafe_b64encode(identity.get_public_key()).decode("utf-8")) - elif args.base32: - RNS.log("Public Key : "+base64.b32encode(identity.get_public_key()).decode("utf-8")) - else: - RNS.log("Public Key : "+RNS.hexrep(identity.get_public_key(), delimit=False)) - if identity.prv: - if args.print_private: - if args.base64: - RNS.log("Private Key : "+base64.urlsafe_b64encode(identity.get_private_key()).decode("utf-8")) - elif args.base32: - RNS.log("Private Key : "+base64.b32encode(identity.get_private_key()).decode("utf-8")) - else: - RNS.log("Private Key : "+RNS.hexrep(identity.get_private_key(), delimit=False)) - else: - RNS.log("Private Key : Hidden") + get_keys(args, identity) exit(0) if args.export: From 541d3cde48cbd88b4aaf16a167af1fea57e264be Mon Sep 17 00:00:00 2001 From: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> Date: Wed, 24 Apr 2024 11:28:00 -0300 Subject: [PATCH 06/11] style: Format the code a little bit This will remove the unnecesary import of time, and a little bit of format Signed-off-by: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> --- RNS/Utilities/rnir.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/RNS/Utilities/rnir.py b/RNS/Utilities/rnir.py index 0e4bf9e..77e60ea 100644 --- a/RNS/Utilities/rnir.py +++ b/RNS/Utilities/rnir.py @@ -24,23 +24,23 @@ import RNS import argparse -import time from RNS._version import __version__ -def program_setup(configdir, verbosity = 0, quietness = 0, service = False): - targetverbosity = verbosity-quietness +def program_setup(configdir, verbosity=0, quietness=0, service=False): + targetverbosity = verbosity - quietness if service: - targetlogdest = RNS.LOG_FILE + targetlogdest = RNS.LOG_FILE targetverbosity = None else: - targetlogdest = RNS.LOG_STDOUT + targetlogdest = RNS.LOG_STDOUT - reticulum = RNS.Reticulum(configdir=configdir, verbosity=targetverbosity, logdest=targetlogdest) + RNS.Reticulum(configdir=configdir, verbosity=targetverbosity, logdest=targetlogdest) exit(0) + def main(): try: parser = argparse.ArgumentParser(description="Reticulum Distributed Identity Resolver") @@ -49,24 +49,23 @@ def main(): parser.add_argument('-q', '--quiet', action='count', default=0) parser.add_argument("--exampleconfig", action='store_true', default=False, help="print verbose configuration example to stdout and exit") parser.add_argument("--version", action="version", version="ir {version}".format(version=__version__)) - args = parser.parse_args() if args.exampleconfig: print(__example_rns_config__) exit() + configarg = None if args.config: configarg = args.config - else: - configarg = None - program_setup(configdir = configarg, verbosity=args.verbose, quietness=args.quiet) + program_setup(configdir=configarg, verbosity=args.verbose, quietness=args.quiet) except KeyboardInterrupt: print("") exit() + __example_rns_config__ = '''# This is an example Identity Resolver file. ''' From 059c2381b5fd32eef5bd3714f594541516a91bf6 Mon Sep 17 00:00:00 2001 From: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> Date: Wed, 24 Apr 2024 11:52:02 -0300 Subject: [PATCH 07/11] refactor(if/else/return): Remove indentation This may seems like a lot, but the only think that I change here was the if/else block. I move the else statement at the beginning and if that condition is true, will exit the function. This is just for code clarity, I think that this change should not improve the speed or performance of the code. Signed-off-by: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> --- RNS/Utilities/rnstatus.py | 340 +++++++++++++++++++------------------- 1 file changed, 171 insertions(+), 169 deletions(-) diff --git a/RNS/Utilities/rnstatus.py b/RNS/Utilities/rnstatus.py index 835df74..5b80308 100644 --- a/RNS/Utilities/rnstatus.py +++ b/RNS/Utilities/rnstatus.py @@ -27,13 +27,14 @@ import argparse from RNS._version import __version__ + def size_str(num, suffix='B'): - units = ['','K','M','G','T','P','E','Z'] + units = ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z'] last_unit = 'Y' if suffix == 'b': num *= 8 - units = ['','K','M','G','T','P','E','Z'] + units = ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z'] last_unit = 'Y' for unit in units: @@ -46,6 +47,7 @@ def size_str(num, suffix='B'): return "%.2f%s%s" % (num, last_unit, suffix) + def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json=False, astats=False, sorting=None, sort_reverse=False): reticulum = RNS.Reticulum(configdir = configdir, loglevel = 3+verbosity) @@ -55,169 +57,169 @@ def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json= except Exception as e: pass - if stats != None: - if json: - import json - for s in stats: - if isinstance(stats[s], bytes): - stats[s] = RNS.hexrep(stats[s], delimit=False) - - if isinstance(stats[s], dict): - for i in stats[s]: - if isinstance(i, dict): - for k in i: - if isinstance(i[k], bytes): - i[k] = RNS.hexrep(i[k], delimit=False) - - print(json.dumps(stats)) - exit() - - interfaces = stats["interfaces"] - if sorting != None and isinstance(sorting, str): - sorting = sorting.lower() - if sorting == "rate" or sorting == "bitrate": - interfaces.sort(key=lambda i: i["bitrate"], reverse=not sort_reverse) - if sorting == "rx": - interfaces.sort(key=lambda i: i["rxb"], reverse=not sort_reverse) - if sorting == "tx": - interfaces.sort(key=lambda i: i["txb"], reverse=not sort_reverse) - if sorting == "traffic": - interfaces.sort(key=lambda i: i["rxb"]+i["txb"], reverse=not sort_reverse) - if sorting == "announces" or sorting == "announce": - interfaces.sort(key=lambda i: i["incoming_announce_frequency"]+i["outgoing_announce_frequency"], reverse=not sort_reverse) - if sorting == "arx": - interfaces.sort(key=lambda i: i["incoming_announce_frequency"], reverse=not sort_reverse) - if sorting == "atx": - interfaces.sort(key=lambda i: i["outgoing_announce_frequency"], reverse=not sort_reverse) - if sorting == "held": - interfaces.sort(key=lambda i: i["held_announces"], reverse=not sort_reverse) - - - for ifstat in interfaces: - name = ifstat["name"] - - if dispall or not ( - name.startswith("LocalInterface[") or - name.startswith("TCPInterface[Client") or - name.startswith("I2PInterfacePeer[Connected peer") or - (name.startswith("I2PInterface[") and ("i2p_connectable" in ifstat and ifstat["i2p_connectable"] == False)) - ): - - if not (name.startswith("I2PInterface[") and ("i2p_connectable" in ifstat and ifstat["i2p_connectable"] == False)): - if name_filter == None or name_filter.lower() in name.lower(): - print("") - - if ifstat["status"]: - ss = "Up" - else: - ss = "Down" - - if ifstat["mode"] == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT: - modestr = "Access Point" - elif ifstat["mode"] == RNS.Interfaces.Interface.Interface.MODE_POINT_TO_POINT: - modestr = "Point-to-Point" - elif ifstat["mode"] == RNS.Interfaces.Interface.Interface.MODE_ROAMING: - modestr = "Roaming" - elif ifstat["mode"] == RNS.Interfaces.Interface.Interface.MODE_BOUNDARY: - modestr = "Boundary" - elif ifstat["mode"] == RNS.Interfaces.Interface.Interface.MODE_GATEWAY: - modestr = "Gateway" - else: - modestr = "Full" - - - if ifstat["clients"] != None: - clients = ifstat["clients"] - if name.startswith("Shared Instance["): - cnum = max(clients-1,0) - if cnum == 1: - spec_str = " program" - else: - spec_str = " programs" - - clients_string = "Serving : "+str(cnum)+spec_str - elif name.startswith("I2PInterface["): - if "i2p_connectable" in ifstat and ifstat["i2p_connectable"] == True: - cnum = clients - if cnum == 1: - spec_str = " connected I2P endpoint" - else: - spec_str = " connected I2P endpoints" - - clients_string = "Peers : "+str(cnum)+spec_str - else: - clients_string = "" - else: - clients_string = "Clients : "+str(clients) - - else: - clients = None - - print(" {n}".format(n=ifstat["name"])) - - if "ifac_netname" in ifstat and ifstat["ifac_netname"] != None: - print(" Network : {nn}".format(nn=ifstat["ifac_netname"])) - - print(" Status : {ss}".format(ss=ss)) - - if clients != None and clients_string != "": - print(" "+clients_string) - - if not (name.startswith("Shared Instance[") or name.startswith("TCPInterface[Client") or name.startswith("LocalInterface[")): - print(" Mode : {mode}".format(mode=modestr)) - - if "bitrate" in ifstat and ifstat["bitrate"] != None: - print(" Rate : {ss}".format(ss=speed_str(ifstat["bitrate"]))) - - if "airtime_short" in ifstat and "airtime_long" in ifstat: - print(" Airtime : {ats}% (15s), {atl}% (1h)".format(ats=str(ifstat["airtime_short"]),atl=str(ifstat["airtime_long"]))) - - if "channel_load_short" in ifstat and "channel_load_long" in ifstat: - print(" Ch.Load : {ats}% (15s), {atl}% (1h)".format(ats=str(ifstat["channel_load_short"]),atl=str(ifstat["channel_load_long"]))) - - if "peers" in ifstat and ifstat["peers"] != None: - print(" Peers : {np} reachable".format(np=ifstat["peers"])) - - if "tunnelstate" in ifstat and ifstat["tunnelstate"] != None: - print(" I2P : {ts}".format(ts=ifstat["tunnelstate"])) - - if "ifac_signature" in ifstat and ifstat["ifac_signature"] != None: - sigstr = "<…"+RNS.hexrep(ifstat["ifac_signature"][-5:], delimit=False)+">" - print(" Access : {nb}-bit IFAC by {sig}".format(nb=ifstat["ifac_size"]*8, sig=sigstr)) - - if "i2p_b32" in ifstat and ifstat["i2p_b32"] != None: - print(" I2P B32 : {ep}".format(ep=str(ifstat["i2p_b32"]))) - - if astats and "announce_queue" in ifstat and ifstat["announce_queue"] != None and ifstat["announce_queue"] > 0: - aqn = ifstat["announce_queue"] - if aqn == 1: - print(" Queued : {np} announce".format(np=aqn)) - else: - print(" Queued : {np} announces".format(np=aqn)) - - if astats and "held_announces" in ifstat and ifstat["held_announces"] != None and ifstat["held_announces"] > 0: - aqn = ifstat["held_announces"] - if aqn == 1: - print(" Held : {np} announce".format(np=aqn)) - else: - print(" Held : {np} announces".format(np=aqn)) - - if astats and "incoming_announce_frequency" in ifstat and ifstat["incoming_announce_frequency"] != None: - print(" Announces : {iaf}↑".format(iaf=RNS.prettyfrequency(ifstat["outgoing_announce_frequency"]))) - print(" {iaf}↓".format(iaf=RNS.prettyfrequency(ifstat["incoming_announce_frequency"]))) - - print(" Traffic : {txb}↑\n {rxb}↓".format(rxb=size_str(ifstat["rxb"]), txb=size_str(ifstat["txb"]))) - - if "transport_id" in stats and stats["transport_id"] != None: - print("\n Transport Instance "+RNS.prettyhexrep(stats["transport_id"])+" running") - if "probe_responder" in stats and stats["probe_responder"] != None: - print(" Probe responder at "+RNS.prettyhexrep(stats["probe_responder"])+ " active") - print(" Uptime is "+RNS.prettytime(stats["transport_uptime"])) - - print("") - - else: + if stats is None: print("Could not get RNS status") + return + + if json: + import json + for s in stats: + if isinstance(stats[s], bytes): + stats[s] = RNS.hexrep(stats[s], delimit=False) + + if isinstance(stats[s], dict): + for i in stats[s]: + if isinstance(i, dict): + for k in i: + if isinstance(i[k], bytes): + i[k] = RNS.hexrep(i[k], delimit=False) + + print(json.dumps(stats)) + exit() + + interfaces = stats["interfaces"] + if sorting != None and isinstance(sorting, str): + sorting = sorting.lower() + if sorting == "rate" or sorting == "bitrate": + interfaces.sort(key=lambda i: i["bitrate"], reverse=not sort_reverse) + if sorting == "rx": + interfaces.sort(key=lambda i: i["rxb"], reverse=not sort_reverse) + if sorting == "tx": + interfaces.sort(key=lambda i: i["txb"], reverse=not sort_reverse) + if sorting == "traffic": + interfaces.sort(key=lambda i: i["rxb"]+i["txb"], reverse=not sort_reverse) + if sorting == "announces" or sorting == "announce": + interfaces.sort(key=lambda i: i["incoming_announce_frequency"]+i["outgoing_announce_frequency"], reverse=not sort_reverse) + if sorting == "arx": + interfaces.sort(key=lambda i: i["incoming_announce_frequency"], reverse=not sort_reverse) + if sorting == "atx": + interfaces.sort(key=lambda i: i["outgoing_announce_frequency"], reverse=not sort_reverse) + if sorting == "held": + interfaces.sort(key=lambda i: i["held_announces"], reverse=not sort_reverse) + + + for ifstat in interfaces: + name = ifstat["name"] + + if dispall or not ( + name.startswith("LocalInterface[") or + name.startswith("TCPInterface[Client") or + name.startswith("I2PInterfacePeer[Connected peer") or + (name.startswith("I2PInterface[") and ("i2p_connectable" in ifstat and ifstat["i2p_connectable"] == False)) + ): + + if not (name.startswith("I2PInterface[") and ("i2p_connectable" in ifstat and ifstat["i2p_connectable"] == False)): + if name_filter == None or name_filter.lower() in name.lower(): + print("") + + if ifstat["status"]: + ss = "Up" + else: + ss = "Down" + + if ifstat["mode"] == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT: + modestr = "Access Point" + elif ifstat["mode"] == RNS.Interfaces.Interface.Interface.MODE_POINT_TO_POINT: + modestr = "Point-to-Point" + elif ifstat["mode"] == RNS.Interfaces.Interface.Interface.MODE_ROAMING: + modestr = "Roaming" + elif ifstat["mode"] == RNS.Interfaces.Interface.Interface.MODE_BOUNDARY: + modestr = "Boundary" + elif ifstat["mode"] == RNS.Interfaces.Interface.Interface.MODE_GATEWAY: + modestr = "Gateway" + else: + modestr = "Full" + + + if ifstat["clients"] != None: + clients = ifstat["clients"] + if name.startswith("Shared Instance["): + cnum = max(clients-1,0) + if cnum == 1: + spec_str = " program" + else: + spec_str = " programs" + + clients_string = "Serving : "+str(cnum)+spec_str + elif name.startswith("I2PInterface["): + if "i2p_connectable" in ifstat and ifstat["i2p_connectable"] == True: + cnum = clients + if cnum == 1: + spec_str = " connected I2P endpoint" + else: + spec_str = " connected I2P endpoints" + + clients_string = "Peers : "+str(cnum)+spec_str + else: + clients_string = "" + else: + clients_string = "Clients : "+str(clients) + + else: + clients = None + + print(" {n}".format(n=ifstat["name"])) + + if "ifac_netname" in ifstat and ifstat["ifac_netname"] != None: + print(" Network : {nn}".format(nn=ifstat["ifac_netname"])) + + print(" Status : {ss}".format(ss=ss)) + + if clients != None and clients_string != "": + print(" "+clients_string) + + if not (name.startswith("Shared Instance[") or name.startswith("TCPInterface[Client") or name.startswith("LocalInterface[")): + print(" Mode : {mode}".format(mode=modestr)) + + if "bitrate" in ifstat and ifstat["bitrate"] != None: + print(" Rate : {ss}".format(ss=speed_str(ifstat["bitrate"]))) + + if "airtime_short" in ifstat and "airtime_long" in ifstat: + print(" Airtime : {ats}% (15s), {atl}% (1h)".format(ats=str(ifstat["airtime_short"]),atl=str(ifstat["airtime_long"]))) + + if "channel_load_short" in ifstat and "channel_load_long" in ifstat: + print(" Ch.Load : {ats}% (15s), {atl}% (1h)".format(ats=str(ifstat["channel_load_short"]),atl=str(ifstat["channel_load_long"]))) + + if "peers" in ifstat and ifstat["peers"] != None: + print(" Peers : {np} reachable".format(np=ifstat["peers"])) + + if "tunnelstate" in ifstat and ifstat["tunnelstate"] != None: + print(" I2P : {ts}".format(ts=ifstat["tunnelstate"])) + + if "ifac_signature" in ifstat and ifstat["ifac_signature"] != None: + sigstr = "<…"+RNS.hexrep(ifstat["ifac_signature"][-5:], delimit=False)+">" + print(" Access : {nb}-bit IFAC by {sig}".format(nb=ifstat["ifac_size"]*8, sig=sigstr)) + + if "i2p_b32" in ifstat and ifstat["i2p_b32"] != None: + print(" I2P B32 : {ep}".format(ep=str(ifstat["i2p_b32"]))) + + if astats and "announce_queue" in ifstat and ifstat["announce_queue"] != None and ifstat["announce_queue"] > 0: + aqn = ifstat["announce_queue"] + if aqn == 1: + print(" Queued : {np} announce".format(np=aqn)) + else: + print(" Queued : {np} announces".format(np=aqn)) + + if astats and "held_announces" in ifstat and ifstat["held_announces"] != None and ifstat["held_announces"] > 0: + aqn = ifstat["held_announces"] + if aqn == 1: + print(" Held : {np} announce".format(np=aqn)) + else: + print(" Held : {np} announces".format(np=aqn)) + + if astats and "incoming_announce_frequency" in ifstat and ifstat["incoming_announce_frequency"] != None: + print(" Announces : {iaf}↑".format(iaf=RNS.prettyfrequency(ifstat["outgoing_announce_frequency"]))) + print(" {iaf}↓".format(iaf=RNS.prettyfrequency(ifstat["incoming_announce_frequency"]))) + + print(" Traffic : {txb}↑\n {rxb}↓".format(rxb=size_str(ifstat["rxb"]), txb=size_str(ifstat["txb"]))) + + if "transport_id" in stats and stats["transport_id"] != None: + print("\n Transport Instance "+RNS.prettyhexrep(stats["transport_id"])+" running") + if "probe_responder" in stats and stats["probe_responder"] != None: + print(" Probe responder at "+RNS.prettyhexrep(stats["probe_responder"])+ " active") + print(" Uptime is "+RNS.prettytime(stats["transport_uptime"])) + + print("") def main(): try: @@ -232,7 +234,7 @@ def main(): help="show all interfaces", default=False ) - + parser.add_argument( "-A", "--announce-stats", @@ -240,7 +242,7 @@ def main(): help="show announce stats", default=False ) - + parser.add_argument( "-s", "--sort", @@ -249,7 +251,7 @@ def main(): default=None, type=str ) - + parser.add_argument( "-r", "--reverse", @@ -257,7 +259,7 @@ def main(): help="reverse sorting", default=False, ) - + parser.add_argument( "-j", "--json", @@ -269,7 +271,7 @@ def main(): parser.add_argument('-v', '--verbose', action='count', default=0) parser.add_argument("filter", nargs="?", default=None, help="only display interfaces with names including filter", type=str) - + args = parser.parse_args() if args.config: From 1f6a494e14d535d7e58b93d0ec96bfb24541ec98 Mon Sep 17 00:00:00 2001 From: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> Date: Wed, 24 Apr 2024 12:08:23 -0300 Subject: [PATCH 08/11] refactor: Change loop for dictionary comprehension This will improve code execution because nested loops are slower than list/dictionary comprehensions. Signed-off-by: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> --- RNS/Utilities/rnstatus.py | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/RNS/Utilities/rnstatus.py b/RNS/Utilities/rnstatus.py index 5b80308..de41158 100644 --- a/RNS/Utilities/rnstatus.py +++ b/RNS/Utilities/rnstatus.py @@ -48,6 +48,13 @@ def size_str(num, suffix='B'): return "%.2f%s%s" % (num, last_unit, suffix) +def convert_bytes_to_hex(value_obj): + if isinstance(value_obj, bytes): + return RNS.hexrep(value_obj, delimit=False) + elif isinstance(value_obj, dict): + return {key: convert_bytes_to_hex(value) for key, value in value_obj.items()} + return value_obj + def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json=False, astats=False, sorting=None, sort_reverse=False): reticulum = RNS.Reticulum(configdir = configdir, loglevel = 3+verbosity) @@ -63,22 +70,13 @@ def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json= if json: import json - for s in stats: - if isinstance(stats[s], bytes): - stats[s] = RNS.hexrep(stats[s], delimit=False) - - if isinstance(stats[s], dict): - for i in stats[s]: - if isinstance(i, dict): - for k in i: - if isinstance(i[k], bytes): - i[k] = RNS.hexrep(i[k], delimit=False) - + for s, value in stats.items(): + stats[s] = convert_bytes_to_hex(value) print(json.dumps(stats)) exit() interfaces = stats["interfaces"] - if sorting != None and isinstance(sorting, str): + if sorting is not None and isinstance(sorting, str): sorting = sorting.lower() if sorting == "rate" or sorting == "bitrate": interfaces.sort(key=lambda i: i["bitrate"], reverse=not sort_reverse) @@ -97,7 +95,6 @@ def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json= if sorting == "held": interfaces.sort(key=lambda i: i["held_announces"], reverse=not sort_reverse) - for ifstat in interfaces: name = ifstat["name"] @@ -130,8 +127,7 @@ def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json= else: modestr = "Full" - - if ifstat["clients"] != None: + if ifstat["clients"] is not None: clients = ifstat["clients"] if name.startswith("Shared Instance["): cnum = max(clients-1,0) @@ -221,6 +217,7 @@ def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json= print("") + def main(): try: parser = argparse.ArgumentParser(description="Reticulum Network Stack Status") @@ -294,6 +291,7 @@ def main(): print("") exit() + def speed_str(num, suffix='bps'): units = ['','k','M','G','T','P','E','Z'] last_unit = 'Y' @@ -310,5 +308,6 @@ def speed_str(num, suffix='bps'): return "%.2f %s%s" % (num, last_unit, suffix) + if __name__ == "__main__": main() From c032a2e438071b74530a3daa7053e99ca846c32f Mon Sep 17 00:00:00 2001 From: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> Date: Wed, 24 Apr 2024 12:11:43 -0300 Subject: [PATCH 09/11] refactor(if/elif): Add elif's on if "questions" The variable sorting can have only 1 value at a time. So does not make sense to ask on every if statement the value of sorting. If one statement is True, then, the other would be False. Signed-off-by: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> --- RNS/Utilities/rnstatus.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/RNS/Utilities/rnstatus.py b/RNS/Utilities/rnstatus.py index de41158..b3876ef 100644 --- a/RNS/Utilities/rnstatus.py +++ b/RNS/Utilities/rnstatus.py @@ -80,19 +80,19 @@ def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json= sorting = sorting.lower() if sorting == "rate" or sorting == "bitrate": interfaces.sort(key=lambda i: i["bitrate"], reverse=not sort_reverse) - if sorting == "rx": + elif sorting == "rx": interfaces.sort(key=lambda i: i["rxb"], reverse=not sort_reverse) - if sorting == "tx": + elif sorting == "tx": interfaces.sort(key=lambda i: i["txb"], reverse=not sort_reverse) - if sorting == "traffic": + elif sorting == "traffic": interfaces.sort(key=lambda i: i["rxb"]+i["txb"], reverse=not sort_reverse) - if sorting == "announces" or sorting == "announce": + elif sorting == "announces" or sorting == "announce": interfaces.sort(key=lambda i: i["incoming_announce_frequency"]+i["outgoing_announce_frequency"], reverse=not sort_reverse) - if sorting == "arx": + elif sorting == "arx": interfaces.sort(key=lambda i: i["incoming_announce_frequency"], reverse=not sort_reverse) - if sorting == "atx": + elif sorting == "atx": interfaces.sort(key=lambda i: i["outgoing_announce_frequency"], reverse=not sort_reverse) - if sorting == "held": + elif sorting == "held": interfaces.sort(key=lambda i: i["held_announces"], reverse=not sort_reverse) for ifstat in interfaces: From acd893a6e654043adb7a51b651c792d3141f0333 Mon Sep 17 00:00:00 2001 From: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> Date: Wed, 24 Apr 2024 15:37:10 -0300 Subject: [PATCH 10/11] refactor: Replace string concatenation with joins Same as previous commits. "".join() is better that "string" + "another_string" + "another_string_2", because strings are immutable. Also, changed for loops with list comprehension. This is much faster and would improve performance. Signed-off-by: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> --- RNS/Utilities/rnx.py | 52 ++++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 28 deletions(-) diff --git a/RNS/Utilities/rnx.py b/RNS/Utilities/rnx.py index 77e823d..dabb36a 100644 --- a/RNS/Utilities/rnx.py +++ b/RNS/Utilities/rnx.py @@ -28,7 +28,7 @@ import argparse import shlex import time import sys -import tty +# import tty # Commented this because commented lines below use it import os from RNS._version import __version__ @@ -39,25 +39,27 @@ reticulum = None allow_all = False allowed_identity_hashes = [] + def prepare_identity(identity_path): global identity - if identity_path == None: + if identity_path is None: identity_path = RNS.Reticulum.identitypath+"/"+APP_NAME if os.path.isfile(identity_path): - identity = RNS.Identity.from_file(identity_path) + identity = RNS.Identity.from_file(identity_path) if identity == None: RNS.log("No valid saved identity found, creating new...", RNS.LOG_INFO) identity = RNS.Identity() identity.to_file(identity_path) + def listen(configdir, identitypath = None, verbosity = 0, quietness = 0, allowed = [], print_identity = False, disable_auth = None, disable_announce=False): global identity, allow_all, allowed_identity_hashes, reticulum targetloglevel = 3+verbosity-quietness reticulum = RNS.Reticulum(configdir=configdir, loglevel=targetloglevel) - + prepare_identity(identitypath) destination = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "execute") @@ -69,7 +71,7 @@ def listen(configdir, identitypath = None, verbosity = 0, quietness = 0, allowed if disable_auth: allow_all = True else: - if allowed != None: + if allowed is not None: for a in allowed: try: dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2 @@ -107,7 +109,7 @@ def listen(configdir, identitypath = None, verbosity = 0, quietness = 0, allowed if not disable_announce: destination.announce() - + while True: time.sleep(1) @@ -297,6 +299,7 @@ def remote_execution_progress(request_receipt): link = None listener_destination = None remote_exec_grace = 2.0 + def execute(configdir, identitypath = None, verbosity = 0, quietness = 0, detailed = False, mirror = False, noid = False, destination = None, command = None, stdin = None, stdoutl = None, stderrl = None, timeout = RNS.Transport.PATH_REQUEST_TIMEOUT, result_timeout = None, interactive = False): global identity, reticulum, link, listener_destination, remote_exec_grace @@ -338,7 +341,7 @@ def execute(configdir, identitypath = None, verbosity = 0, quietness = 0, detail if link == None or link.status == RNS.Link.CLOSED or link.status == RNS.Link.PENDING: link = RNS.Link(listener_destination) link.did_identify = False - + if not spin(until=lambda: link.status == RNS.Link.ACTIVE, msg="Establishing link with "+RNS.prettyhexrep(destination_hash), timeout=timeout): print("Could not establish link with "+RNS.prettyhexrep(destination_hash)) exit(243) @@ -467,7 +470,7 @@ def execute(configdir, identitypath = None, verbosity = 0, quietness = 0, detail else: tstr = "" print("Remote wrote "+str(outlen)+" bytes to stdout"+tstr) - + if errlen != None and stderr != None: if len(stderr) < errlen: tstr = ", "+str(len(stderr))+" bytes displayed" @@ -548,7 +551,6 @@ def main(): parser.add_argument("--stdout", action='store', default=None, help="max size in bytes of returned stdout", type=int) parser.add_argument("--stderr", action='store', default=None, help="max size in bytes of returned stderr", type=int) parser.add_argument("--version", action="version", version="rnx {version}".format(version=__version__)) - args = parser.parse_args() if args.listen or args.print_identity: @@ -600,8 +602,8 @@ def main(): # while True: # ch = sys.stdin.read(1) # cmdbuf += ch.encode("utf-8") - # print("\r"+prompt+cmdbuf.decode("utf-8"), end="") - + # print("\r"+prompt+cmdbuf.decode("utf-8"), end="") + command = input() if command.lower() == "exit" or command.lower() == "quit": exit(0) @@ -668,6 +670,7 @@ def size_str(num, suffix='B'): return "%.2f%s%s" % (num, last_unit, suffix) + def pretty_time(time, verbose=False): days = int(time // (24 * 3600)) time = time % (24 * 3600) @@ -676,7 +679,7 @@ def pretty_time(time, verbose=False): minutes = int(time // 60) time %= 60 seconds = round(time, 2) - + ss = "" if seconds == 1 else "s" sm = "" if minutes == 1 else "s" sh = "" if hours == 1 else "s" @@ -684,31 +687,24 @@ def pretty_time(time, verbose=False): components = [] if days > 0: - components.append(str(days)+" day"+sd if verbose else str(days)+"d") + components.append("".join([str(days), " day", sd if verbose else str(days), "d"])) if hours > 0: - components.append(str(hours)+" hour"+sh if verbose else str(hours)+"h") + components.append("".join([str(hours), " hour", sh if verbose else str(hours), "h"])) if minutes > 0: - components.append(str(minutes)+" minute"+sm if verbose else str(minutes)+"m") + components.append("".join([str(minutes), " minute", sm if verbose else str(minutes), "m"])) if seconds > 0: - components.append(str(seconds)+" second"+ss if verbose else str(seconds)+"s") + components.append("".join([str(seconds), " second", ss if verbose else str(seconds), "s"])) - i = 0 - tstr = "" - for c in components: - i += 1 - if i == 1: - pass - elif i < len(components): - tstr += ", " - elif i == len(components): - tstr += " and " + if len(components) > 1: + time_string = "".join([", ".join(components[:-1]), " and ", components[-1]]) + else: + time_string = components[0] if components else "" - tstr += c + return time_string - return tstr if __name__ == "__main__": main() From e47dcae54e139e61c36d26b3ef128ae5df9f4fea Mon Sep 17 00:00:00 2001 From: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> Date: Sat, 4 May 2024 14:29:56 -0300 Subject: [PATCH 11/11] refactor(rnodeconf): Replace string concatenation Signed-off-by: nothingbutlucas <69118979+nothingbutlucas@users.noreply.github.com> --- RNS/Utilities/rnodeconf.py | 155 +++++++++++++++++++------------------ 1 file changed, 81 insertions(+), 74 deletions(-) diff --git a/RNS/Utilities/rnodeconf.py b/RNS/Utilities/rnodeconf.py index 27f7f91..16fcd5c 100755 --- a/RNS/Utilities/rnodeconf.py +++ b/RNS/Utilities/rnodeconf.py @@ -56,12 +56,14 @@ fw_filename = None fw_url = None mapped_model = None +# TODO: Create and use constants for megas, kilos, etc (Avoid using magic numbers) + class KISS(): FEND = 0xC0 FESC = 0xDB TFEND = 0xDC TFESC = 0xDD - + CMD_UNKNOWN = 0xFE CMD_DATA = 0x00 CMD_FREQUENCY = 0x01 @@ -102,11 +104,11 @@ class KISS(): DETECT_REQ = 0x73 DETECT_RESP = 0x46 - + RADIO_STATE_OFF = 0x00 RADIO_STATE_ON = 0x01 RADIO_STATE_ASK = 0xFF - + CMD_ERROR = 0x90 ERROR_INITRADIO = 0x01 ERROR_TXFAILED = 0x02 @@ -166,7 +168,7 @@ class ROM(): MODEL_E9 = 0xE9 MODEL_E3 = 0xE3 MODEL_E8 = 0xE8 - + PRODUCT_HMBRW = 0xF0 MODEL_FF = 0xFF MODEL_FE = 0xFE @@ -280,7 +282,7 @@ try: os.makedirs(ROM_DIR) except Exception as e: - print("No access to directory "+str(CNF_DIR)+". This utility needs file system access to store firmware and data files. Cannot continue.") + print("".join(["No access to directory ", str(CNF_DIR), ". This utility needs file system access to store firmware and data files. Cannot continue."])) print("The contained exception was:") print(str(e)) exit(99) @@ -411,7 +413,7 @@ class RNode(): command_buffer = command_buffer+bytes([byte]) if (len(command_buffer) == 4): self.r_frequency = command_buffer[0] << 24 | command_buffer[1] << 16 | command_buffer[2] << 8 | command_buffer[3] - RNS.log("Radio reporting frequency is "+str(self.r_frequency/1000000.0)+" MHz") + RNS.log("".join(["Radio reporting frequency is ", str(self.r_frequency/1000000.0), " MHz"])) self.updateBitrate() elif (command == KISS.CMD_BANDWIDTH): @@ -427,7 +429,7 @@ class RNode(): command_buffer = command_buffer+bytes([byte]) if (len(command_buffer) == 4): self.r_bandwidth = command_buffer[0] << 24 | command_buffer[1] << 16 | command_buffer[2] << 8 | command_buffer[3] - RNS.log("Radio reporting bandwidth is "+str(self.r_bandwidth/1000.0)+" KHz") + RNS.log("".join(["Radio reporting bandwidth is ", str(self.r_bandwidth/1000.0), " KHz"])) self.updateBitrate() elif (command == KISS.CMD_BT_PIN): @@ -501,7 +503,7 @@ class RNode(): elif (command == KISS.CMD_TXPOWER): self.r_txpower = byte - RNS.log("Radio reporting TX power is "+str(self.r_txpower)+" dBm") + RNS.log("".join(["Radio reporting TX power is ", str(self.r_txpower), " dBm"])) elif (command == KISS.CMD_SF): self.r_sf = byte RNS.log("Radio reporting spreading factor is "+str(self.r_sf)) @@ -549,17 +551,17 @@ class RNode(): self.r_random = byte elif (command == KISS.CMD_ERROR): if (byte == KISS.ERROR_INITRADIO): - RNS.log(str(self)+" hardware initialisation error (code "+RNS.hexrep(byte)+")") + RNS.log("".join([str(self), " hardware initialisation error (code ", RNS.hexrep(byte), ")"])) elif (byte == KISS.ERROR_TXFAILED): - RNS.log(str(self)+" hardware TX error (code "+RNS.hexrep(byte)+")") + RNS.log("".join([str(self), " hardware TX error (code ", RNS.hexrep(byte), ")"])) else: - RNS.log(str(self)+" hardware error (code "+RNS.hexrep(byte)+")") + RNS.log("".join([str(self), " hardware error (code ", RNS.hexrep(byte), ")"])) elif (command == KISS.CMD_DETECT): if byte == KISS.DETECT_RESP: self.detected = True else: self.detected = False - + else: time_since_last = int(time.time()*1000) - last_read_ms if len(data_buffer) > 0 and time_since_last > self.timeout: @@ -816,7 +818,7 @@ class RNode(): from cryptography.hazmat.primitives.serialization import load_der_private_key from cryptography.hazmat.primitives.asymmetric import padding - # Try loading local signing key for + # Try loading local signing key for # validation of self-signed devices if os.path.isdir(FWD_DIR) and os.path.isfile(FWD_DIR+"/signing.key"): private_bytes = None @@ -852,7 +854,7 @@ class RNode(): RNS.log("Could not deserialize local signing key") RNS.log(str(e)) - # Try loading trusted signing key for + # Try loading trusted signing key for # validation of devices if os.path.isdir(TK_DIR): for f in os.listdir(TK_DIR): @@ -930,7 +932,7 @@ class RNode(): print(" ") print(" To initialise this device to a verifiable state, please run:") print(" ") - print(" rnodeconf "+str(self.serial.name)+" --autoinstall") + print("".join([" rnodeconf ", str(self.serial.name), " --autoinstall"])) print("") @@ -965,7 +967,7 @@ class RNode(): selected_version = None selected_hash = None -firmware_version_url = "https://unsigned.io/firmware/latest/?v="+program_version+"&variant=" +firmware_version_url = "".join(["https://unsigned.io/firmware/latest/?v=", program_version, "&variant="]) fallback_firmware_version_url = "https://github.com/markqvist/rnode_firmware/releases/latest/download/release.json" def ensure_firmware_file(fw_filename): global selected_version, selected_hash, upd_nocheck @@ -981,7 +983,7 @@ def ensure_firmware_file(fw_filename): ] parts_missing = False for rf in required_files: - if not os.path.isfile(EXT_DIR+"/"+rf): + if not os.path.isfile("".join([EXT_DIR, "/", rf])): parts_missing = True if parts_missing: @@ -994,7 +996,7 @@ def ensure_firmware_file(fw_filename): release_info = vf.read().decode("utf-8").strip() selected_version = release_info.split()[0] selected_hash = release_info.split()[1] - RNS.log("Using existing firmware file: "+fw_filename+" for version "+selected_version) + RNS.log("".join(["Using existing firmware file: ", fw_filename, " for version ", selected_version])) else: RNS.log("No extracted firmware is available, cannot continue.") RNS.log("Extract a firmware from an existing RNode first, using the --extract-firmware option.") @@ -1006,17 +1008,19 @@ def ensure_firmware_file(fw_filename): try: # if custom firmware url, download latest release if selected_version == None and fw_url == None: - version_url = firmware_version_url+fw_filename + version_url = firmware_version_url + fw_filename RNS.log("Retrieving latest version info from "+version_url) - urlretrieve(firmware_version_url+fw_filename, UPD_DIR+"/"+fw_filename+".version.latest") + urlretrieve(version_url, "".join([UPD_DIR, "/", fw_filename, ".version.latest"])) else: if fw_url != None: if selected_version == None: version_url = fw_url+"latest/download/release.json" else: - version_url = fw_url+"download/"+selected_version+"/release.json" + version_url = "".join([fw_url, "download/", selected_version, "/release.json"]) else: - version_url = firmware_update_url+selected_version+"/release.json" + if selected_version is None: + selected_version = "" + version_url = "".join([firmware_update_url, selected_version, "/release.json"]) try: RNS.log("Retrieving specified version info from "+version_url) urlretrieve(version_url, UPD_DIR+"/version_release_info.json") @@ -1024,8 +1028,9 @@ def ensure_firmware_file(fw_filename): with open(UPD_DIR+"/version_release_info.json", "rb") as rif: rdat = json.loads(rif.read()) variant = rdat[fw_filename] - with open(UPD_DIR+"/"+fw_filename+".version.latest", "wb") as verf: + with open("".join([UPD_DIR, "/", fw_filename, ".version.latest"]), "wb") as verf: inf_str = str(variant["version"])+" "+str(variant["hash"]) + inf_str = "".join([str(variant["version"]), " ", str(variant["hash"])]) verf.write(inf_str.encode("utf-8")) except Exception as e: RNS.log("Failed to retrive version information for your board.") @@ -1055,7 +1060,7 @@ def ensure_firmware_file(fw_filename): with open(UPD_DIR+"/fallback_release_info.json", "rb") as rif: rdat = json.loads(rif.read()) variant = rdat[fw_filename] - with open(UPD_DIR+"/"+fw_filename+".version.latest", "wb") as verf: + with open("".join([UPD_DIR, "/", fw_filename, ".version.latest"]), "wb") as verf: inf_str = str(variant["version"])+" "+str(variant["hash"]) verf.write(inf_str.encode("utf-8")) @@ -1064,7 +1069,7 @@ def ensure_firmware_file(fw_filename): raise e import shutil - file = open(UPD_DIR+"/"+fw_filename+".version.latest", "rb") + file = open("".join([UPD_DIR, "/", fw_filename, ".version.latest"]), "rb") release_info = file.read().decode("utf-8").strip() selected_version = release_info.split()[0] if selected_version == "not": @@ -1074,7 +1079,7 @@ def ensure_firmware_file(fw_filename): selected_hash = release_info.split()[1] if not os.path.isdir(UPD_DIR+"/"+selected_version): os.makedirs(UPD_DIR+"/"+selected_version) - shutil.copy(UPD_DIR+"/"+fw_filename+".version.latest", UPD_DIR+"/"+selected_version+"/"+fw_filename+".version") + shutil.copy("".join([UPD_DIR, "/", fw_filename, ".version.latest"]), "".join([UPD_DIR, "/", selected_version, "/", fw_filename, ".version"])) RNS.log("The selected firmware for this board is version "+selected_version) else: @@ -1084,32 +1089,32 @@ def ensure_firmware_file(fw_filename): # if custom firmware url, use it if fw_url != None: - update_target_url = fw_url+"download/"+selected_version+"/"+fw_filename + update_target_url = "".join([fw_url, "download/", selected_version, "/", fw_filename]) RNS.log("Retrieving firmware from custom url "+update_target_url) else: - update_target_url = firmware_update_url+selected_version+"/"+fw_filename + update_target_url = "".join([firmware_update_url, selected_version, "/", fw_filename]) try: if not os.path.isdir(UPD_DIR+"/"+selected_version): os.makedirs(UPD_DIR+"/"+selected_version) - if not os.path.isfile(UPD_DIR+"/"+selected_version+"/"+fw_filename): - RNS.log("Firmware "+UPD_DIR+"/"+selected_version+"/"+fw_filename+" not found.") - RNS.log("Downloading missing firmware file: "+fw_filename+" for version "+selected_version) - urlretrieve(update_target_url, UPD_DIR+"/"+selected_version+"/"+fw_filename) + if not os.path.isfile("".join([UPD_DIR, "/", selected_version, "/", fw_filename])): + RNS.log("".join(["Firmware ", UPD_DIR, "/", selected_version, "/", fw_filename, " not found."])) + RNS.log("".join(["Downloading missing firmware file: ", fw_filename, " for version ", selected_version])) + urlretrieve(update_target_url, "".join([UPD_DIR, "/", selected_version, "/", fw_filename])) RNS.log("Firmware file downloaded") else: - RNS.log("Using existing firmware file: "+fw_filename+" for version "+selected_version) + RNS.log("".join(["Using existing firmware file: ", fw_filename, " for version ", selected_version])) try: if selected_hash == None: try: - file = open(UPD_DIR+"/"+selected_version+"/"+fw_filename+".version", "rb") + file = open("".join([UPD_DIR, "/", selected_version, "/", fw_filename, ".version"]), "rb") release_info = file.read().decode("utf-8").strip() selected_hash = release_info.split()[1] except Exception as e: RNS.log("Could not read locally cached release information.") - RNS.log("Ensure "+UPD_DIR+"/"+selected_version+"/"+fw_filename+".version exists and has the correct format and hash.") + RNS.log("".join(["Ensure ", UPD_DIR, "/", selected_version, "/", fw_filename, ".version exists and has the correct format and hash."])) RNS.log("You can clear the cache with the --clear-cache option and try again.") if selected_hash == None: @@ -1117,7 +1122,8 @@ def ensure_firmware_file(fw_filename): exit(97) RNS.log("Verifying firmware integrity...") - fw_file = open(UPD_DIR+"/"+selected_version+"/"+fw_filename, "rb") + fw_file = open("".join([UPD_DIR, "/", selected_version, "/", fw_filename]), "rb") + # TODO: Remove unused variable expected_hash = bytes.fromhex(selected_hash) file_hash = hashlib.sha256(fw_file.read()).hexdigest() if file_hash == selected_hash: @@ -1230,11 +1236,11 @@ def main(): parser.add_argument("-f", "--flash", action="store_true", help="Flash firmware and bootstrap EEPROM") parser.add_argument("-r", "--rom", action="store_true", help="Bootstrap EEPROM without flashing firmware") - parser.add_argument("-k", "--key", action="store_true", help="Generate a new signing key and exit") # + parser.add_argument("-k", "--key", action="store_true", help="Generate a new signing key and exit") # parser.add_argument("-S", "--sign", action="store_true", help="Display public part of signing key") parser.add_argument("-H", "--firmware-hash", action="store", help="Display installed firmware hash") parser.add_argument("--platform", action="store", metavar="platform", type=str, default=None, help="Platform specification for device bootstrap") - parser.add_argument("--product", action="store", metavar="product", type=str, default=None, help="Product specification for device bootstrap") # + parser.add_argument("--product", action="store", metavar="product", type=str, default=None, help="Product specification for device bootstrap") # parser.add_argument("--model", action="store", metavar="model", type=str, default=None, help="Model code for device bootstrap") parser.add_argument("--hwrev", action="store", metavar="revision", type=int, default=None, help="Hardware revision for device bootstrap") @@ -1263,7 +1269,7 @@ def main(): if args.fw_version != None: selected_version = args.fw_version - try: + try: check_float = float(selected_version) except ValueError: RNS.log("Selected version \""+selected_version+"\" does not appear to be a number.") @@ -1277,7 +1283,7 @@ def main(): if args.nocheck: upd_nocheck = True - + if args.public or args.key or args.flash or args.rom or args.autoinstall or args.trust_key: from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend @@ -1296,7 +1302,7 @@ def main(): public_key = load_der_public_key(public_bytes, backend=default_backend()) key_hash = hashlib.sha256(public_bytes).hexdigest() RNS.log("Trusting key: "+str(key_hash)) - f = open(TK_DIR+"/"+str(key_hash)+".pubkey", "wb") + f = open("".join([TK_DIR, "/", str(key_hash), ".pubkey"]), "wb") f.write(public_bytes) f.close() @@ -1328,12 +1334,12 @@ def main(): ports = list_ports.comports() portlist = [] for port in ports: - portlist.insert(0, port) - + portlist.insert(0, port) + pi = 1 print("Detected serial ports:") for port in portlist: - print(" ["+str(pi)+"] "+str(port.device)+" ("+str(port.product)+", "+str(port.serial_number)+")") + print("".join([" [", str(pi), "] ", str(port.device), " (", str(port.product), ", ", str(port.serial_number), ")"])) pi += 1 print("\nEnter the number of the serial port your device is connected to:\n? ", end="") @@ -1355,7 +1361,7 @@ def main(): port_product = selected_port.product port_serialno = selected_port.serial_number - print("\nOk, using device on "+str(port_path)+" ("+str(port_product)+", "+str(port_serialno)+")") + print("".join(["\nOk, using device on ", str(port_path), " (", str(port_product), ", ", str(port_serialno), ")"])) else: ports = list_ports.comports() @@ -1423,11 +1429,11 @@ def main(): hash_f.close() extraction_parts = [ - ("bootloader", "python \""+CNF_DIR+"/recovery_esptool.py\" --chip esp32 --port "+port_path+" --baud "+args.baud_flash+" --before default_reset --after hard_reset read_flash 0x1000 0x4650 \""+EXT_DIR+"/extracted_rnode_firmware.bootloader\""), - ("partition table", "python \""+CNF_DIR+"/recovery_esptool.py\" --chip esp32 --port "+port_path+" --baud "+args.baud_flash+" --before default_reset --after hard_reset read_flash 0x8000 0xC00 \""+EXT_DIR+"/extracted_rnode_firmware.partitions\""), - ("app boot", "python \""+CNF_DIR+"/recovery_esptool.py\" --chip esp32 --port "+port_path+" --baud "+args.baud_flash+" --before default_reset --after hard_reset read_flash 0xe000 0x2000 \""+EXT_DIR+"/extracted_rnode_firmware.boot_app0\""), - ("application image", "python \""+CNF_DIR+"/recovery_esptool.py\" --chip esp32 --port "+port_path+" --baud "+args.baud_flash+" --before default_reset --after hard_reset read_flash 0x10000 0x200000 \""+EXT_DIR+"/extracted_rnode_firmware.bin\""), - ("console image", "python \""+CNF_DIR+"/recovery_esptool.py\" --chip esp32 --port "+port_path+" --baud "+args.baud_flash+" --before default_reset --after hard_reset read_flash 0x210000 0x1F0000 \""+EXT_DIR+"/extracted_console_image.bin\""), + ("bootloader", "".join(["python \"", CNF_DIR, "/recovery_esptool.py\" --chip esp32 --port ", port_path, " --baud ", args.baud_flash, " --before default_reset --after hard_reset read_flash 0x1000 0x4650 \"", EXT_DIR, "/extracted_rnode_firmware.bootloader\""])), + ("partition table", "".join(["python \"", CNF_DIR, "/recovery_esptool.py\" --chip esp32 --port ", port_path, " --baud ", args.baud_flash, " --before default_reset --after hard_reset read_flash 0x8000 0xC00 \"", EXT_DIR, "/extracted_rnode_firmware.partitions\""])), + ("app boot", "".join(["python \"", CNF_DIR, "/recovery_esptool.py\" --chip esp32 --port ", port_path, " --baud ", args.baud_flash, " --before default_reset --after hard_reset read_flash 0xe000 0x2000 \"", EXT_DIR, "/extracted_rnode_firmware.boot_app0\""])), + ("application image", "".join(["python \"", CNF_DIR, "/recovery_esptool.py\" --chip esp32 --port ", port_path, " --baud ", args.baud_flash, " --before default_reset --after hard_reset read_flash 0x10000 0x200000 \"", EXT_DIR, "/extracted_rnode_firmware.bin\""])), + ("console image", "".join(["python \"", CNF_DIR, "/recovery_esptool.py\" --chip esp32 --port ", port_path, " --baud ", args.baud_flash, " --before default_reset --after hard_reset read_flash 0x210000 0x1F0000 \"", EXT_DIR, "/extracted_console_image.bin\""])), ] import subprocess, shlex for part, command in extraction_parts: @@ -1460,12 +1466,12 @@ def main(): ports = list_ports.comports() portlist = [] for port in ports: - portlist.insert(0, port) - + portlist.insert(0, port) + pi = 1 print("Detected serial ports:") for port in portlist: - print(" ["+str(pi)+"] "+str(port.device)+" ("+str(port.product)+", "+str(port.serial_number)+")") + print("".join([" [", str(pi), "] ", str(port.device), " (", str(port.product), ", ", str(port.serial_number), ")"])) pi += 1 print("\nEnter the number of the serial port your device is connected to:\n? ", end="") @@ -1488,7 +1494,7 @@ def main(): port_serialno = selected_port.serial_number clear() - print("\nOk, using device on "+str(port_path)+" ("+str(port_product)+", "+str(port_serialno)+")") + print("".join(["\nOk, using device on ", str(port_path), " (", str(port_product), ", ", str(port_serialno), ")"])) else: ports = list_ports.comports() @@ -1541,7 +1547,7 @@ def main(): print("correct firmware and provision it.") else: print("\nIt looks like this is a fresh device with no RNode firmware.") - + print("") print("What kind of device is this?\n") print("[1] A specific kind of RNode") @@ -1981,7 +1987,7 @@ def main(): fw_filename = "rnode_firmware.hex" elif selected_mcu == ROM.MCU_2560: fw_filename = "rnode_firmware_m2560.hex" - + elif selected_platform == ROM.PLATFORM_ESP32: fw_filename = None print("\nWhat kind of ESP32 board is this?\n") @@ -2094,7 +2100,7 @@ def main(): except Exception as e: RNS.log("Could not load device signing key") - + exit() @@ -2177,7 +2183,7 @@ def main(): if platform == "unzip": flasher = "unzip" if which(flasher) is not None: - return [flasher, "-o", UPD_DIR+"/"+selected_version+"/"+fw_filename, "-d", UPD_DIR+"/"+selected_version] + return [flasher, "-o", "".join([UPD_DIR, "/", selected_version, "/", fw_filename]), "-d", UPD_DIR+"/"+selected_version] else: RNS.log("") RNS.log("You do not currently have the \""+flasher+"\" program installed on your system.") @@ -2194,9 +2200,9 @@ def main(): # avrdude -C/home/markqvist/.arduino15/packages/arduino/tools/avrdude/6.3.0-arduino17/etc/avrdude.conf -q -q -V -patmega2560 -cwiring -P/dev/ttyACM0 -b115200 -D -Uflash:w:/tmp/arduino-sketch-0E260F46C421A84A7CBAD48E859C8E64/RNode_Firmware.ino.hex:i # avrdude -q -q -V -patmega2560 -cwiring -P/dev/ttyACM0 -b115200 -D -Uflash:w:/tmp/arduino-sketch-0E260F46C421A84A7CBAD48E859C8E64/RNode_Firmware.ino.hex:i if fw_filename == "rnode_firmware.hex": - return [flasher, "-P", args.port, "-p", "m1284p", "-c", "arduino", "-b", "115200", "-U", "flash:w:"+UPD_DIR+"/"+selected_version+"/"+fw_filename+":i"] + return [flasher, "-P", args.port, "-p", "m1284p", "-c", "arduino", "-b", "115200", "-U", "".join(["flash:w:", UPD_DIR, "/", selected_version, "/", fw_filename, ":i"])] elif fw_filename == "rnode_firmware_m2560.hex": - return [flasher, "-P", args.port, "-p", "atmega2560", "-c", "wiring", "-D", "-b", "115200", "-U", "flash:w:"+UPD_DIR+"/"+selected_version+"/"+fw_filename] + return [flasher, "-P", args.port, "-p", "atmega2560", "-c", "wiring", "-D", "-b", "115200", "-U", "".join(["flash:w:", UPD_DIR, "/", selected_version, "/", fw_filename])] else: RNS.log("") RNS.log("You do not currently have the \""+flasher+"\" program installed on your system.") @@ -2224,6 +2230,7 @@ def main(): os.chmod(flasher, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP) if which(flasher) is not None: + # TODO: Simplify this returns with a list and add or remove arguments if fw_filename == "rnode_firmware_tbeam.zip": if numeric_version >= 1.55: return [ @@ -2237,11 +2244,11 @@ def main(): "--flash_mode", "dio", "--flash_freq", "80m", "--flash_size", "4MB", - "0xe000", UPD_DIR+"/"+selected_version+"/rnode_firmware_tbeam.boot_app0", - "0x1000", UPD_DIR+"/"+selected_version+"/rnode_firmware_tbeam.bootloader", - "0x10000", UPD_DIR+"/"+selected_version+"/rnode_firmware_tbeam.bin", - "0x210000",UPD_DIR+"/"+selected_version+"/console_image.bin", - "0x8000", UPD_DIR+"/"+selected_version+"/rnode_firmware_tbeam.partitions", + "0xe000", "".join([UPD_DIR, "/", selected_version, "/rnode_firmware_tbeam.boot_app0"]), + "0x1000", "".join([UPD_DIR, "/", selected_version, "/rnode_firmware_tbeam.bootloader"]), + "0x10000", "".join([UPD_DIR, "/", selected_version, "/rnode_firmware_tbeam.bin"]), + "0x210000","".join([UPD_DIR, "/", selected_version, "/console_image.bin"]), + "0x8000", "".join([UPD_DIR, "/", selected_version, "/rnode_firmware_tbeam.partitions"]), ] else: return [ @@ -2672,7 +2679,7 @@ def main(): wants_fw_provision = False if args.flash: from subprocess import call - + if fw_filename == None: fw_filename = "rnode_firmware.hex" @@ -2703,9 +2710,9 @@ def main(): RNS.log("Error while flashing") RNS.log(str(e)) exit(1) - + else: - fw_src = UPD_DIR+"/"+selected_version+"/" + fw_src = "".join([UPD_DIR, "/", selected_version, "/"]) if os.path.isfile(fw_src+fw_filename): try: if fw_filename.endswith(".zip"): @@ -2862,7 +2869,7 @@ def main(): update_full_path = EXT_DIR+"/extracted_rnode_firmware.version" else: update_full_path = UPD_DIR+"/"+selected_version+"/"+fw_filename - if os.path.isfile(update_full_path): + if os.path.isfile(update_full_path): try: args.info = False RNS.log("Updating RNode firmware for device on "+args.port) @@ -3017,13 +3024,13 @@ def main(): RNS.log("") RNS.log("Device info:") - RNS.log("\tProduct : "+products[rnode.product]+" "+models[rnode.model][3]+" ("+bytes([rnode.product]).hex()+":"+bytes([rnode.model]).hex()+board_string+")") + RNS.log("".join(["\tProduct : ", products[rnode.product], " ", models[rnode.model][3], " (", bytes([rnode.product]).hex(), ":", bytes([rnode.model]).hex(), board_string, ")"])) RNS.log("\tDevice signature : "+sigstring) RNS.log("\tFirmware version : "+rnode.version) RNS.log("\tHardware revision : "+str(int(rnode.hw_rev))) RNS.log("\tSerial number : "+RNS.hexrep(rnode.serialno)) RNS.log("\tModem chip : "+str(models[rnode.model][5])) - RNS.log("\tFrequency range : "+str(rnode.min_freq/1e6)+" MHz - "+str(rnode.max_freq/1e6)+" MHz") + RNS.log("".join(["\tFrequency range : ", str(rnode.min_freq/1e6), " MHz - ", str(rnode.max_freq/1e6), " MHz"])) RNS.log("\tMax TX power : "+str(rnode.max_output)+" dBm") RNS.log("\tManufactured : "+timestring) @@ -3040,7 +3047,7 @@ def main(): RNS.log("\tDevice mode : TNC") RNS.log("\t Frequency : "+str((rnode.conf_frequency/1000000.0))+" MHz") RNS.log("\t Bandwidth : "+str(rnode.conf_bandwidth/1000.0)+" KHz") - RNS.log("\t TX power : "+str(rnode.conf_txpower)+" dBm ("+str(txp_mw)+" mW)") + RNS.log("".join(["\t TX power : ", str(rnode.conf_txpower), " dBm (", str(txp_mw), " mW)"])) RNS.log("\t Spreading factor : "+str(rnode.conf_sf)) RNS.log("\t Coding rate : "+str(rnode.conf_cr)) RNS.log("\t On-air bitrate : "+str(rnode.bitrate_kbps)+" kbps") @@ -3070,7 +3077,7 @@ def main(): if args.autoinstall: RNS.log("Clearing old EEPROM, this will take about 15 seconds...") rnode.wipe_eeprom() - + if rnode.platform == ROM.PLATFORM_ESP32: RNS.log("Waiting for ESP32 reset...") time.sleep(6) @@ -3241,7 +3248,7 @@ def main(): vf.close() else: partition_filename = fw_filename.replace(".zip", ".bin") - partition_hash = get_partition_hash(UPD_DIR+"/"+selected_version+"/"+partition_filename) + partition_hash = get_partition_hash("".join([UPD_DIR, "/", selected_version, "/", partition_filename])) if partition_hash != None: time.sleep(0.75)