From 3871d8615e6307bcc956bf3801dfddf17cbe27c1 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sat, 14 May 2022 18:09:38 +0200 Subject: [PATCH] Added per-interface announce rate control --- RNS/Interfaces/I2PInterface.py | 3 ++ RNS/Interfaces/LocalInterface.py | 8 ++++ RNS/Interfaces/TCPInterface.py | 3 ++ RNS/Reticulum.py | 25 +++++++++++ RNS/Transport.py | 74 +++++++++++++++++++++++++------- 5 files changed, 97 insertions(+), 16 deletions(-) diff --git a/RNS/Interfaces/I2PInterface.py b/RNS/Interfaces/I2PInterface.py index 0c9c41c..fc52555 100644 --- a/RNS/Interfaces/I2PInterface.py +++ b/RNS/Interfaces/I2PInterface.py @@ -599,6 +599,9 @@ class I2PInterface(Interface): spawned_interface.ifac_size = self.ifac_size spawned_interface.ifac_netname = self.ifac_netname spawned_interface.ifac_netkey = self.ifac_netkey + spawned_interface.announce_rate_target = self.announce_rate_target + spawned_interface.announce_rate_grace = self.announce_rate_grace + spawned_interface.announce_rate_penalty = self.announce_rate_penalty RNS.log("Spawned new I2PInterface Peer: "+str(spawned_interface), RNS.LOG_VERBOSE) RNS.Transport.interfaces.append(spawned_interface) self.clients += 1 diff --git a/RNS/Interfaces/LocalInterface.py b/RNS/Interfaces/LocalInterface.py index 4fed948..57301a7 100644 --- a/RNS/Interfaces/LocalInterface.py +++ b/RNS/Interfaces/LocalInterface.py @@ -80,6 +80,10 @@ class LocalClientInterface(Interface): self.online = True self.writing = False + self.announce_rate_target = None + self.announce_rate_grace = None + self.announce_rate_penalty = None + if connected_socket == None: thread = threading.Thread(target=self.read_loop) thread.setDaemon(True) @@ -285,6 +289,10 @@ class LocalServerInterface(Interface): thread.setDaemon(True) thread.start() + self.announce_rate_target = None + self.announce_rate_grace = None + self.announce_rate_penalty = None + self.bitrate = 1000*1000*1000 self.online = True diff --git a/RNS/Interfaces/TCPInterface.py b/RNS/Interfaces/TCPInterface.py index 1425bc5..fe30ce0 100644 --- a/RNS/Interfaces/TCPInterface.py +++ b/RNS/Interfaces/TCPInterface.py @@ -456,6 +456,9 @@ class TCPServerInterface(Interface): spawned_interface.ifac_size = self.ifac_size spawned_interface.ifac_netname = self.ifac_netname spawned_interface.ifac_netkey = self.ifac_netkey + spawned_interface.announce_rate_target = self.announce_rate_target + spawned_interface.announce_rate_grace = self.announce_rate_grace + spawned_interface.announce_rate_penalty = self.announce_rate_penalty spawned_interface.online = True RNS.log("Spawned new TCPClient Interface: "+str(spawned_interface), RNS.LOG_VERBOSE) RNS.Transport.interfaces.append(spawned_interface) diff --git a/RNS/Reticulum.py b/RNS/Reticulum.py index fe7a4a7..100b671 100755 --- a/RNS/Reticulum.py +++ b/RNS/Reticulum.py @@ -393,6 +393,27 @@ class Reticulum: if "bitrate" in c: if c.as_int("bitrate") >= Reticulum.MINIMUM_BITRATE: configured_bitrate = c.as_int("bitrate") + + announce_rate_target = None + if "announce_rate_target" in c: + if c.as_int("announce_rate_target") > 0: + announce_rate_target = c.as_int("announce_rate_target") + + announce_rate_grace = None + if "announce_rate_grace" in c: + if c.as_int("announce_rate_grace") >= 0: + announce_rate_grace = c.as_int("announce_rate_grace") + + announce_rate_penalty = None + if "announce_rate_penalty" in c: + if c.as_int("announce_rate_penalty") >= 0: + announce_rate_penalty = c.as_int("announce_rate_penalty") + + if announce_rate_target != None and announce_rate_grace == None: + announce_rate_grace = 0 + + if announce_rate_target != None and announce_rate_penalty == None: + announce_rate_penalty = 0 announce_cap = Reticulum.ANNOUNCE_CAP/100.0 if "announce_cap" in c: @@ -765,6 +786,10 @@ class Reticulum: interface.ifac_size = 8 if interface != None: + interface.announce_rate_target = announce_rate_target + interface.announce_rate_grace = announce_rate_grace + interface.announce_rate_penalty = announce_rate_penalty + interface.ifac_netname = ifac_netname interface.ifac_netkey = ifac_netkey diff --git a/RNS/Transport.py b/RNS/Transport.py index 8e7d3f9..1f6aaa2 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -71,6 +71,7 @@ class Transport: REVERSE_TIMEOUT = 30*60 # Reverse table entries are removed after max 30 minutes DESTINATION_TIMEOUT = PATHFINDER_E # Destination table entries are removed if unused for one week MAX_RECEIPTS = 1024 # Maximum number of receipts to keep track of + MAX_RATE_TIMESTAMPS = 16 # Maximum number of announce timestamps to keep per destination interfaces = [] # All active interfaces destinations = [] # All active destinations @@ -90,6 +91,7 @@ class Transport: held_announces = {} # A table containing temporarily held announce-table entries announce_handlers = [] # A table storing externally registered announce handlers tunnels = {} # A table storing tunnels to other transport instances + announce_rate_table = {} # A table for keeping track of announce rates # Transport control destinations are used # for control purposes like path requests @@ -1094,6 +1096,42 @@ class Transport: if should_add: now = time.time() + + rate_blocked = False + if packet.context != RNS.Packet.PATH_RESPONSE and packet.receiving_interface.announce_rate_target != None: + if not packet.destination_hash in Transport.announce_rate_table: + rate_entry = { "last": now, "rate_violations": 0, "blocked_until": 0, "timestamps": [now]} + Transport.announce_rate_table[packet.destination_hash] = rate_entry + + else: + rate_entry = Transport.announce_rate_table[packet.destination_hash] + rate_entry["timestamps"].append(now) + + while len(rate_entry["timestamps"]) > Transport.MAX_RATE_TIMESTAMPS: + rate_entry["timestamps"].pop(0) + + current_rate = now - rate_entry["last"] + + if now > rate_entry["blocked_until"]: + + if current_rate < packet.receiving_interface.announce_rate_target: + rate_entry["rate_violations"] += 1 + + else: + rate_entry["rate_violations"] = max(0, rate_entry["rate_violations"]-1) + + if rate_entry["rate_violations"] > packet.receiving_interface.announce_rate_grace: + rate_target = packet.receiving_interface.announce_rate_target + rate_penalty = packet.receiving_interface.announce_rate_penalty + rate_entry["blocked_until"] = rate_entry["last"] + rate_target + rate_penalty + rate_blocked = True + else: + rate_entry["last"] = now + + else: + rate_blocked = True + + retries = 0 announce_hops = packet.hops local_rebroadcasts = 0 @@ -1114,23 +1152,27 @@ class Transport: if (RNS.Reticulum.transport_enabled() or Transport.from_local_client(packet)) and packet.context != RNS.Packet.PATH_RESPONSE: # Insert announce into announce table for retransmission - if Transport.from_local_client(packet): - # If the announce is from a local client, - # it is announced immediately, but only one time. - retransmit_timeout = now - retries = Transport.PATHFINDER_R + if rate_blocked: + RNS.log("Blocking rebroadcast of announce from "+RNS.prettyhexrep(packet.destination_hash)+" due to excessive announce rate", RNS.LOG_DEBUG) + + else: + if Transport.from_local_client(packet): + # If the announce is from a local client, + # it is announced immediately, but only one time. + retransmit_timeout = now + retries = Transport.PATHFINDER_R - Transport.announce_table[packet.destination_hash] = [ - now, - retransmit_timeout, - retries, - received_from, - announce_hops, - packet, - local_rebroadcasts, - block_rebroadcasts, - attached_interface - ] + Transport.announce_table[packet.destination_hash] = [ + now, + retransmit_timeout, + retries, + received_from, + announce_hops, + packet, + local_rebroadcasts, + block_rebroadcasts, + attached_interface + ] # TODO: Check from_local_client once and store result elif Transport.from_local_client(packet) and packet.context == RNS.Packet.PATH_RESPONSE: