Added announce queue dropping to rnpath utility

This commit is contained in:
Mark Qvist 2022-05-13 16:18:13 +02:00
parent 1b50f5267a
commit af1a05ff6a
4 changed files with 49 additions and 4 deletions

View File

@ -56,6 +56,7 @@ class Interface:
stale.append(a)
for s in stale:
if s in self.announce_queue:
self.announce_queue.remove(s)
if len(self.announce_queue) > 0:
@ -70,7 +71,10 @@ class Interface:
self.announce_allowed_at = now + wait_time
self.processOutgoing(selected["raw"])
if selected in self.announce_queue:
self.announce_queue.remove(selected)
if len(self.announce_queue) > 0:
timer = threading.Timer(wait_time, self.process_announce_queue)
timer.start()

View File

@ -837,6 +837,9 @@ class Reticulum:
if path == "path":
rpc_connection.send(self.drop_path(call["destination_hash"]))
if path == "announce_queues":
rpc_connection.send(self.drop_announce_queues())
rpc_connection.close()
except Exception as e:
@ -938,6 +941,16 @@ class Reticulum:
else:
return RNS.Transport.expire_path(destination)
def drop_announce_queues(self):
if self.is_connected_to_shared_instance:
rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)
rpc_connection.send({"drop": "announce_queues"})
response = rpc_connection.recv()
return response
else:
return RNS.Transport.drop_announce_queues()
def get_next_hop_if_name(self, destination):
if self.is_connected_to_shared_instance:
rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)

View File

@ -1798,6 +1798,20 @@ class Transport:
if registered_destination.type == RNS.Destination.SINGLE:
registered_destination.announce(path_response=True)
@staticmethod
def drop_announce_queues():
for interface in Transport.interfaces:
if hasattr(interface, "announce_queue") and interface.announce_queue != None:
na = len(interface.announce_queue)
if na > 0:
if na == 1:
na_str = "1 announce"
else:
na_str = str(na)+" announces"
interface.announce_queue = []
RNS.log("Dropped "+na_str+" on "+str(interface), RNS.LOG_VERBOSE)
@staticmethod
def announce_emitted(packet):
random_blob = packet.data[RNS.Identity.KEYSIZE//8:RNS.Identity.KEYSIZE//8+RNS.Reticulum.TRUNCATED_HASHLENGTH//8]

View File

@ -30,7 +30,7 @@ import argparse
from RNS._version import __version__
def program_setup(configdir, table, drop, destination_hexhash, verbosity, timeout):
def program_setup(configdir, table, drop, destination_hexhash, verbosity, timeout, drop_queues):
if table:
reticulum = RNS.Reticulum(configdir = configdir, loglevel = 3+verbosity)
table = sorted(reticulum.get_path_table(), key=lambda e: (e["interface"], e["hops"]) )
@ -43,6 +43,11 @@ def program_setup(configdir, table, drop, destination_hexhash, verbosity, timeou
m_str = "s"
print(RNS.prettyhexrep(path["hash"])+" is "+str(path["hops"])+" hop"+m_str+" away via "+RNS.prettyhexrep(path["via"])+" on "+path["interface"]+" expires "+RNS.timestamp_str(path["expires"]))
elif drop_queues:
reticulum = RNS.Reticulum(configdir = configdir, loglevel = 3+verbosity)
RNS.log("Dropping announce queues on all interfaces...")
reticulum.drop_announce_queues()
elif drop:
try:
dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2
@ -140,6 +145,14 @@ def main():
default=False
)
parser.add_argument(
"-D",
"--drop-announces",
action="store_true",
help="drop all queued announces",
default=False
)
parser.add_argument(
"-w",
action="store",
@ -166,7 +179,7 @@ def main():
else:
configarg = None
if not args.table and not args.destination:
if not args.drop_announces and not args.table and not args.destination:
print("")
parser.print_help()
print("")
@ -178,6 +191,7 @@ def main():
destination_hexhash = args.destination,
verbosity = args.verbose,
timeout = args.w,
drop_queues = args.drop_announces,
)
except KeyboardInterrupt: