diff --git a/RNS/Interfaces/Interface.py b/RNS/Interfaces/Interface.py index a8c38bf..2539284 100755 --- a/RNS/Interfaces/Interface.py +++ b/RNS/Interfaces/Interface.py @@ -56,7 +56,8 @@ class Interface: stale.append(a) for s in stale: - self.announce_queue.remove(s) + if s in self.announce_queue: + self.announce_queue.remove(s) if len(self.announce_queue) > 0: min_hops = min(entry["hops"] for entry in self.announce_queue) @@ -70,7 +71,10 @@ class Interface: self.announce_allowed_at = now + wait_time self.processOutgoing(selected["raw"]) - self.announce_queue.remove(selected) + + if selected in self.announce_queue: + self.announce_queue.remove(selected) + if len(self.announce_queue) > 0: timer = threading.Timer(wait_time, self.process_announce_queue) timer.start() diff --git a/RNS/Reticulum.py b/RNS/Reticulum.py index 639dd73..6176bce 100755 --- a/RNS/Reticulum.py +++ b/RNS/Reticulum.py @@ -837,6 +837,9 @@ class Reticulum: if path == "path": rpc_connection.send(self.drop_path(call["destination_hash"])) + if path == "announce_queues": + rpc_connection.send(self.drop_announce_queues()) + rpc_connection.close() except Exception as e: @@ -938,6 +941,16 @@ class Reticulum: else: return RNS.Transport.expire_path(destination) + def drop_announce_queues(self): + if self.is_connected_to_shared_instance: + rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) + rpc_connection.send({"drop": "announce_queues"}) + response = rpc_connection.recv() + return response + + else: + return RNS.Transport.drop_announce_queues() + def get_next_hop_if_name(self, destination): if self.is_connected_to_shared_instance: rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) diff --git a/RNS/Transport.py b/RNS/Transport.py index 460eb76..95f8fca 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -1798,6 +1798,20 @@ class Transport: if registered_destination.type == RNS.Destination.SINGLE: registered_destination.announce(path_response=True) + @staticmethod + def drop_announce_queues(): + for interface in Transport.interfaces: + if hasattr(interface, "announce_queue") and interface.announce_queue != None: + na = len(interface.announce_queue) + if na > 0: + if na == 1: + na_str = "1 announce" + else: + na_str = str(na)+" announces" + + interface.announce_queue = [] + RNS.log("Dropped "+na_str+" on "+str(interface), RNS.LOG_VERBOSE) + @staticmethod def announce_emitted(packet): random_blob = packet.data[RNS.Identity.KEYSIZE//8:RNS.Identity.KEYSIZE//8+RNS.Reticulum.TRUNCATED_HASHLENGTH//8] diff --git a/RNS/Utilities/rnpath.py b/RNS/Utilities/rnpath.py index d374b36..427cc3f 100644 --- a/RNS/Utilities/rnpath.py +++ b/RNS/Utilities/rnpath.py @@ -30,7 +30,7 @@ import argparse from RNS._version import __version__ -def program_setup(configdir, table, drop, destination_hexhash, verbosity, timeout): +def program_setup(configdir, table, drop, destination_hexhash, verbosity, timeout, drop_queues): if table: reticulum = RNS.Reticulum(configdir = configdir, loglevel = 3+verbosity) table = sorted(reticulum.get_path_table(), key=lambda e: (e["interface"], e["hops"]) ) @@ -43,6 +43,11 @@ def program_setup(configdir, table, drop, destination_hexhash, verbosity, timeou m_str = "s" print(RNS.prettyhexrep(path["hash"])+" is "+str(path["hops"])+" hop"+m_str+" away via "+RNS.prettyhexrep(path["via"])+" on "+path["interface"]+" expires "+RNS.timestamp_str(path["expires"])) + elif drop_queues: + reticulum = RNS.Reticulum(configdir = configdir, loglevel = 3+verbosity) + RNS.log("Dropping announce queues on all interfaces...") + reticulum.drop_announce_queues() + elif drop: try: dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2 @@ -140,6 +145,14 @@ def main(): default=False ) + parser.add_argument( + "-D", + "--drop-announces", + action="store_true", + help="drop all queued announces", + default=False + ) + parser.add_argument( "-w", action="store", @@ -166,7 +179,7 @@ def main(): else: configarg = None - if not args.table and not args.destination: + if not args.drop_announces and not args.table and not args.destination: print("") parser.print_help() print("") @@ -178,6 +191,7 @@ def main(): destination_hexhash = args.destination, verbosity = args.verbose, timeout = args.w, + drop_queues = args.drop_announces, ) except KeyboardInterrupt: