Reticulum/RNS/Utilities/rnid.py
2024-10-11 23:14:32 +02:00

600 lines
29 KiB
Python

#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2023 Mark Qvist / unsigned.io
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import RNS
import argparse
import time
import sys
import os
import base64
from RNS._version import __version__
APP_NAME = "rnid"
SIG_EXT = "rsg"
ENCRYPT_EXT = "rfe"
CHUNK_SIZE = 16*1024*1024
def spin(until=None, msg=None, timeout=None):
i = 0
syms = "⢄⢂⢁⡁⡈⡐⡠"
if timeout != None:
timeout = time.time()+timeout
print(f"{msg} ", end=" ")
while (timeout == None or time.time()<timeout) and not until():
time.sleep(0.1)
print(f"\b\b{syms[i]} ", end="")
sys.stdout.flush()
i = (i+1)%len(syms)
print(f"\r{' ' * len(msg)} \r", end="")
if timeout != None and time.time() > timeout:
return False
else:
return True
def main():
try:
parser = argparse.ArgumentParser(description="Reticulum Identity & Encryption Utility")
# parser.add_argument("file", nargs="?", default=None, help="input file path", type=str)
parser.add_argument("--config", metavar="path", action="store", default=None, help="path to alternative Reticulum config directory", type=str)
parser.add_argument("-i", "--identity", metavar="identity", action="store", default=None, help="hexadecimal Reticulum Destination hash or path to Identity file", type=str)
parser.add_argument("-g", "--generate", metavar="file", action="store", default=None, help="generate a new Identity")
parser.add_argument("-m", "--import", dest="import_str", metavar="identity_data", action="store", default=None, help="import Reticulum identity in hex, base32 or base64 format", type=str)
parser.add_argument("-x", "--export", action="store_true", default=None, help="export identity to hex, base32 or base64 format")
parser.add_argument("-v", "--verbose", action="count", default=0, help="increase verbosity")
parser.add_argument("-q", "--quiet", action="count", default=0, help="decrease verbosity")
parser.add_argument("-a", "--announce", metavar="aspects", action="store", default=None, help="announce a destination based on this Identity")
parser.add_argument("-H", "--hash", metavar="aspects", action="store", default=None, help="show destination hashes for other aspects for this Identity")
parser.add_argument("-e", "--encrypt", metavar="file", action="store", default=None, help="encrypt file")
parser.add_argument("-d", "--decrypt", metavar="file", action="store", default=None, help="decrypt file")
parser.add_argument("-s", "--sign", metavar="path", action="store", default=None, help="sign file")
parser.add_argument("-V", "--validate", metavar="path", action="store", default=None, help="validate signature")
parser.add_argument("-r", "--read", metavar="file", action="store", default=None, help="input file path", type=str)
parser.add_argument("-w", "--write", metavar="file", action="store", default=None, help="output file path", type=str)
parser.add_argument("-f", "--force", action="store_true", default=None, help="write output even if it overwrites existing files")
parser.add_argument("-I", "--stdin", action="store_true", default=False, help=argparse.SUPPRESS) # "read input from STDIN instead of file"
parser.add_argument("-O", "--stdout", action="store_true", default=False, help=argparse.SUPPRESS) # help="write output to STDOUT instead of file",
parser.add_argument("-R", "--request", action="store_true", default=False, help="request unknown Identities from the network")
parser.add_argument("-t", action="store", metavar="seconds", type=float, help="identity request timeout before giving up", default=RNS.Transport.PATH_REQUEST_TIMEOUT)
parser.add_argument("-p", "--print-identity", action="store_true", default=False, help="print identity info and exit")
parser.add_argument("-P", "--print-private", action="store_true", default=False, help="allow displaying private keys")
parser.add_argument("-b", "--base64", action="store_true", default=False, help="Use base64-encoded input and output")
parser.add_argument("-B", "--base32", action="store_true", default=False, help="Use base32-encoded input and output")
parser.add_argument("--version", action="version", version=f"rnid {__version__}")
args = parser.parse_args()
ops = 0;
for t in [args.encrypt, args.decrypt, args.validate, args.sign]:
if t:
ops += 1
if ops > 1:
RNS.log("This utility currently only supports one of the encrypt, decrypt, sign or verify operations per invocation", RNS.LOG_ERROR)
exit(1)
if not args.read:
if args.encrypt:
args.read = args.encrypt
if args.decrypt:
args.read = args.decrypt
if args.sign:
args.read = args.sign
identity_str = args.identity
if args.import_str:
identity_bytes = None
try:
if args.base64:
identity_bytes = base64.urlsafe_b64decode(args.import_str)
elif args.base32:
identity_bytes = base64.b32decode(args.import_str)
else:
identity_bytes = bytes.fromhex(args.import_str)
except Exception as e:
print(f"Invalid identity data specified for import: {e}")
exit(41)
try:
identity = RNS.Identity.from_bytes(identity_bytes)
except Exception as e:
print(f"Could not create Reticulum identity from specified data: {e}")
exit(42)
RNS.log("Identity imported")
if args.base64:
RNS.log(f"Public Key : {base64.urlsafe_b64encode(identity.get_public_key()).decode('utf-8')}")
elif args.base32:
RNS.log(f"Public Key : {base64.b32encode(identity.get_public_key()).decode('utf-8')}")
else:
RNS.log(f"Public Key : {RNS.hexrep(identity.get_public_key(), delimit=False)}")
if identity.prv:
if args.print_private:
if args.base64:
RNS.log(f"Private Key : {base64.urlsafe_b64encode(identity.get_private_key()).decode('utf-8')}")
elif args.base32:
RNS.log(f"Private Key : {base64.b32encode(identity.get_private_key()).decode('utf-8')}")
else:
RNS.log(f"Private Key : {RNS.hexrep(identity.get_private_key(), delimit=False)}")
else:
RNS.log("Private Key : Hidden")
if args.write:
try:
wp = os.path.expanduser(args.write)
if not os.path.isfile(wp) or args.force:
identity.to_file(wp)
RNS.log(f"Wrote imported identity to {args.write}")
else:
print(f"File {wp} already exists, not overwriting")
exit(43)
except Exception as e:
print(f"Error while writing imported identity to file: {e}")
exit(44)
exit(0)
if not args.generate and not identity_str:
print("\nNo identity provided, cannot continue\n")
parser.print_help()
print("")
exit(2)
else:
targetloglevel = 4
verbosity = args.verbose
quietness = args.quiet
if verbosity != 0 or quietness != 0:
targetloglevel = targetloglevel+verbosity-quietness
# Start Reticulum
reticulum = RNS.Reticulum(configdir=args.config, loglevel=targetloglevel)
RNS.compact_log_fmt = True
if args.stdout:
RNS.loglevel = -1
if args.generate:
identity = RNS.Identity()
if not args.force and os.path.isfile(args.generate):
RNS.log(f"Identity file {args.generate} already exists. Not overwriting.", RNS.LOG_ERROR)
exit(3)
else:
try:
identity.to_file(args.generate)
RNS.log(f"New identity written to {args.generate}")
exit(0)
except Exception as e:
RNS.log("An error ocurred while saving the generated Identity.", RNS.LOG_ERROR)
RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
exit(4)
identity = None
if len(identity_str) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8*2 and not os.path.isfile(identity_str):
# Try recalling Identity from hex-encoded hash
try:
destination_hash = bytes.fromhex(identity_str)
identity = RNS.Identity.recall(destination_hash)
if identity == None:
if not args.request:
RNS.log(f"Could not recall Identity for {RNS.prettyhexrep(destination_hash)}.", RNS.LOG_ERROR)
RNS.log("You can query the network for unknown Identities with the -R option.", RNS.LOG_ERROR)
exit(5)
else:
RNS.Transport.request_path(destination_hash)
def spincheck():
return RNS.Identity.recall(destination_hash) != None
spin(spincheck, f"Requesting unknown Identity for {RNS.prettyhexrep(destination_hash)}", args.t)
if not spincheck():
RNS.log("Identity request timed out", RNS.LOG_ERROR)
exit(6)
else:
identity = RNS.Identity.recall(destination_hash)
RNS.log(f"Received Identity {identity} for destination {RNS.prettyhexrep(destination_hash)} from the network")
else:
RNS.log(f"Recalled Identity {identity} for destination {RNS.prettyhexrep(destination_hash)}")
except Exception as e:
RNS.log("Invalid hexadecimal hash provided", RNS.LOG_ERROR)
exit(7)
else:
# Try loading Identity from file
if not os.path.isfile(identity_str):
RNS.log("Specified Identity file not found")
exit(8)
else:
try:
identity = RNS.Identity.from_file(identity_str)
RNS.log(f"Loaded Identity {identity} from {identity_str}")
except Exception as e:
RNS.log("Could not decode Identity from specified file")
exit(9)
if identity != None:
if args.hash:
try:
aspects = args.hash.split(".")
if not len(aspects) > 0:
RNS.log("Invalid destination aspects specified", RNS.LOG_ERROR)
exit(32)
else:
app_name = aspects[0]
aspects = aspects[1:]
if identity.pub != None:
destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, app_name, *aspects)
RNS.log(f"The {args.hash} destination for this Identity is {RNS.prettyhexrep(destination.hash)}")
RNS.log(f"The full destination specifier is {destination}")
time.sleep(0.25)
exit(0)
else:
raise KeyError("No public key known")
except Exception as e:
RNS.log("An error ocurred while attempting to send the announce.", RNS.LOG_ERROR)
RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
exit(0)
if args.announce:
try:
aspects = args.announce.split(".")
if not len(aspects) > 1:
RNS.log("Invalid destination aspects specified", RNS.LOG_ERROR)
exit(32)
else:
app_name = aspects[0]
aspects = aspects[1:]
if identity.prv != None:
destination = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE, app_name, *aspects)
RNS.log(f"Created destination {destination}")
RNS.log(f"Announcing destination {RNS.prettyhexrep(destination.hash)}")
destination.announce()
time.sleep(0.25)
exit(0)
else:
destination = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, app_name, *aspects)
RNS.log(f"The {args.announce} destination for this Identity is {RNS.prettyhexrep(destination.hash)}")
RNS.log(f"The full destination specifier is {destination}")
RNS.log("Cannot announce this destination, since the private key is not held")
time.sleep(0.25)
exit(33)
except Exception as e:
RNS.log("An error ocurred while attempting to send the announce.", RNS.LOG_ERROR)
RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
exit(0)
if args.print_identity:
if args.base64:
RNS.log(f"Public Key : {base64.urlsafe_b64encode(identity.get_public_key()).decode('utf-8')}")
elif args.base32:
RNS.log(f"Public Key : {base64.b32encode(identity.get_public_key()).decode('utf-8')}")
else:
RNS.log(f"Public Key : {RNS.hexrep(identity.get_public_key(), delimit=False)}")
if identity.prv:
if args.print_private:
if args.base64:
RNS.log(f"Private Key : {base64.urlsafe_b64encode(identity.get_private_key()).decode('utf-8')}")
elif args.base32:
RNS.log(f"Private Key : {base64.b32encode(identity.get_private_key()).decode('utf-8')}")
else:
RNS.log(f"Private Key : {RNS.hexrep(identity.get_private_key(), delimit=False)}")
else:
RNS.log("Private Key : Hidden")
exit(0)
if args.export:
if identity.prv:
if args.base64:
RNS.log(f"Exported Identity : {base64.urlsafe_b64encode(identity.get_private_key()).decode('utf-8')}")
elif args.base32:
RNS.log(f"Exported Identity : {base64.b32encode(identity.get_private_key()).decode('utf-8')}")
else:
RNS.log(f"Exported Identity : {RNS.hexrep(identity.get_private_key(), delimit=False)}")
else:
RNS.log("Identity doesn't hold a private key, cannot export")
exit(50)
exit(0)
if args.validate:
if not args.read and args.validate.lower().endswith(f".{SIG_EXT}"):
args.read = str(args.validate).replace(f".{SIG_EXT}", "")
if not os.path.isfile(args.validate):
RNS.log(f"Signature file {args.read} not found", RNS.LOG_ERROR)
exit(10)
if not os.path.isfile(args.read):
RNS.log(f"Input file {args.read} not found", RNS.LOG_ERROR)
exit(11)
data_input = None
if args.read:
if not os.path.isfile(args.read):
RNS.log(f"Input file {args.read} not found", RNS.LOG_ERROR)
exit(12)
else:
try:
data_input = open(args.read, "rb")
except Exception as e:
RNS.log("Could not open input file for reading", RNS.LOG_ERROR)
RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
exit(13)
# TODO: Actually expand this to a good solution
# probably need to create a wrapper that takes
# into account not closing stdin when done
# elif args.stdin:
# data_input = sys.stdin
data_output = None
if args.encrypt and not args.write and not args.stdout and args.read:
args.write = f"{args.read}.{ENCRYPT_EXT}"
if args.decrypt and not args.write and not args.stdout and args.read and args.read.lower().endswith(f".{ENCRYPT_EXT}"):
args.write = str(args.read).replace(f".{ENCRYPT_EXT}", "")
if args.sign and identity.prv == None:
RNS.log("Specified Identity does not hold a private key. Cannot sign.", RNS.LOG_ERROR)
exit(14)
if args.sign and not args.write and not args.stdout and args.read:
args.write = f"{args.read}.{SIG_EXT}"
if args.write:
if not args.force and os.path.isfile(args.write):
RNS.log(f"Output file {args.write} already exists. Not overwriting.", RNS.LOG_ERROR)
exit(15)
else:
try:
data_output = open(args.write, "wb")
except Exception as e:
RNS.log("Could not open output file for writing", RNS.LOG_ERROR)
RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
exit(15)
# TODO: Actually expand this to a good solution
# probably need to create a wrapper that takes
# into account not closing stdout when done
# elif args.stdout:
# data_output = sys.stdout
if args.sign:
if identity.prv == None:
RNS.log("Specified Identity does not hold a private key. Cannot sign.", RNS.LOG_ERROR)
exit(16)
if not data_input:
if not args.stdout:
RNS.log("Signing requested, but no input data specified", RNS.LOG_ERROR)
exit(17)
else:
if not data_output:
if not args.stdout:
RNS.log("Signing requested, but no output specified", RNS.LOG_ERROR)
exit(18)
if not args.stdout:
RNS.log(f"Signing {args.read}")
try:
data_output.write(identity.sign(data_input.read()))
data_output.close()
data_input.close()
if not args.stdout:
if args.read:
RNS.log(f"File {args.read} signed with {identity} to {args.write}")
exit(0)
except Exception as e:
if not args.stdout:
RNS.log("An error ocurred while encrypting data.", RNS.LOG_ERROR)
RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
try:
data_output.close()
except:
pass
try:
data_input.close()
except:
pass
exit(19)
if args.validate:
if not data_input:
if not args.stdout:
RNS.log("Signature verification requested, but no input data specified", RNS.LOG_ERROR)
exit(20)
else:
# if not args.stdout:
# RNS.log("Verifying "+str(args.validate)+" for "+str(args.read))
try:
try:
sig_input = open(args.validate, "rb")
except Exception as e:
RNS.log(f"An error ocurred while opening {args.validate}.", RNS.LOG_ERROR)
RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
exit(21)
validated = identity.validate(sig_input.read(), data_input.read())
sig_input.close()
data_input.close()
if not validated:
if not args.stdout:
RNS.log(f"Signature {args.validate} for file {args.read} is invalid", RNS.LOG_ERROR)
exit(22)
else:
if not args.stdout:
RNS.log(f"Signature {args.validate} for file {args.read} made by Identity {identity} is valid")
exit(0)
except Exception as e:
if not args.stdout:
RNS.log("An error ocurred while validating signature.", RNS.LOG_ERROR)
RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
try:
data_output.close()
except:
pass
try:
data_input.close()
except:
pass
exit(23)
if args.encrypt:
if not data_input:
if not args.stdout:
RNS.log("Encryption requested, but no input data specified", RNS.LOG_ERROR)
exit(24)
else:
if not data_output:
if not args.stdout:
RNS.log("Encryption requested, but no output specified", RNS.LOG_ERROR)
exit(25)
if not args.stdout:
RNS.log(f"Encrypting {args.read}")
try:
more_data = True
while more_data:
chunk = data_input.read(CHUNK_SIZE)
if chunk:
data_output.write(identity.encrypt(chunk))
else:
more_data = False
data_output.close()
data_input.close()
if not args.stdout:
if args.read:
RNS.log(f"File {args.read} encrypted for {identity} to {args.write}")
exit(0)
except Exception as e:
if not args.stdout:
RNS.log("An error ocurred while encrypting data.", RNS.LOG_ERROR)
RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
try:
data_output.close()
except:
pass
try:
data_input.close()
except:
pass
exit(26)
if args.decrypt:
if identity.prv == None:
RNS.log("Specified Identity does not hold a private key. Cannot decrypt.", RNS.LOG_ERROR)
exit(27)
if not data_input:
if not args.stdout:
RNS.log("Decryption requested, but no input data specified", RNS.LOG_ERROR)
exit(28)
else:
if not data_output:
if not args.stdout:
RNS.log("Decryption requested, but no output specified", RNS.LOG_ERROR)
exit(29)
if not args.stdout:
RNS.log(f"Decrypting {args.read}...")
try:
more_data = True
while more_data:
chunk = data_input.read(CHUNK_SIZE)
if chunk:
plaintext = identity.decrypt(chunk)
if plaintext == None:
if not args.stdout:
RNS.log("Data could not be decrypted with the specified Identity")
exit(30)
else:
data_output.write(plaintext)
else:
more_data = False
data_output.close()
data_input.close()
if not args.stdout:
if args.read:
RNS.log(f"File {args.read} decrypted with {identity} to {args.write}")
exit(0)
except Exception as e:
if not args.stdout:
RNS.log("An error ocurred while decrypting data.", RNS.LOG_ERROR)
RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
try:
data_output.close()
except:
pass
try:
data_input.close()
except:
pass
exit(31)
if True:
pass
elif False:
pass
else:
print("")
parser.print_help()
print("")
except KeyboardInterrupt:
print("")
exit(255)
if __name__ == "__main__":
main()