mirror of
https://github.com/markqvist/Reticulum.git
synced 2024-12-22 03:40:18 +00:00
Added ratchets example
This commit is contained in:
parent
a11f14e75f
commit
dd889d16d4
343
Examples/Ratchets.py
Normal file
343
Examples/Ratchets.py
Normal file
@ -0,0 +1,343 @@
|
||||
##########################################################
|
||||
# This RNS example demonstrates a simple client/server #
|
||||
# echo utility that uses ratchets to rotate encryption #
|
||||
# keys everytime an announce is sent. #
|
||||
##########################################################
|
||||
|
||||
import argparse
|
||||
import RNS
|
||||
|
||||
# Let's define an app name. We'll use this for all
|
||||
# destinations we create. Since this echo example
|
||||
# is part of a range of example utilities, we'll put
|
||||
# them all within the app namespace "example_utilities"
|
||||
APP_NAME = "example_utilities"
|
||||
|
||||
|
||||
##########################################################
|
||||
#### Server Part #########################################
|
||||
##########################################################
|
||||
|
||||
# This initialisation is executed when the users chooses
|
||||
# to run as a server
|
||||
def server(configpath):
|
||||
global reticulum
|
||||
|
||||
# We must first initialise Reticulum
|
||||
reticulum = RNS.Reticulum(configpath)
|
||||
|
||||
# TODO: Remove
|
||||
RNS.loglevel = RNS.LOG_DEBUG
|
||||
|
||||
# Randomly create a new identity for our echo server
|
||||
server_identity = RNS.Identity()
|
||||
|
||||
# We create a destination that clients can query. We want
|
||||
# to be able to verify echo replies to our clients, so we
|
||||
# create a "single" destination that can receive encrypted
|
||||
# messages. This way the client can send a request and be
|
||||
# certain that no-one else than this destination was able
|
||||
# to read it.
|
||||
echo_destination = RNS.Destination(
|
||||
server_identity,
|
||||
RNS.Destination.IN,
|
||||
RNS.Destination.SINGLE,
|
||||
APP_NAME,
|
||||
"ratchet",
|
||||
"echo",
|
||||
"request"
|
||||
)
|
||||
|
||||
# Enable ratchets on the destination by providing a file
|
||||
# path to store ratchets. In this example, we will just
|
||||
# use a temporary file, but in real-world applications,
|
||||
# it's extremely important to keep this file secure, since
|
||||
# it contains encryption keys for the destination.
|
||||
destination_hexhash = RNS.hexrep(echo_destination.hash, delimit=False)
|
||||
echo_destination.enable_ratchets(f"/tmp/{destination_hexhash}.ratchets")
|
||||
|
||||
# We configure the destination to automatically prove all
|
||||
# packets addressed to it. By doing this, RNS will automatically
|
||||
# generate a proof for each incoming packet and transmit it
|
||||
# back to the sender of that packet.
|
||||
echo_destination.set_proof_strategy(RNS.Destination.PROVE_ALL)
|
||||
|
||||
# Tell the destination which function in our program to
|
||||
# run when a packet is received. We do this so we can
|
||||
# print a log message when the server receives a request
|
||||
echo_destination.set_packet_callback(server_callback)
|
||||
|
||||
# Everything's ready!
|
||||
# Let's Wait for client requests or user input
|
||||
announceLoop(echo_destination)
|
||||
|
||||
|
||||
def announceLoop(destination):
|
||||
# Let the user know that everything is ready
|
||||
RNS.log(
|
||||
"Ratcheted echo server "+
|
||||
RNS.prettyhexrep(destination.hash)+
|
||||
" running, hit enter to manually send an announce (Ctrl-C to quit)"
|
||||
)
|
||||
|
||||
# We enter a loop that runs until the users exits.
|
||||
# If the user hits enter, we will announce our server
|
||||
# destination on the network, which will let clients
|
||||
# know how to create messages directed towards it.
|
||||
while True:
|
||||
entered = input()
|
||||
destination.announce()
|
||||
RNS.log("Sent announce from "+RNS.prettyhexrep(destination.hash))
|
||||
|
||||
|
||||
def server_callback(message, packet):
|
||||
global reticulum
|
||||
|
||||
# Tell the user that we received an echo request, and
|
||||
# that we are going to send a reply to the requester.
|
||||
# Sending the proof is handled automatically, since we
|
||||
# set up the destination to prove all incoming packets.
|
||||
|
||||
reception_stats = ""
|
||||
if reticulum.is_connected_to_shared_instance:
|
||||
reception_rssi = reticulum.get_packet_rssi(packet.packet_hash)
|
||||
reception_snr = reticulum.get_packet_snr(packet.packet_hash)
|
||||
|
||||
if reception_rssi != None:
|
||||
reception_stats += " [RSSI "+str(reception_rssi)+" dBm]"
|
||||
|
||||
if reception_snr != None:
|
||||
reception_stats += " [SNR "+str(reception_snr)+" dBm]"
|
||||
|
||||
else:
|
||||
if packet.rssi != None:
|
||||
reception_stats += " [RSSI "+str(packet.rssi)+" dBm]"
|
||||
|
||||
if packet.snr != None:
|
||||
reception_stats += " [SNR "+str(packet.snr)+" dB]"
|
||||
|
||||
RNS.log("Received packet from echo client, proof sent"+reception_stats)
|
||||
|
||||
|
||||
##########################################################
|
||||
#### Client Part #########################################
|
||||
##########################################################
|
||||
|
||||
# This initialisation is executed when the users chooses
|
||||
# to run as a client
|
||||
def client(destination_hexhash, configpath, timeout=None):
|
||||
global reticulum
|
||||
|
||||
# We need a binary representation of the destination
|
||||
# hash that was entered on the command line
|
||||
try:
|
||||
dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2
|
||||
if len(destination_hexhash) != dest_len:
|
||||
raise ValueError(
|
||||
"Destination length is invalid, must be {hex} hexadecimal characters ({byte} bytes).".format(hex=dest_len, byte=dest_len//2)
|
||||
)
|
||||
|
||||
destination_hash = bytes.fromhex(destination_hexhash)
|
||||
except Exception as e:
|
||||
RNS.log("Invalid destination entered. Check your input!")
|
||||
RNS.log(str(e)+"\n")
|
||||
exit()
|
||||
|
||||
# We must first initialise Reticulum
|
||||
reticulum = RNS.Reticulum(configpath)
|
||||
|
||||
# We override the loglevel to provide feedback when
|
||||
# an announce is received
|
||||
if RNS.loglevel < RNS.LOG_INFO:
|
||||
RNS.loglevel = RNS.LOG_INFO
|
||||
|
||||
# Tell the user that the client is ready!
|
||||
RNS.log(
|
||||
"Echo client ready, hit enter to send echo request to "+
|
||||
destination_hexhash+
|
||||
" (Ctrl-C to quit)"
|
||||
)
|
||||
|
||||
# We enter a loop that runs until the user exits.
|
||||
# If the user hits enter, we will try to send an
|
||||
# echo request to the destination specified on the
|
||||
# command line.
|
||||
while True:
|
||||
input()
|
||||
|
||||
# Let's first check if RNS knows a path to the destination.
|
||||
# If it does, we'll load the server identity and create a packet
|
||||
if RNS.Transport.has_path(destination_hash):
|
||||
|
||||
# To address the server, we need to know it's public
|
||||
# key, so we check if Reticulum knows this destination.
|
||||
# This is done by calling the "recall" method of the
|
||||
# Identity module. If the destination is known, it will
|
||||
# return an Identity instance that can be used in
|
||||
# outgoing destinations.
|
||||
server_identity = RNS.Identity.recall(destination_hash)
|
||||
|
||||
# We got the correct identity instance from the
|
||||
# recall method, so let's create an outgoing
|
||||
# destination. We use the naming convention:
|
||||
# example_utilities.ratchet.echo.request
|
||||
# This matches the naming we specified in the
|
||||
# server part of the code.
|
||||
request_destination = RNS.Destination(
|
||||
server_identity,
|
||||
RNS.Destination.OUT,
|
||||
RNS.Destination.SINGLE,
|
||||
APP_NAME,
|
||||
"ratchet",
|
||||
"echo",
|
||||
"request"
|
||||
)
|
||||
|
||||
# The destination is ready, so let's create a packet.
|
||||
# We set the destination to the request_destination
|
||||
# that was just created, and the only data we add
|
||||
# is a random hash.
|
||||
echo_request = RNS.Packet(request_destination, RNS.Identity.get_random_hash())
|
||||
|
||||
# Send the packet! If the packet is successfully
|
||||
# sent, it will return a PacketReceipt instance.
|
||||
packet_receipt = echo_request.send()
|
||||
|
||||
# If the user specified a timeout, we set this
|
||||
# timeout on the packet receipt, and configure
|
||||
# a callback function, that will get called if
|
||||
# the packet times out.
|
||||
if timeout != None:
|
||||
packet_receipt.set_timeout(timeout)
|
||||
packet_receipt.set_timeout_callback(packet_timed_out)
|
||||
|
||||
# We can then set a delivery callback on the receipt.
|
||||
# This will get automatically called when a proof for
|
||||
# this specific packet is received from the destination.
|
||||
packet_receipt.set_delivery_callback(packet_delivered)
|
||||
|
||||
# Tell the user that the echo request was sent
|
||||
RNS.log("Sent echo request to "+RNS.prettyhexrep(request_destination.hash))
|
||||
else:
|
||||
# If we do not know this destination, tell the
|
||||
# user to wait for an announce to arrive.
|
||||
RNS.log("Destination is not yet known. Requesting path...")
|
||||
RNS.log("Hit enter to manually retry once an announce is received.")
|
||||
RNS.Transport.request_path(destination_hash)
|
||||
|
||||
# This function is called when our reply destination
|
||||
# receives a proof packet.
|
||||
def packet_delivered(receipt):
|
||||
global reticulum
|
||||
|
||||
if receipt.status == RNS.PacketReceipt.DELIVERED:
|
||||
rtt = receipt.get_rtt()
|
||||
if (rtt >= 1):
|
||||
rtt = round(rtt, 3)
|
||||
rttstring = str(rtt)+" seconds"
|
||||
else:
|
||||
rtt = round(rtt*1000, 3)
|
||||
rttstring = str(rtt)+" milliseconds"
|
||||
|
||||
reception_stats = ""
|
||||
if reticulum.is_connected_to_shared_instance:
|
||||
reception_rssi = reticulum.get_packet_rssi(receipt.proof_packet.packet_hash)
|
||||
reception_snr = reticulum.get_packet_snr(receipt.proof_packet.packet_hash)
|
||||
|
||||
if reception_rssi != None:
|
||||
reception_stats += " [RSSI "+str(reception_rssi)+" dBm]"
|
||||
|
||||
if reception_snr != None:
|
||||
reception_stats += " [SNR "+str(reception_snr)+" dB]"
|
||||
|
||||
else:
|
||||
if receipt.proof_packet != None:
|
||||
if receipt.proof_packet.rssi != None:
|
||||
reception_stats += " [RSSI "+str(receipt.proof_packet.rssi)+" dBm]"
|
||||
|
||||
if receipt.proof_packet.snr != None:
|
||||
reception_stats += " [SNR "+str(receipt.proof_packet.snr)+" dB]"
|
||||
|
||||
RNS.log(
|
||||
"Valid reply received from "+
|
||||
RNS.prettyhexrep(receipt.destination.hash)+
|
||||
", round-trip time is "+rttstring+
|
||||
reception_stats
|
||||
)
|
||||
|
||||
# This function is called if a packet times out.
|
||||
def packet_timed_out(receipt):
|
||||
if receipt.status == RNS.PacketReceipt.FAILED:
|
||||
RNS.log("Packet "+RNS.prettyhexrep(receipt.hash)+" timed out")
|
||||
|
||||
|
||||
##########################################################
|
||||
#### Program Startup #####################################
|
||||
##########################################################
|
||||
|
||||
# This part of the program gets run at startup,
|
||||
# and parses input from the user, and then starts
|
||||
# the desired program mode.
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
parser = argparse.ArgumentParser(description="Simple ratcheted echo server and client utility")
|
||||
|
||||
parser.add_argument(
|
||||
"-s",
|
||||
"--server",
|
||||
action="store_true",
|
||||
help="wait for incoming packets from clients"
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-t",
|
||||
"--timeout",
|
||||
action="store",
|
||||
metavar="s",
|
||||
default=None,
|
||||
help="set a reply timeout in seconds",
|
||||
type=float
|
||||
)
|
||||
|
||||
parser.add_argument("--config",
|
||||
action="store",
|
||||
default=None,
|
||||
help="path to alternative Reticulum config directory",
|
||||
type=str
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"destination",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="hexadecimal hash of the server destination",
|
||||
type=str
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.server:
|
||||
configarg=None
|
||||
if args.config:
|
||||
configarg = args.config
|
||||
server(configarg)
|
||||
else:
|
||||
if args.config:
|
||||
configarg = args.config
|
||||
else:
|
||||
configarg = None
|
||||
|
||||
if args.timeout:
|
||||
timeoutarg = float(args.timeout)
|
||||
else:
|
||||
timeoutarg = None
|
||||
|
||||
if (args.destination == None):
|
||||
print("")
|
||||
parser.print_help()
|
||||
print("")
|
||||
else:
|
||||
client(args.destination, configarg, timeout=timeoutarg)
|
||||
except KeyboardInterrupt:
|
||||
print("")
|
||||
exit()
|
Loading…
Reference in New Issue
Block a user