Added per-interface announce rate control

This commit is contained in:
Mark Qvist 2022-05-14 18:09:38 +02:00
parent f2c0dac217
commit 3871d8615e
5 changed files with 97 additions and 16 deletions

View File

@ -599,6 +599,9 @@ class I2PInterface(Interface):
spawned_interface.ifac_size = self.ifac_size spawned_interface.ifac_size = self.ifac_size
spawned_interface.ifac_netname = self.ifac_netname spawned_interface.ifac_netname = self.ifac_netname
spawned_interface.ifac_netkey = self.ifac_netkey spawned_interface.ifac_netkey = self.ifac_netkey
spawned_interface.announce_rate_target = self.announce_rate_target
spawned_interface.announce_rate_grace = self.announce_rate_grace
spawned_interface.announce_rate_penalty = self.announce_rate_penalty
RNS.log("Spawned new I2PInterface Peer: "+str(spawned_interface), RNS.LOG_VERBOSE) RNS.log("Spawned new I2PInterface Peer: "+str(spawned_interface), RNS.LOG_VERBOSE)
RNS.Transport.interfaces.append(spawned_interface) RNS.Transport.interfaces.append(spawned_interface)
self.clients += 1 self.clients += 1

View File

@ -80,6 +80,10 @@ class LocalClientInterface(Interface):
self.online = True self.online = True
self.writing = False self.writing = False
self.announce_rate_target = None
self.announce_rate_grace = None
self.announce_rate_penalty = None
if connected_socket == None: if connected_socket == None:
thread = threading.Thread(target=self.read_loop) thread = threading.Thread(target=self.read_loop)
thread.setDaemon(True) thread.setDaemon(True)
@ -285,6 +289,10 @@ class LocalServerInterface(Interface):
thread.setDaemon(True) thread.setDaemon(True)
thread.start() thread.start()
self.announce_rate_target = None
self.announce_rate_grace = None
self.announce_rate_penalty = None
self.bitrate = 1000*1000*1000 self.bitrate = 1000*1000*1000
self.online = True self.online = True

View File

@ -456,6 +456,9 @@ class TCPServerInterface(Interface):
spawned_interface.ifac_size = self.ifac_size spawned_interface.ifac_size = self.ifac_size
spawned_interface.ifac_netname = self.ifac_netname spawned_interface.ifac_netname = self.ifac_netname
spawned_interface.ifac_netkey = self.ifac_netkey spawned_interface.ifac_netkey = self.ifac_netkey
spawned_interface.announce_rate_target = self.announce_rate_target
spawned_interface.announce_rate_grace = self.announce_rate_grace
spawned_interface.announce_rate_penalty = self.announce_rate_penalty
spawned_interface.online = True spawned_interface.online = True
RNS.log("Spawned new TCPClient Interface: "+str(spawned_interface), RNS.LOG_VERBOSE) RNS.log("Spawned new TCPClient Interface: "+str(spawned_interface), RNS.LOG_VERBOSE)
RNS.Transport.interfaces.append(spawned_interface) RNS.Transport.interfaces.append(spawned_interface)

View File

@ -394,6 +394,27 @@ class Reticulum:
if c.as_int("bitrate") >= Reticulum.MINIMUM_BITRATE: if c.as_int("bitrate") >= Reticulum.MINIMUM_BITRATE:
configured_bitrate = c.as_int("bitrate") configured_bitrate = c.as_int("bitrate")
announce_rate_target = None
if "announce_rate_target" in c:
if c.as_int("announce_rate_target") > 0:
announce_rate_target = c.as_int("announce_rate_target")
announce_rate_grace = None
if "announce_rate_grace" in c:
if c.as_int("announce_rate_grace") >= 0:
announce_rate_grace = c.as_int("announce_rate_grace")
announce_rate_penalty = None
if "announce_rate_penalty" in c:
if c.as_int("announce_rate_penalty") >= 0:
announce_rate_penalty = c.as_int("announce_rate_penalty")
if announce_rate_target != None and announce_rate_grace == None:
announce_rate_grace = 0
if announce_rate_target != None and announce_rate_penalty == None:
announce_rate_penalty = 0
announce_cap = Reticulum.ANNOUNCE_CAP/100.0 announce_cap = Reticulum.ANNOUNCE_CAP/100.0
if "announce_cap" in c: if "announce_cap" in c:
if c.as_float("announce_cap") > 0 and c.as_float("announce_cap") <= 100: if c.as_float("announce_cap") > 0 and c.as_float("announce_cap") <= 100:
@ -765,6 +786,10 @@ class Reticulum:
interface.ifac_size = 8 interface.ifac_size = 8
if interface != None: if interface != None:
interface.announce_rate_target = announce_rate_target
interface.announce_rate_grace = announce_rate_grace
interface.announce_rate_penalty = announce_rate_penalty
interface.ifac_netname = ifac_netname interface.ifac_netname = ifac_netname
interface.ifac_netkey = ifac_netkey interface.ifac_netkey = ifac_netkey

View File

@ -71,6 +71,7 @@ class Transport:
REVERSE_TIMEOUT = 30*60 # Reverse table entries are removed after max 30 minutes REVERSE_TIMEOUT = 30*60 # Reverse table entries are removed after max 30 minutes
DESTINATION_TIMEOUT = PATHFINDER_E # Destination table entries are removed if unused for one week DESTINATION_TIMEOUT = PATHFINDER_E # Destination table entries are removed if unused for one week
MAX_RECEIPTS = 1024 # Maximum number of receipts to keep track of MAX_RECEIPTS = 1024 # Maximum number of receipts to keep track of
MAX_RATE_TIMESTAMPS = 16 # Maximum number of announce timestamps to keep per destination
interfaces = [] # All active interfaces interfaces = [] # All active interfaces
destinations = [] # All active destinations destinations = [] # All active destinations
@ -90,6 +91,7 @@ class Transport:
held_announces = {} # A table containing temporarily held announce-table entries held_announces = {} # A table containing temporarily held announce-table entries
announce_handlers = [] # A table storing externally registered announce handlers announce_handlers = [] # A table storing externally registered announce handlers
tunnels = {} # A table storing tunnels to other transport instances tunnels = {} # A table storing tunnels to other transport instances
announce_rate_table = {} # A table for keeping track of announce rates
# Transport control destinations are used # Transport control destinations are used
# for control purposes like path requests # for control purposes like path requests
@ -1094,6 +1096,42 @@ class Transport:
if should_add: if should_add:
now = time.time() now = time.time()
rate_blocked = False
if packet.context != RNS.Packet.PATH_RESPONSE and packet.receiving_interface.announce_rate_target != None:
if not packet.destination_hash in Transport.announce_rate_table:
rate_entry = { "last": now, "rate_violations": 0, "blocked_until": 0, "timestamps": [now]}
Transport.announce_rate_table[packet.destination_hash] = rate_entry
else:
rate_entry = Transport.announce_rate_table[packet.destination_hash]
rate_entry["timestamps"].append(now)
while len(rate_entry["timestamps"]) > Transport.MAX_RATE_TIMESTAMPS:
rate_entry["timestamps"].pop(0)
current_rate = now - rate_entry["last"]
if now > rate_entry["blocked_until"]:
if current_rate < packet.receiving_interface.announce_rate_target:
rate_entry["rate_violations"] += 1
else:
rate_entry["rate_violations"] = max(0, rate_entry["rate_violations"]-1)
if rate_entry["rate_violations"] > packet.receiving_interface.announce_rate_grace:
rate_target = packet.receiving_interface.announce_rate_target
rate_penalty = packet.receiving_interface.announce_rate_penalty
rate_entry["blocked_until"] = rate_entry["last"] + rate_target + rate_penalty
rate_blocked = True
else:
rate_entry["last"] = now
else:
rate_blocked = True
retries = 0 retries = 0
announce_hops = packet.hops announce_hops = packet.hops
local_rebroadcasts = 0 local_rebroadcasts = 0
@ -1114,23 +1152,27 @@ class Transport:
if (RNS.Reticulum.transport_enabled() or Transport.from_local_client(packet)) and packet.context != RNS.Packet.PATH_RESPONSE: if (RNS.Reticulum.transport_enabled() or Transport.from_local_client(packet)) and packet.context != RNS.Packet.PATH_RESPONSE:
# Insert announce into announce table for retransmission # Insert announce into announce table for retransmission
if Transport.from_local_client(packet): if rate_blocked:
# If the announce is from a local client, RNS.log("Blocking rebroadcast of announce from "+RNS.prettyhexrep(packet.destination_hash)+" due to excessive announce rate", RNS.LOG_DEBUG)
# it is announced immediately, but only one time.
retransmit_timeout = now
retries = Transport.PATHFINDER_R
Transport.announce_table[packet.destination_hash] = [ else:
now, if Transport.from_local_client(packet):
retransmit_timeout, # If the announce is from a local client,
retries, # it is announced immediately, but only one time.
received_from, retransmit_timeout = now
announce_hops, retries = Transport.PATHFINDER_R
packet,
local_rebroadcasts, Transport.announce_table[packet.destination_hash] = [
block_rebroadcasts, now,
attached_interface retransmit_timeout,
] retries,
received_from,
announce_hops,
packet,
local_rebroadcasts,
block_rebroadcasts,
attached_interface
]
# TODO: Check from_local_client once and store result # TODO: Check from_local_client once and store result
elif Transport.from_local_client(packet) and packet.context == RNS.Packet.PATH_RESPONSE: elif Transport.from_local_client(packet) and packet.context == RNS.Packet.PATH_RESPONSE: