Compare commits

..

4 Commits

Author SHA1 Message Date
Pavol Rusnak
011448c22d
remove dangling spaces 2024-10-08 22:03:30 +02:00
Pavol Rusnak
2a5a439921
modernize 2024-10-08 22:03:09 +02:00
Mark Qvist
527f6cc906 Fuxed typo 2024-10-07 22:10:17 +02:00
Mark Qvist
a0d61f6441 Added error descriptions for modem communication timeout 2024-10-07 20:55:34 +02:00
25 changed files with 84 additions and 143 deletions

View File

@ -93,9 +93,7 @@ def announceLoop(destination_1, destination_2):
# Send the announce including the app data # Send the announce including the app data
destination_1.announce(app_data=fruit.encode("utf-8")) destination_1.announce(app_data=fruit.encode("utf-8"))
RNS.log( RNS.log(
"Sent announce from "+ f"Sent announce from {RNS.prettyhexrep(destination_1.hash)} ({destination_1.name})"
RNS.prettyhexrep(destination_1.hash)+
" ("+destination_1.name+")"
) )
# Randomly select a noble gas # Randomly select a noble gas
@ -104,9 +102,7 @@ def announceLoop(destination_1, destination_2):
# Send the announce including the app data # Send the announce including the app data
destination_2.announce(app_data=noble_gas.encode("utf-8")) destination_2.announce(app_data=noble_gas.encode("utf-8"))
RNS.log( RNS.log(
"Sent announce from "+ f"Sent announce from {RNS.prettyhexrep(destination_2.hash)} ({destination_2.name})"
RNS.prettyhexrep(destination_2.hash)+
" ("+destination_2.name+")"
) )
# We will need to define an announce handler class that # We will need to define an announce handler class that
@ -131,8 +127,7 @@ class ExampleAnnounceHandler:
if app_data: if app_data:
RNS.log( RNS.log(
"The announce contained the following app data: "+ f"The announce contained the following app data: {app_data.decode('utf-8')}"
app_data.decode("utf-8")
) )
########################################################## ##########################################################

View File

@ -54,9 +54,7 @@ def packet_callback(data, packet):
def broadcastLoop(destination): def broadcastLoop(destination):
# Let the user know that everything is ready # Let the user know that everything is ready
RNS.log( RNS.log(
"Broadcast example "+ f"Broadcast example {RNS.prettyhexrep(destination.hash)} running, enter text and hit enter to broadcast (Ctrl-C to quit)"
RNS.prettyhexrep(destination.hash)+
" running, enter text and hit enter to broadcast (Ctrl-C to quit)"
) )
# We enter a loop that runs until the users exits. # We enter a loop that runs until the users exits.

View File

@ -61,9 +61,7 @@ def server(configpath):
def server_loop(destination): def server_loop(destination):
# Let the user know that everything is ready # Let the user know that everything is ready
RNS.log( RNS.log(
"Link buffer example "+ f"Link buffer example {RNS.prettyhexrep(destination.hash)} running, waiting for a connection."
RNS.prettyhexrep(destination.hash)+
" running, waiting for a connection."
) )
RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)") RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)")

View File

@ -124,9 +124,7 @@ def server(configpath):
def server_loop(destination): def server_loop(destination):
# Let the user know that everything is ready # Let the user know that everything is ready
RNS.log( RNS.log(
"Link example "+ f"Link example {RNS.prettyhexrep(destination.hash)} running, waiting for a connection."
RNS.prettyhexrep(destination.hash)+
" running, waiting for a connection."
) )
RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)") RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)")
@ -280,14 +278,11 @@ def client_loop():
channel.send(message) channel.send(message)
else: else:
RNS.log( RNS.log(
"Cannot send this packet, the data size of "+ f"Cannot send this packet, the data size of {packed_size} bytes exceeds the link packet MDU of {channel.MDU} bytes",
str(packed_size)+" bytes exceeds the link packet MDU of "+
str(channel.MDU)+" bytes",
RNS.LOG_ERROR RNS.LOG_ERROR
) )
else: else:
RNS.log("Channel is not ready to send, please wait for " + RNS.log(f"Channel is not ready to send, please wait for pending messages to complete.", RNS.LOG_ERROR)
"pending messages to complete.", RNS.LOG_ERROR)
except Exception as e: except Exception as e:
RNS.log(f"Error while sending data over the link: {e}") RNS.log(f"Error while sending data over the link: {e}")

View File

@ -64,9 +64,7 @@ def server(configpath):
def announceLoop(destination): def announceLoop(destination):
# Let the user know that everything is ready # Let the user know that everything is ready
RNS.log( RNS.log(
"Echo server "+ f"Echo server {RNS.prettyhexrep(destination.hash)} running, hit enter to manually send an announce (Ctrl-C to quit)"
RNS.prettyhexrep(destination.hash)+
" running, hit enter to manually send an announce (Ctrl-C to quit)"
) )
# We enter a loop that runs until the users exits. # We enter a loop that runs until the users exits.
@ -142,9 +140,7 @@ def client(destination_hexhash, configpath, timeout=None):
# Tell the user that the client is ready! # Tell the user that the client is ready!
RNS.log( RNS.log(
"Echo client ready, hit enter to send echo request to "+ f"Echo client ready, hit enter to send echo request to {destination_hexhash} (Ctrl-C to quit)"
destination_hexhash+
" (Ctrl-C to quit)"
) )
# We enter a loop that runs until the user exits. # We enter a loop that runs until the user exits.
@ -247,10 +243,7 @@ def packet_delivered(receipt):
reception_stats += f" [SNR {receipt.proof_packet.snr} dB]" reception_stats += f" [SNR {receipt.proof_packet.snr} dB]"
RNS.log( RNS.log(
"Valid reply received from "+ f"Valid reply received from {RNS.prettyhexrep(receipt.destination.hash)}, round-trip time is {rttstring}{reception_stats}"
RNS.prettyhexrep(receipt.destination.hash)+
", round-trip time is "+rttstring+
reception_stats
) )
# This function is called if a packet times out. # This function is called if a packet times out.

View File

@ -53,9 +53,7 @@ def server(configpath):
def server_loop(destination): def server_loop(destination):
# Let the user know that everything is ready # Let the user know that everything is ready
RNS.log( RNS.log(
"Link identification example "+ f"Link identification example {RNS.prettyhexrep(destination.hash)} running, waiting for a connection."
RNS.prettyhexrep(destination.hash)+
" running, waiting for a connection."
) )
RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)") RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)")
@ -209,9 +207,7 @@ def client_loop():
RNS.Packet(server_link, data).send() RNS.Packet(server_link, data).send()
else: else:
RNS.log( RNS.log(
"Cannot send this packet, the data size of "+ f"Cannot send this packet, the data size of {len(data)} bytes exceeds the link packet MDU of {RNS.Link.MDU} bytes",
str(len(data))+" bytes exceeds the link packet MDU of "+
str(RNS.Link.MDU)+" bytes",
RNS.LOG_ERROR RNS.LOG_ERROR
) )

View File

@ -53,9 +53,7 @@ def server(configpath):
def server_loop(destination): def server_loop(destination):
# Let the user know that everything is ready # Let the user know that everything is ready
RNS.log( RNS.log(
"Link example "+ f"Link example {RNS.prettyhexrep(destination.hash)} running, waiting for a connection."
RNS.prettyhexrep(destination.hash)+
" running, waiting for a connection."
) )
RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)") RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)")
@ -189,9 +187,7 @@ def client_loop():
RNS.Packet(server_link, data).send() RNS.Packet(server_link, data).send()
else: else:
RNS.log( RNS.log(
"Cannot send this packet, the data size of "+ f"Cannot send this packet, the data size of {len(data)} bytes exceeds the link packet MDU of {RNS.Link.MDU} bytes",
str(len(data))+" bytes exceeds the link packet MDU of "+
str(RNS.Link.MDU)+" bytes",
RNS.LOG_ERROR RNS.LOG_ERROR
) )

View File

@ -51,9 +51,7 @@ def program_setup(configpath):
def announceLoop(destination): def announceLoop(destination):
# Let the user know that everything is ready # Let the user know that everything is ready
RNS.log( RNS.log(
"Minimal example "+ f"Minimal example {RNS.prettyhexrep(destination.hash)} running, hit enter to manually send an announce (Ctrl-C to quit)"
RNS.prettyhexrep(destination.hash)+
" running, hit enter to manually send an announce (Ctrl-C to quit)"
) )
# We enter a loop that runs until the users exits. # We enter a loop that runs until the users exits.

View File

@ -75,9 +75,7 @@ def server(configpath):
def announceLoop(destination): def announceLoop(destination):
# Let the user know that everything is ready # Let the user know that everything is ready
RNS.log( RNS.log(
"Ratcheted echo server "+ f"Ratcheted echo server {RNS.prettyhexrep(destination.hash)} running, hit enter to manually send an announce (Ctrl-C to quit)"
RNS.prettyhexrep(destination.hash)+
" running, hit enter to manually send an announce (Ctrl-C to quit)"
) )
# We enter a loop that runs until the users exits. # We enter a loop that runs until the users exits.
@ -153,9 +151,7 @@ def client(destination_hexhash, configpath, timeout=None):
# Tell the user that the client is ready! # Tell the user that the client is ready!
RNS.log( RNS.log(
"Echo client ready, hit enter to send echo request to "+ f"Echo client ready, hit enter to send echo request to {destination_hexhash} (Ctrl-C to quit)"
destination_hexhash+
" (Ctrl-C to quit)"
) )
# We enter a loop that runs until the user exits. # We enter a loop that runs until the user exits.
@ -259,10 +255,7 @@ def packet_delivered(receipt):
reception_stats += f" [SNR {receipt.proof_packet.snr} dB]" reception_stats += f" [SNR {receipt.proof_packet.snr} dB]"
RNS.log( RNS.log(
"Valid reply received from "+ f"Valid reply received from {RNS.prettyhexrep(receipt.destination.hash)}, round-trip time is {rttstring}{reception_stats}"
RNS.prettyhexrep(receipt.destination.hash)+
", round-trip time is "+rttstring+
reception_stats
) )
# This function is called if a packet times out. # This function is called if a packet times out.

View File

@ -67,9 +67,7 @@ def server(configpath):
def server_loop(destination): def server_loop(destination):
# Let the user know that everything is ready # Let the user know that everything is ready
RNS.log( RNS.log(
"Request example "+ f"Request example {RNS.prettyhexrep(destination.hash)} running, waiting for a connection."
RNS.prettyhexrep(destination.hash)+
" running, waiting for a connection."
) )
RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)") RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)")

View File

@ -62,9 +62,7 @@ def server(configpath):
def server_loop(destination): def server_loop(destination):
# Let the user know that everything is ready # Let the user know that everything is ready
RNS.log( RNS.log(
"Speedtest "+ f"Speedtest {RNS.prettyhexrep(destination.hash)} running, waiting for a connection."
RNS.prettyhexrep(destination.hash)+
" running, waiting for a connection."
) )
RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)") RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)")

View File

@ -60,13 +60,11 @@ class HMAC:
if hasattr(self._inner, 'block_size'): if hasattr(self._inner, 'block_size'):
blocksize = self._inner.block_size blocksize = self._inner.block_size
if blocksize < 16: if blocksize < 16:
_warnings.warn('block_size of %d seems too small; using our ' _warnings.warn(f'block_size of {int(blocksize)} seems too small; using our default of {int(self.blocksize)}.',
'default of %d.' % (blocksize, self.blocksize),
RuntimeWarning, 2) RuntimeWarning, 2)
blocksize = self.blocksize blocksize = self.blocksize
else: else:
_warnings.warn('No block_size attribute on given digest object; ' _warnings.warn(f'No block_size attribute on given digest object; Assuming {int(self.blocksize)}.',
'Assuming %d.' % (self.blocksize),
RuntimeWarning, 2) RuntimeWarning, 2)
blocksize = self.blocksize blocksize = self.blocksize

View File

@ -89,6 +89,7 @@ class KISS():
ERROR_EEPROM_LOCKED = 0x03 ERROR_EEPROM_LOCKED = 0x03
ERROR_QUEUE_FULL = 0x04 ERROR_QUEUE_FULL = 0x04
ERROR_MEMORY_LOW = 0x05 ERROR_MEMORY_LOW = 0x05
ERROR_MODEM_TIMEOUT = 0x06
ERROR_INVALID_FIRMWARE = 0x10 ERROR_INVALID_FIRMWARE = 0x10
ERROR_INVALID_BLE_MTU = 0x20 ERROR_INVALID_BLE_MTU = 0x20
@ -1254,6 +1255,9 @@ class RNodeInterface(Interface):
elif (byte == KISS.ERROR_MEMORY_LOW): elif (byte == KISS.ERROR_MEMORY_LOW):
RNS.log(f"{self} hardware error (code {RNS.hexrep(byte)}): Memory exhausted", RNS.LOG_ERROR) RNS.log(f"{self} hardware error (code {RNS.hexrep(byte)}): Memory exhausted", RNS.LOG_ERROR)
self.hw_errors.append({"error": KISS.ERROR_MEMORY_LOW, "description": "Memory exhausted on connected device"}) self.hw_errors.append({"error": KISS.ERROR_MEMORY_LOW, "description": "Memory exhausted on connected device"})
elif (byte == KISS.ERROR_MODEM_TIMEOUT):
RNS.log(f"{self} hardware error (code {RNS.hexrep(byte)}): Modem communication timed out", RNS.LOG_ERROR)
self.hw_errors.append({"error": KISS.ERROR_MODEM_TIMEOUT, "description": "Modem communication timed out on connected device"})
else: else:
RNS.log(f"{self} hardware error (code {RNS.hexrep(byte)})", RNS.LOG_ERROR) RNS.log(f"{self} hardware error (code {RNS.hexrep(byte)})", RNS.LOG_ERROR)
raise OSError("Unknown hardware failure") raise OSError("Unknown hardware failure")

View File

@ -169,7 +169,7 @@ class AutoInterface(Interface):
self.group_hash = RNS.Identity.full_hash(self.group_id) self.group_hash = RNS.Identity.full_hash(self.group_id)
g = self.group_hash g = self.group_hash
#gt = "{:02x}".format(g[1]+(g[0]<<8)) #gt = f"{g[1] + (g[0] << 8):02x}"
gt = "0" gt = "0"
gt += f":{g[3] + (g[2] << 8):02x}" gt += f":{g[3] + (g[2] << 8):02x}"
gt += f":{g[5] + (g[4] << 8):02x}" gt += f":{g[5] + (g[4] << 8):02x}"

View File

@ -80,6 +80,7 @@ class KISS():
ERROR_EEPROM_LOCKED = 0x03 ERROR_EEPROM_LOCKED = 0x03
ERROR_QUEUE_FULL = 0x04 ERROR_QUEUE_FULL = 0x04
ERROR_MEMORY_LOW = 0x05 ERROR_MEMORY_LOW = 0x05
ERROR_MODEM_TIMEOUT = 0x06
PLATFORM_AVR = 0x90 PLATFORM_AVR = 0x90
PLATFORM_ESP32 = 0x80 PLATFORM_ESP32 = 0x80
@ -850,6 +851,9 @@ class RNodeInterface(Interface):
elif (byte == KISS.ERROR_MEMORY_LOW): elif (byte == KISS.ERROR_MEMORY_LOW):
RNS.log(f"{self} hardware error (code {RNS.hexrep(byte)}): Memory exhausted", RNS.LOG_ERROR) RNS.log(f"{self} hardware error (code {RNS.hexrep(byte)}): Memory exhausted", RNS.LOG_ERROR)
self.hw_errors.append({"error": KISS.ERROR_MEMORY_LOW, "description": "Memory exhausted on connected device"}) self.hw_errors.append({"error": KISS.ERROR_MEMORY_LOW, "description": "Memory exhausted on connected device"})
elif (byte == KISS.ERROR_MODEM_TIMEOUT):
RNS.log(f"{self} hardware error (code {RNS.hexrep(byte)}): Modem communication timed out", RNS.LOG_ERROR)
self.hw_errors.append({"error": KISS.ERROR_MODEM_TIMEOUT, "description": "Modem communication timed out on connected device"})
else: else:
RNS.log(f"{self} hardware error (code {RNS.hexrep(byte)})", RNS.LOG_ERROR) RNS.log(f"{self} hardware error (code {RNS.hexrep(byte)})", RNS.LOG_ERROR)
raise OSError("Unknown hardware failure") raise OSError("Unknown hardware failure")

View File

@ -278,8 +278,8 @@ models = {
0xBB: [850000000, 950000000, 17, "850 - 950 MHz", "rnode_firmware_lora32v10.zip", "SX1276"], 0xBB: [850000000, 950000000, 17, "850 - 950 MHz", "rnode_firmware_lora32v10.zip", "SX1276"],
0xC4: [420000000, 520000000, 17, "420 - 520 MHz", "rnode_firmware_heltec32v2.zip", "SX1278"], 0xC4: [420000000, 520000000, 17, "420 - 520 MHz", "rnode_firmware_heltec32v2.zip", "SX1278"],
0xC9: [850000000, 950000000, 17, "850 - 950 MHz", "rnode_firmware_heltec32v2.zip", "SX1276"], 0xC9: [850000000, 950000000, 17, "850 - 950 MHz", "rnode_firmware_heltec32v2.zip", "SX1276"],
0xC5: [470000000, 510000000, 21, "470 - 510 MHz", "rnode_firmware_heltec32v3.zip", "SX1262"], 0xC5: [420000000, 520000000, 21, "420 - 520 MHz", "rnode_firmware_heltec32v3.zip", "SX1262"],
0xCA: [863000000, 928000000, 21, "863 - 928 MHz", "rnode_firmware_heltec32v3.zip", "SX1262"], 0xCA: [850000000, 950000000, 21, "850 - 950 MHz", "rnode_firmware_heltec32v3.zip", "SX1262"],
0xE4: [420000000, 520000000, 17, "420 - 520 MHz", "rnode_firmware_tbeam.zip", "SX1278"], 0xE4: [420000000, 520000000, 17, "420 - 520 MHz", "rnode_firmware_tbeam.zip", "SX1278"],
0xE9: [850000000, 950000000, 17, "850 - 950 MHz", "rnode_firmware_tbeam.zip", "SX1276"], 0xE9: [850000000, 950000000, 17, "850 - 950 MHz", "rnode_firmware_tbeam.zip", "SX1276"],
0xD4: [420000000, 520000000, 22, "420 - 520 MHz", "rnode_firmware_tdeck.zip", "SX1268"], 0xD4: [420000000, 520000000, 22, "420 - 520 MHz", "rnode_firmware_tdeck.zip", "SX1268"],
@ -492,7 +492,7 @@ class RNode():
command_buffer = command_buffer+bytes([byte]) command_buffer = command_buffer+bytes([byte])
if (len(command_buffer) == 4): if (len(command_buffer) == 4):
self.r_bt_pin = command_buffer[0] << 24 | command_buffer[1] << 16 | command_buffer[2] << 8 | command_buffer[3] self.r_bt_pin = command_buffer[0] << 24 | command_buffer[1] << 16 | command_buffer[2] << 8 | command_buffer[3]
RNS.log("Bluetooth pairing PIN is: {:06d}".format(self.r_bt_pin)) RNS.log(f"Bluetooth pairing PIN is: {self.r_bt_pin:06d}")
elif (command == KISS.CMD_DEV_HASH): elif (command == KISS.CMD_DEV_HASH):
if (byte == KISS.FESC): if (byte == KISS.FESC):

View File

@ -178,11 +178,7 @@ def program_setup(configdir, destination_hexhash, size=None, full_name = None, v
reception_stats += f" [SNR {receipt.proof_packet.snr} dB]" reception_stats += f" [SNR {receipt.proof_packet.snr} dB]"
print( print(
"Valid reply from "+ f"Valid reply from {RNS.prettyhexrep(receipt.destination.hash)}\nRound-trip time is {rttstring} over {hops} hop{ms}{reception_stats}\n"
RNS.prettyhexrep(receipt.destination.hash)+
"\nRound-trip time is "+rttstring+
" over "+str(hops)+" hop"+ms+
reception_stats+"\n"
) )
else: else:

View File

@ -42,12 +42,12 @@ def size_str(num, suffix='B'):
for unit in units: for unit in units:
if abs(num) < 1000.0: if abs(num) < 1000.0:
if unit == "": if unit == "":
return "{:.0f} {}{}".format(num, unit, suffix) return f"{num:.0f} {unit}{suffix}"
else: else:
return "{:.2f} {}{}".format(num, unit, suffix) return f"{num:.2f} {unit}{suffix}"
num /= 1000.0 num /= 1000.0
return "{:.2f}{}{}".format(num, last_unit, suffix) return f"{num:.2f}{last_unit}{suffix}"
request_result = None request_result = None
request_concluded = False request_concluded = False
@ -144,7 +144,7 @@ def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json=
try: try:
dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2 dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2
if len(remote) != dest_len: if len(remote) != dest_len:
raise ValueError("Destination length is invalid, must be {hex} hexadecimal characters ({byte} bytes).".format(hex=dest_len, byte=dest_len//2)) raise ValueError(f"Destination length is invalid, must be {dest_len} hexadecimal characters ({dest_len // 2} bytes).")
try: try:
identity_hash = bytes.fromhex(remote) identity_hash = bytes.fromhex(remote)
destination_hash = RNS.Destination.hash_from_name_and_identity("rnstransport.remote.management", identity_hash) destination_hash = RNS.Destination.hash_from_name_and_identity("rnstransport.remote.management", identity_hash)
@ -281,13 +281,13 @@ def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json=
if "ifac_netname" in ifstat and ifstat["ifac_netname"] != None: if "ifac_netname" in ifstat and ifstat["ifac_netname"] != None:
print(" Network : {nn}".format(nn=ifstat["ifac_netname"])) print(" Network : {nn}".format(nn=ifstat["ifac_netname"]))
print(" Status : {ss}".format(ss=ss)) print(f" Status : {ss}")
if clients != None and clients_string != "": if clients != None and clients_string != "":
print(" "+clients_string) print(" "+clients_string)
if not (name.startswith("Shared Instance[") or name.startswith("TCPInterface[Client") or name.startswith("LocalInterface[")): if not (name.startswith("Shared Instance[") or name.startswith("TCPInterface[Client") or name.startswith("LocalInterface[")):
print(" Mode : {mode}".format(mode=modestr)) print(f" Mode : {modestr}")
if "bitrate" in ifstat and ifstat["bitrate"] != None: if "bitrate" in ifstat and ifstat["bitrate"] != None:
print(" Rate : {ss}".format(ss=speed_str(ifstat["bitrate"]))) print(" Rate : {ss}".format(ss=speed_str(ifstat["bitrate"])))
@ -295,7 +295,7 @@ def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json=
if "battery_percent" in ifstat and ifstat["battery_percent"] != None: if "battery_percent" in ifstat and ifstat["battery_percent"] != None:
try: try:
bpi = int(ifstat["battery_percent"]) bpi = int(ifstat["battery_percent"])
print(" Battery : {bp}%".format(bp=bpi)) print(f" Battery : {bpi}%")
except: except:
pass pass
@ -488,10 +488,10 @@ def speed_str(num, suffix='bps'):
for unit in units: for unit in units:
if abs(num) < 1000.0: if abs(num) < 1000.0:
return "{:3.2f} {}{}".format(num, unit, suffix) return f"{num:3.2f} {unit}{suffix}"
num /= 1000.0 num /= 1000.0
return "{:.2f} {}{}".format(num, last_unit, suffix) return f"{num:.2f} {last_unit}{suffix}"
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -159,12 +159,12 @@ def hexrep(data, delimit=True):
delimiter = ":" delimiter = ":"
if not delimit: if not delimit:
delimiter = "" delimiter = ""
hexrep = delimiter.join("{:02x}".format(c) for c in data) hexrep = delimiter.join(f"{c:02x}" for c in data)
return hexrep return hexrep
def prettyhexrep(data): def prettyhexrep(data):
delimiter = "" delimiter = ""
hexrep = "<"+delimiter.join("{:02x}".format(c) for c in data)+">" hexrep = "<"+delimiter.join(f"{c:02x}" for c in data)+">"
return hexrep return hexrep
def prettyspeed(num, suffix="b"): def prettyspeed(num, suffix="b"):
@ -182,12 +182,12 @@ def prettysize(num, suffix='B'):
for unit in units: for unit in units:
if abs(num) < 1000.0: if abs(num) < 1000.0:
if unit == "": if unit == "":
return "{:.0f} {}{}".format(num, unit, suffix) return f"{num:.0f} {unit}{suffix}"
else: else:
return "{:.2f} {}{}".format(num, unit, suffix) return f"{num:.2f} {unit}{suffix}"
num /= 1000.0 num /= 1000.0
return "{:.2f}{}{}".format(num, last_unit, suffix) return f"{num:.2f}{last_unit}{suffix}"
def prettyfrequency(hz, suffix="Hz"): def prettyfrequency(hz, suffix="Hz"):
num = hz*1e6 num = hz*1e6
@ -196,10 +196,10 @@ def prettyfrequency(hz, suffix="Hz"):
for unit in units: for unit in units:
if abs(num) < 1000.0: if abs(num) < 1000.0:
return "{:.2f} {}{}".format(num, unit, suffix) return f"{num:.2f} {unit}{suffix}"
num /= 1000.0 num /= 1000.0
return "{:.2f}{}{}".format(num, last_unit, suffix) return f"{num:.2f}{last_unit}{suffix}"
def prettydistance(m, suffix="m"): def prettydistance(m, suffix="m"):
num = m*1e6 num = m*1e6
@ -212,10 +212,10 @@ def prettydistance(m, suffix="m"):
if unit == "c": divisor = 100 if unit == "c": divisor = 100
if abs(num) < divisor: if abs(num) < divisor:
return "{:.2f} {}{}".format(num, unit, suffix) return f"{num:.2f} {unit}{suffix}"
num /= divisor num /= divisor
return "{:.2f} {}{}".format(num, last_unit, suffix) return f"{num:.2f} {last_unit}{suffix}"
def prettytime(time, verbose=False, compact=False): def prettytime(time, verbose=False, compact=False):
days = int(time // (24 * 3600)) days = int(time // (24 * 3600))

View File

@ -745,7 +745,7 @@ class Section(dict):
return self[key] return self[key]
except MissingInterpolationOption: except MissingInterpolationOption:
return dict.__getitem__(self, key) return dict.__getitem__(self, key)
return '{%s}' % ', '.join([('{}: {}'.format(repr(key), repr(_getval(key)))) return '{%s}' % ', '.join([f'{key!r}: {_getval(key)!r}'
for key in (self.scalars + self.sections)]) for key in (self.scalars + self.sections)])
__str__ = __repr__ __str__ = __repr__
@ -1363,9 +1363,7 @@ class ConfigObj(Section):
return self[key] return self[key]
except MissingInterpolationOption: except MissingInterpolationOption:
return dict.__getitem__(self, key) return dict.__getitem__(self, key)
return ('ConfigObj({%s})' % return ('ConfigObj({%s})' % ', '.join([f'{key!r}: {_getval(key)!r}' for key in (self.scalars + self.sections)]))
', '.join([('{}: {}'.format(repr(key), repr(_getval(key))))
for key in (self.scalars + self.sections)]))
def _handle_bom(self, infile): def _handle_bom(self, infile):
@ -1986,20 +1984,12 @@ class ConfigObj(Section):
val = self._decode_element(self._quote(this_entry)) val = self._decode_element(self._quote(this_entry))
else: else:
val = repr(this_entry) val = repr(this_entry)
return '{}{}{}{}{}'.format(indent_string, return f"{indent_string}{self._decode_element(self._quote(entry, multiline=False))}{self._a_to_u(' = ')}{val}{self._decode_element(comment)}"
self._decode_element(self._quote(entry, multiline=False)),
self._a_to_u(' = '),
val,
self._decode_element(comment))
def _write_marker(self, indent_string, depth, entry, comment): def _write_marker(self, indent_string, depth, entry, comment):
"""Write a section marker line""" """Write a section marker line"""
return '{}{}{}{}{}'.format(indent_string, return f"{indent_string}{self._a_to_u('[' * depth)}{self._quote(self._decode_element(entry), multiline=False)}{self._a_to_u(']' * depth)}{self._decode_element(comment)}"
self._a_to_u('[' * depth),
self._quote(self._decode_element(entry), multiline=False),
self._a_to_u(']' * depth),
self._decode_element(comment))
def _handle_comment(self, comment): def _handle_comment(self, comment):

View File

@ -54,13 +54,11 @@ def hello(min_version, max_version):
return f"HELLO VERSION MIN={min_version} MAX={max_version}\n".encode() return f"HELLO VERSION MIN={min_version} MAX={max_version}\n".encode()
def session_create(style, session_id, destination, options=""): def session_create(style, session_id, destination, options=""):
return "SESSION CREATE STYLE={} ID={} DESTINATION={} {}\n".format( return f"SESSION CREATE STYLE={style} ID={session_id} DESTINATION={destination} {options}\n".encode()
style, session_id, destination, options).encode()
def stream_connect(session_id, destination, silent="false"): def stream_connect(session_id, destination, silent="false"):
return "STREAM CONNECT ID={} DESTINATION={} SILENT={}\n".format( return f"STREAM CONNECT ID={session_id} DESTINATION={destination} SILENT={silent}\n".encode()
session_id, destination, silent).encode()
def stream_accept(session_id, silent="false"): def stream_accept(session_id, silent="false"):
return f"STREAM ACCEPT ID={session_id} SILENT={silent}\n".encode() return f"STREAM ACCEPT ID={session_id} SILENT={silent}\n".encode()

View File

@ -141,8 +141,7 @@ class ServerTunnel(I2PTunnel):
# data and dest may come in one chunk # data and dest may come in one chunk
dest, data = incoming.split(b"\n", 1) dest, data = incoming.split(b"\n", 1)
remote_destination = sam.Destination(dest.decode()) remote_destination = sam.Destination(dest.decode())
logger.debug("{} client connected: {}.b32.i2p".format( logger.debug(f"{self.session_name} client connected: {remote_destination.base32}.b32.i2p")
self.session_name, remote_destination.base32))
except Exception as e: except Exception as e:
self.status["exception"] = e self.status["exception"] = e

View File

@ -58,9 +58,7 @@ class Adapter:
self.index = index self.index = index
def __repr__(self) -> str: def __repr__(self) -> str:
return "Adapter(name={name}, nice_name={nice_name}, ips={ips}, index={index})".format( return f"Adapter(name={self.name!r}, nice_name={self.nice_name!r}, ips={self.ips!r}, index={self.index!r})"
name=repr(self.name), nice_name=repr(self.nice_name), ips=repr(self.ips), index=repr(self.index)
)
# Type of an IPv4 address (a string in "xxx.xxx.xxx.xxx" format) # Type of an IPv4 address (a string in "xxx.xxx.xxx.xxx" format)
@ -112,9 +110,7 @@ class IP:
return isinstance(self.ip, tuple) return isinstance(self.ip, tuple)
def __repr__(self) -> str: def __repr__(self) -> str:
return "IP(ip={ip}, network_prefix={network_prefix}, nice_name={nice_name})".format( return f"IP(ip={self.ip!r}, network_prefix={self.network_prefix!r}, nice_name={self.nice_name!r})"
ip=repr(self.ip), network_prefix=repr(self.network_prefix), nice_name=repr(self.nice_name)
)
if platform.system() == "Darwin" or "BSD" in platform.system(): if platform.system() == "Darwin" or "BSD" in platform.system():

4
RNS/vendor/six.py vendored
View File

@ -964,9 +964,7 @@ def python_2_unicode_compatible(klass):
""" """
if PY2: if PY2:
if '__str__' not in klass.__dict__: if '__str__' not in klass.__dict__:
raise ValueError("@python_2_unicode_compatible cannot be applied " raise ValueError(f"@python_2_unicode_compatible cannot be applied to {klass.__name__} because it doesn't define __str__().")
"to %s because it doesn't define __str__()." %
klass.__name__)
klass.__unicode__ = klass.__str__ klass.__unicode__ = klass.__str__
klass.__str__ = lambda self: self.__unicode__().encode('utf-8') klass.__str__ = lambda self: self.__unicode__().encode('utf-8')
return klass return klass

View File

@ -100,7 +100,7 @@ class Ext:
if not isinstance(type, int): if not isinstance(type, int):
raise TypeError("ext type is not type integer") raise TypeError("ext type is not type integer")
elif not (-2**7 <= type <= 2**7 - 1): elif not (-2**7 <= type <= 2**7 - 1):
raise ValueError(f"ext type value {type:d} is out of range (-128 to 127)") raise ValueError(f"ext type value {type} is out of range (-128 to 127)")
# Check data is type bytes or str # Check data is type bytes or str
elif sys.version_info[0] == 3 and not isinstance(data, bytes): elif sys.version_info[0] == 3 and not isinstance(data, bytes):
raise TypeError("ext data is not type \'bytes\'") raise TypeError("ext data is not type \'bytes\'")
@ -127,7 +127,7 @@ class Ext:
""" """
String representation of this Ext object. String representation of this Ext object.
""" """
s = f"Ext Object (Type: {self.type:d}, Data: " s = f"Ext Object (Type: {self.type}, Data: "
s += " ".join([f"0x{ord(self.data[i:i + 1]):02}" s += " ".join([f"0x{ord(self.data[i:i + 1]):02}"
for i in xrange(min(len(self.data), 8))]) for i in xrange(min(len(self.data), 8))])
if len(self.data) > 8: if len(self.data) > 8:
@ -177,11 +177,11 @@ def ext_serializable(ext_type):
if not isinstance(ext_type, int): if not isinstance(ext_type, int):
raise TypeError("Ext type is not type integer") raise TypeError("Ext type is not type integer")
elif not (-2**7 <= ext_type <= 2**7 - 1): elif not (-2**7 <= ext_type <= 2**7 - 1):
raise ValueError(f"Ext type value {ext_type:d} is out of range of -128 to 127") raise ValueError(f"Ext type value {ext_type} is out of range of -128 to 127")
elif ext_type in _ext_type_to_class: elif ext_type in _ext_type_to_class:
raise ValueError(f"Ext type {ext_type:d} already registered with class {repr(_ext_type_to_class[ext_type]):s}") raise ValueError(f"Ext type {ext_type} already registered with class {_ext_type_to_class[ext_type]!r}")
elif cls in _ext_class_to_type: elif cls in _ext_class_to_type:
raise ValueError(f"Class {repr(cls):s} already registered with Ext type {ext_type:d}") raise ValueError(f"Class {cls!r} already registered with Ext type {ext_type}")
_ext_type_to_class[ext_type] = cls _ext_type_to_class[ext_type] = cls
_ext_class_to_type[cls] = ext_type _ext_class_to_type[cls] = ext_type
@ -495,7 +495,7 @@ def _pack2(obj, fp, **options):
try: try:
_pack_ext(Ext(_ext_class_to_type[obj.__class__], obj.packb()), fp, options) _pack_ext(Ext(_ext_class_to_type[obj.__class__], obj.packb()), fp, options)
except AttributeError: except AttributeError:
raise NotImplementedError(f"Ext serializable class {repr(obj.__class__):s} is missing implementation of packb()") raise NotImplementedError(f"Ext serializable class {obj.__class__!r} is missing implementation of packb()")
elif isinstance(obj, bool): elif isinstance(obj, bool):
_pack_boolean(obj, fp, options) _pack_boolean(obj, fp, options)
elif isinstance(obj, (int, long)): elif isinstance(obj, (int, long)):
@ -525,7 +525,7 @@ def _pack2(obj, fp, **options):
_pack_ext(ext_handlers[t](obj), fp, options) _pack_ext(ext_handlers[t](obj), fp, options)
else: else:
raise UnsupportedTypeException( raise UnsupportedTypeException(
f"unsupported type: {str(type(obj)):s}") f"unsupported type: {type(obj)}")
elif _ext_class_to_type: elif _ext_class_to_type:
# Linear search for superclass # Linear search for superclass
t = next((t for t in _ext_class_to_type if isinstance(obj, t)), None) t = next((t for t in _ext_class_to_type if isinstance(obj, t)), None)
@ -533,11 +533,11 @@ def _pack2(obj, fp, **options):
try: try:
_pack_ext(Ext(_ext_class_to_type[t], obj.packb()), fp, options) _pack_ext(Ext(_ext_class_to_type[t], obj.packb()), fp, options)
except AttributeError: except AttributeError:
raise NotImplementedError(f"Ext serializable class {repr(t):s} is missing implementation of packb()") raise NotImplementedError(f"Ext serializable class {t!r} is missing implementation of packb()")
else: else:
raise UnsupportedTypeException(f"unsupported type: {str(type(obj)):s}") raise UnsupportedTypeException(f"unsupported type: {type(obj)}")
else: else:
raise UnsupportedTypeException(f"unsupported type: {str(type(obj)):s}") raise UnsupportedTypeException(f"unsupported type: {type(obj)}")
# Pack for Python 3, with unicode 'str' type, 'bytes' type, and no 'long' type # Pack for Python 3, with unicode 'str' type, 'bytes' type, and no 'long' type
@ -582,7 +582,7 @@ def _pack3(obj, fp, **options):
try: try:
_pack_ext(Ext(_ext_class_to_type[obj.__class__], obj.packb()), fp, options) _pack_ext(Ext(_ext_class_to_type[obj.__class__], obj.packb()), fp, options)
except AttributeError: except AttributeError:
raise NotImplementedError(f"Ext serializable class {repr(obj.__class__):s} is missing implementation of packb()") raise NotImplementedError(f"Ext serializable class {obj.__class__!r} is missing implementation of packb()")
elif isinstance(obj, bool): elif isinstance(obj, bool):
_pack_boolean(obj, fp, options) _pack_boolean(obj, fp, options)
elif isinstance(obj, int): elif isinstance(obj, int):
@ -612,7 +612,7 @@ def _pack3(obj, fp, **options):
_pack_ext(ext_handlers[t](obj), fp, options) _pack_ext(ext_handlers[t](obj), fp, options)
else: else:
raise UnsupportedTypeException( raise UnsupportedTypeException(
f"unsupported type: {str(type(obj)):s}") f"unsupported type: {type(obj)}")
elif _ext_class_to_type: elif _ext_class_to_type:
# Linear search for superclass # Linear search for superclass
t = next((t for t in _ext_class_to_type if isinstance(obj, t)), None) t = next((t for t in _ext_class_to_type if isinstance(obj, t)), None)
@ -620,12 +620,12 @@ def _pack3(obj, fp, **options):
try: try:
_pack_ext(Ext(_ext_class_to_type[t], obj.packb()), fp, options) _pack_ext(Ext(_ext_class_to_type[t], obj.packb()), fp, options)
except AttributeError: except AttributeError:
raise NotImplementedError(f"Ext serializable class {repr(t):s} is missing implementation of packb()") raise NotImplementedError(f"Ext serializable class {t!r} is missing implementation of packb()")
else: else:
raise UnsupportedTypeException(f"unsupported type: {str(type(obj)):s}") raise UnsupportedTypeException(f"unsupported type: {type(obj)}")
else: else:
raise UnsupportedTypeException( raise UnsupportedTypeException(
f"unsupported type: {str(type(obj)):s}") f"unsupported type: {type(obj)}")
def _packb2(obj, **options): def _packb2(obj, **options):
@ -842,7 +842,7 @@ def _unpack_ext(code, fp, options):
try: try:
return _ext_type_to_class[ext_type].unpackb(ext_data) return _ext_type_to_class[ext_type].unpackb(ext_data)
except AttributeError: except AttributeError:
raise NotImplementedError(f"Ext serializable class {repr(_ext_type_to_class[ext_type]):s} is missing implementation of unpackb()") raise NotImplementedError(f"Ext serializable class {_ext_type_to_class[ext_type]!r} is missing implementation of unpackb()")
# Timestamp extension # Timestamp extension
if ext_type == -1: if ext_type == -1:
@ -868,7 +868,7 @@ def _unpack_ext_timestamp(ext_data, options):
microseconds = struct.unpack(">I", ext_data[0:4])[0] // 1000 microseconds = struct.unpack(">I", ext_data[0:4])[0] // 1000
else: else:
raise UnsupportedTimestampException( raise UnsupportedTimestampException(
f"unsupported timestamp with data length {len(ext_data):d}") f"unsupported timestamp with data length {len(ext_data)}")
return _epoch + datetime.timedelta(seconds=seconds, return _epoch + datetime.timedelta(seconds=seconds,
microseconds=microseconds) microseconds=microseconds)
@ -916,10 +916,10 @@ def _unpack_map(code, fp, options):
k = _deep_list_to_tuple(k) k = _deep_list_to_tuple(k)
elif not isinstance(k, Hashable): elif not isinstance(k, Hashable):
raise UnhashableKeyException( raise UnhashableKeyException(
f"encountered unhashable key: \"{str(k):s}\" ({str(type(k)):s})") f"encountered unhashable key: \"{k}\" ({type(k)})")
elif k in d: elif k in d:
raise DuplicateKeyException( raise DuplicateKeyException(
f"encountered duplicate key: \"{str(k):s}\" ({str(type(k)):s})") f"encountered duplicate key: \"{k}\" ({type(k)})")
# Unpack value # Unpack value
v = _unpack(fp, options) v = _unpack(fp, options)
@ -928,7 +928,7 @@ def _unpack_map(code, fp, options):
d[k] = v d[k] = v
except TypeError: except TypeError:
raise UnhashableKeyException( raise UnhashableKeyException(
f"encountered unhashable key: \"{str(k):s}\"") f"encountered unhashable key: \"{k}\"")
return d return d