From 7991db5c744bfd3f7635025fcd4dca823b6b73f7 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Fri, 24 Sep 2021 20:10:04 +0200 Subject: [PATCH] Added rnstatus utility --- RNS/Interfaces/AX25KISSInterface.py | 7 +++ RNS/Interfaces/Interface.py | 4 +- RNS/Interfaces/KISSInterface.py | 6 ++ RNS/Interfaces/LocalInterface.py | 25 +++++++- RNS/Interfaces/RNodeInterface.py | 6 ++ RNS/Interfaces/SerialInterface.py | 5 ++ RNS/Interfaces/TCPInterface.py | 24 +++++++- RNS/Interfaces/UDPInterface.py | 8 +++ RNS/Reticulum.py | 64 +++++++++++++++++++- RNS/Utilities/rnprobe.py | 2 +- RNS/Utilities/rnstatus.py | 90 +++++++++++++++++++++++++++++ 11 files changed, 234 insertions(+), 7 deletions(-) create mode 100644 RNS/Utilities/rnstatus.py diff --git a/RNS/Interfaces/AX25KISSInterface.py b/RNS/Interfaces/AX25KISSInterface.py index cb35ed0..112817e 100644 --- a/RNS/Interfaces/AX25KISSInterface.py +++ b/RNS/Interfaces/AX25KISSInterface.py @@ -48,6 +48,9 @@ class AX25KISSInterface(Interface): serial = None def __init__(self, owner, name, callsign, ssid, port, speed, databits, parity, stopbits, preamble, txtail, persistence, slottime, flow_control): + self.rxb = 0 + self.txb = 0 + self.serial = None self.owner = owner self.name = name @@ -188,10 +191,12 @@ class AX25KISSInterface(Interface): def processIncoming(self, data): if (len(data) > AX25.HEADER_SIZE): + self.rxb += len(data) self.owner.inbound(data[AX25.HEADER_SIZE:], self) def processOutgoing(self,data): + datalen = len(data) if self.online: if self.interface_ready: if self.flow_control: @@ -224,6 +229,8 @@ class AX25KISSInterface(Interface): kiss_frame = bytes([KISS.FEND])+bytes([0x00])+data+bytes([KISS.FEND]) written = self.serial.write(kiss_frame) + self.txb += datalen + if written != len(kiss_frame): if self.flow_control: self.interface_ready = True diff --git a/RNS/Interfaces/Interface.py b/RNS/Interfaces/Interface.py index 440200a..7e36369 100755 --- a/RNS/Interfaces/Interface.py +++ b/RNS/Interfaces/Interface.py @@ -8,7 +8,9 @@ class Interface: name = None def __init__(self): - pass + self.rxb = 0 + self.txb = 0 + self.online = False def get_hash(self): return RNS.Identity.full_hash(str(self).encode("utf-8")) diff --git a/RNS/Interfaces/KISSInterface.py b/RNS/Interfaces/KISSInterface.py index 635bc2c..810399a 100644 --- a/RNS/Interfaces/KISSInterface.py +++ b/RNS/Interfaces/KISSInterface.py @@ -40,6 +40,9 @@ class KISSInterface(Interface): serial = None def __init__(self, owner, name, port, speed, databits, parity, stopbits, preamble, txtail, persistence, slottime, flow_control, beacon_interval, beacon_data): + self.rxb = 0 + self.txb = 0 + if beacon_data == None: beacon_data = "" @@ -174,10 +177,12 @@ class KISSInterface(Interface): def processIncoming(self, data): + self.rxb += len(data) self.owner.inbound(data, self) def processOutgoing(self,data): + datalen = len(data) if self.online: if self.interface_ready: if self.flow_control: @@ -189,6 +194,7 @@ class KISSInterface(Interface): frame = bytes([KISS.FEND])+bytes([0x00])+data+bytes([KISS.FEND]) written = self.serial.write(frame) + self.txb += datalen if data == self.beacon_d: self.first_tx = None diff --git a/RNS/Interfaces/LocalInterface.py b/RNS/Interfaces/LocalInterface.py index e2389f2..d886366 100644 --- a/RNS/Interfaces/LocalInterface.py +++ b/RNS/Interfaces/LocalInterface.py @@ -24,6 +24,10 @@ class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): class LocalClientInterface(Interface): def __init__(self, owner, name, target_port = None, connected_socket=None): + self.rxb = 0 + self.txb = 0 + self.online = False + self.IN = True self.OUT = False self.socket = None @@ -58,6 +62,10 @@ class LocalClientInterface(Interface): thread.start() def processIncoming(self, data): + self.rxb += len(data) + if hasattr(self, "parent_interface") and self.parent_interface != None: + self.parent_interface.rxb += len(data) + self.owner.inbound(data, self) def processOutgoing(self, data): @@ -70,6 +78,10 @@ class LocalClientInterface(Interface): data = bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG]) self.socket.sendall(data) self.writing = False + self.txb += len(data) + if hasattr(self, "parent_interface") and self.parent_interface != None: + self.parent_interface.txb += len(data) + except Exception as e: RNS.log("Exception occurred while transmitting via "+str(self)+", tearing down interface", RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) @@ -147,6 +159,8 @@ class LocalClientInterface(Interface): if self in RNS.Transport.local_client_interfaces: RNS.Transport.local_client_interfaces.remove(self) + if hasattr(self, "parent_interface") and self.parent_interface != None: + self.parent_interface.clients -= 1 if nowarning == False: RNS.log("The interface "+str(self)+" experienced an unrecoverable error and is being torn down. Restart Reticulum to attempt to open this interface again.", RNS.LOG_ERROR) @@ -170,6 +184,11 @@ class LocalClientInterface(Interface): class LocalServerInterface(Interface): def __init__(self, owner, bindport=None): + self.rxb = 0 + self.txb = 0 + self.online = False + self.clients = 0 + self.IN = True self.OUT = False self.name = "Reticulum" @@ -196,6 +215,9 @@ class LocalServerInterface(Interface): thread.setDaemon(True) thread.start() + self.online = True + + def incoming_connection(self, handler): interface_name = str(str(handler.client_address[1])) @@ -208,13 +230,14 @@ class LocalServerInterface(Interface): RNS.log("Accepting new connection to shared instance: "+str(spawned_interface), RNS.LOG_VERBOSE) RNS.Transport.interfaces.append(spawned_interface) RNS.Transport.local_client_interfaces.append(spawned_interface) + self.clients += 1 spawned_interface.read_loop() def processOutgoing(self, data): pass def __str__(self): - return "Shared Instance ["+str(self.bind_port)+"]" + return "Shared Instance["+str(self.bind_port)+"]" class LocalInterfaceHandler(socketserver.BaseRequestHandler): def __init__(self, callback, *args, **keys): diff --git a/RNS/Interfaces/RNodeInterface.py b/RNS/Interfaces/RNodeInterface.py index e99b24f..541bc71 100644 --- a/RNS/Interfaces/RNodeInterface.py +++ b/RNS/Interfaces/RNodeInterface.py @@ -72,6 +72,9 @@ class RNodeInterface(Interface): CALLSIGN_MAX_LEN = 32 def __init__(self, owner, name, port, frequency = None, bandwidth = None, txpower = None, sf = None, cr = None, flow_control = False, id_interval = None, id_callsign = None): + self.rxb = 0 + self.txb = 0 + self.serial = None self.owner = owner self.name = name @@ -278,10 +281,12 @@ class RNodeInterface(Interface): self.bitrate = 0 def processIncoming(self, data): + self.rxb += len(data) self.owner.inbound(data, self) def processOutgoing(self,data): + datalen = len(data) if self.online: if self.interface_ready: if self.flow_control: @@ -297,6 +302,7 @@ class RNodeInterface(Interface): frame = bytes([0xc0])+bytes([0x00])+data+bytes([0xc0]) written = self.serial.write(frame) + self.txb += datalen if written != len(frame): raise IOError("Serial interface only wrote "+str(written)+" bytes of "+str(len(data))) diff --git a/RNS/Interfaces/SerialInterface.py b/RNS/Interfaces/SerialInterface.py index a131153..8ce02c7 100755 --- a/RNS/Interfaces/SerialInterface.py +++ b/RNS/Interfaces/SerialInterface.py @@ -31,6 +31,9 @@ class SerialInterface(Interface): serial = None def __init__(self, owner, name, port, speed, databits, parity, stopbits): + self.rxb = 0 + self.txb = 0 + self.serial = None self.owner = owner self.name = name @@ -79,6 +82,7 @@ class SerialInterface(Interface): def processIncoming(self, data): + self.rxb += len(data) self.owner.inbound(data, self) @@ -86,6 +90,7 @@ class SerialInterface(Interface): if self.online: data = bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG]) written = self.serial.write(data) + self.txb += len(data) if written != len(data): raise IOError("Serial interface only wrote "+str(written)+" bytes of "+str(len(data))) diff --git a/RNS/Interfaces/TCPInterface.py b/RNS/Interfaces/TCPInterface.py index b27183d..0bcf9fa 100644 --- a/RNS/Interfaces/TCPInterface.py +++ b/RNS/Interfaces/TCPInterface.py @@ -34,6 +34,9 @@ class TCPClientInterface(Interface): TCP_PROBES = 5 def __init__(self, owner, name, target_ip=None, target_port=None, connected_socket=None, max_reconnect_tries=None): + self.rxb = 0 + self.txb = 0 + self.IN = True self.OUT = False self.socket = None @@ -177,6 +180,10 @@ class TCPClientInterface(Interface): raise IOError("Attempt to reconnect on a non-initiator TCP interface") def processIncoming(self, data): + self.rxb += len(data) + if hasattr(self, "parent_interface") and self.parent_interface != None: + self.parent_interface.rxb += len(data) + self.owner.inbound(data, self) def processOutgoing(self, data): @@ -189,6 +196,10 @@ class TCPClientInterface(Interface): data = bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG]) self.socket.sendall(data) self.writing = False + self.txb += len(data) + if hasattr(self, "parent_interface") and self.parent_interface != None: + self.parent_interface.txb += len(data) + except Exception as e: RNS.log("Exception occurred while transmitting via "+str(self)+", tearing down interface", RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) @@ -248,7 +259,7 @@ class TCPClientInterface(Interface): self.teardown() def teardown(self): - if self.initiator: + if self.initiator and not self.detached: RNS.log("The interface "+str(self)+" experienced an unrecoverable error and is being torn down. Restart Reticulum to attempt to open this interface again.", RNS.LOG_ERROR) if RNS.Reticulum.panic_on_interface_error: RNS.panic() @@ -260,6 +271,9 @@ class TCPClientInterface(Interface): self.OUT = False self.IN = False + if hasattr(self, "parent_interface") and self.parent_interface != None: + self.parent_interface.clients -= 1 + if self in RNS.Transport.interfaces: RNS.Transport.interfaces.remove(self) @@ -277,6 +291,11 @@ class TCPServerInterface(Interface): return netifaces.ifaddresses(name)[netifaces.AF_INET][0]['broadcast'] def __init__(self, owner, name, device=None, bindip=None, bindport=None): + self.rxb = 0 + self.txb = 0 + self.online = False + self.clients = 0 + self.IN = True self.OUT = False self.name = name @@ -304,6 +323,8 @@ class TCPServerInterface(Interface): thread.setDaemon(True) thread.start() + self.online = True + def incoming_connection(self, handler): RNS.log("Accepting incoming TCP connection", RNS.LOG_VERBOSE) @@ -317,6 +338,7 @@ class TCPServerInterface(Interface): spawned_interface.online = True RNS.log("Spawned new TCPClient Interface: "+str(spawned_interface), RNS.LOG_VERBOSE) RNS.Transport.interfaces.append(spawned_interface) + self.clients += 1 spawned_interface.read_loop() def processOutgoing(self, data): diff --git a/RNS/Interfaces/UDPInterface.py b/RNS/Interfaces/UDPInterface.py index a92b40e..29c39c2 100644 --- a/RNS/Interfaces/UDPInterface.py +++ b/RNS/Interfaces/UDPInterface.py @@ -18,9 +18,12 @@ class UDPInterface(Interface): return netifaces.ifaddresses(name)[netifaces.AF_INET][0]['broadcast'] def __init__(self, owner, name, device=None, bindip=None, bindport=None, forwardip=None, forwardport=None): + self.rxb = 0 + self.txb = 0 self.IN = True self.OUT = False self.name = name + self.online = False if device != None: if bindip == None: @@ -47,6 +50,8 @@ class UDPInterface(Interface): thread.setDaemon(True) thread.start() + self.online = True + if (forwardip != None and forwardport != None): self.forwards = True self.forward_ip = forwardip @@ -54,6 +59,7 @@ class UDPInterface(Interface): def processIncoming(self, data): + self.rxb += len(data) self.owner.inbound(data, self) def processOutgoing(self,data): @@ -61,6 +67,8 @@ class UDPInterface(Interface): udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) udp_socket.sendto(data, (self.forward_ip, self.forward_port)) + self.txb += len(data) + except Exception as e: RNS.log("Could not transmit on "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) diff --git a/RNS/Reticulum.py b/RNS/Reticulum.py index feb5108..7b505d4 100755 --- a/RNS/Reticulum.py +++ b/RNS/Reticulum.py @@ -1,8 +1,10 @@ from .Interfaces import * -import configparser from .vendor.configobj import ConfigObj +import configparser +import multiprocessing.connection import RNS import signal +import threading import atexit import struct import array @@ -108,7 +110,9 @@ class Reticulum: Reticulum.panic_on_interface_error = False self.local_interface_port = 37428 - self.share_instance = True + self.local_control_port = 37429 + self.share_instance = True + self.rpc_listener = None self.requested_loglevel = loglevel if self.requested_loglevel != None: @@ -153,6 +157,15 @@ class Reticulum: RNS.Transport.start(self) + self.rpc_addr = ("127.0.0.1", self.local_control_port) + self.rpc_key = RNS.Identity.full_hash(RNS.Transport.identity.get_private_key()) + + if self.is_shared_instance: + self.rpc_listener = multiprocessing.connection.Listener(self.rpc_addr, authkey=self.rpc_key) + thread = threading.Thread(target=self.rpc_loop) + thread.setDaemon(True) + thread.start() + atexit.register(Reticulum.exit_handler) signal.signal(signal.SIGINT, Reticulum.sigint_handler) @@ -165,6 +178,7 @@ class Reticulum: ) interface.OUT = True RNS.Transport.interfaces.append(interface) + self.is_shared_instance = True RNS.log("Started shared instance interface: "+str(interface), RNS.LOG_DEBUG) except Exception as e: @@ -211,6 +225,9 @@ class Reticulum: if option == "shared_instance_port": value = int(self.config["reticulum"][option]) self.local_interface_port = value + if option == "instance_control_port": + value = int(self.config["reticulum"][option]) + self.local_control_port = value if option == "enable_transport": v = self.config["reticulum"].as_bool(option) if v == True: @@ -455,7 +472,7 @@ class Reticulum: RNS.Transport.interfaces.append(interface) else: - RNS.log("Skipping disabled interface \""+name+"\"", RNS.LOG_NOTICE) + RNS.log("Skipping disabled interface \""+name+"\"", RNS.LOG_INFO) except Exception as e: RNS.log("The interface \""+name+"\" could not be created. Check your configuration file for errors!", RNS.LOG_ERROR) @@ -475,6 +492,47 @@ class Reticulum: self.config.write() self.__apply_config() + def rpc_loop(self): + while True: + try: + rpc_connection = self.rpc_listener.accept() + call = rpc_connection.recv() + + if "get" in call: + path = call["get"] + + if path == "interface_stats": + rpc_connection.send(self.get_interface_stats()) + + rpc_connection.close() + except Exception as e: + RNS.log("An error ocurred while handling RPC call from local client: "+str(e), RNS.LOG_ERROR) + + def get_interface_stats(self): + if self.is_connected_to_shared_instance: + rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) + rpc_connection.send({"get": "interface_stats"}) + response = rpc_connection.recv() + return response + else: + stats = [] + for interface in RNS.Transport.interfaces: + ifstats = {} + + if hasattr(interface, "clients"): + ifstats["clients"] = interface.clients + else: + ifstats["clients"] = None + + ifstats["name"] = str(interface) + ifstats["rxb"] = interface.rxb + ifstats["txb"] = interface.txb + ifstats["status"] = interface.online + stats.append(ifstats) + + return stats + + @staticmethod def should_use_implicit_proof(): """ diff --git a/RNS/Utilities/rnprobe.py b/RNS/Utilities/rnprobe.py index 6308083..44bd54a 100644 --- a/RNS/Utilities/rnprobe.py +++ b/RNS/Utilities/rnprobe.py @@ -124,7 +124,7 @@ def main(): parser.add_argument( "--version", action="version", - version="rnpath {version}".format(version=__version__) + version="rnprobe {version}".format(version=__version__) ) parser.add_argument( diff --git a/RNS/Utilities/rnstatus.py b/RNS/Utilities/rnstatus.py new file mode 100644 index 0000000..c897a21 --- /dev/null +++ b/RNS/Utilities/rnstatus.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python3 + +import RNS +import argparse + +from RNS._version import __version__ + +def size_str(num, suffix='B'): + units = ['','K','M','G','T','P','E','Z'] + last_unit = 'Y' + + if suffix == 'b': + num *= 8 + units = ['','K','M','G','T','P','E','Z'] + last_unit = 'Y' + + for unit in units: + if abs(num) < 1000.0: + if unit == "": + return "%.0f %s%s" % (num, unit, suffix) + else: + return "%.2f %s%s" % (num, unit, suffix) + num /= 1000.0 + + return "%.2f%s%s" % (num, last_unit, suffix) + +def program_setup(configdir, dispall=False, verbosity = 0): + reticulum = RNS.Reticulum(configdir = configdir, loglevel = 3+verbosity) + + ifstats = reticulum.get_interface_stats() + if ifstats != None: + for ifstat in ifstats: + name = ifstat["name"] + + if dispall or not (name.startswith("LocalInterface[") or name.startswith("TCPInterface[Client")): + if ifstat["status"]: + ss = "Up" + else: + ss = "Down" + + if ifstat["clients"] != None: + clients = ifstat["clients"] + if name.startswith("Shared Instance["): + clients_string = "Connected applications: "+str(clients-1) + else: + clients_string = "Connected clients: "+str(clients) + + else: + clients = None + + print(" {n}".format(n=ifstat["name"])) + print("\tStatus: {ss}".format(ss=ss)) + if clients != None: + print("\t"+clients_string) + print("\tRX: {rxb}\n\tTX: {txb}".format(rxb=size_str(ifstat["rxb"]), txb=size_str(ifstat["txb"]))) + print("") + else: + print("Could not get RNS status") + +def main(): + try: + parser = argparse.ArgumentParser(description="Reticulum Network Stack Status") + parser.add_argument("--config", action="store", default=None, help="path to alternative Reticulum config directory", type=str) + parser.add_argument("--version", action="version", version="rnstatus {version}".format(version=__version__)) + + parser.add_argument( + "-a", + "--all", + action="store_true", + help="show all interfaces", + default=False + ) + + parser.add_argument('-v', '--verbose', action='count', default=0) + + args = parser.parse_args() + + if args.config: + configarg = args.config + else: + configarg = None + + program_setup(configdir = configarg, dispall = args.all, verbosity=args.verbose) + + except KeyboardInterrupt: + print("") + exit() + +if __name__ == "__main__": + main()