Compare commits

..

3 Commits

Author SHA1 Message Date
Pavol Rusnak
c6cc2616e8
Merge 011448c22d into 527f6cc906 2024-10-08 20:04:00 +00:00
Pavol Rusnak
011448c22d
remove dangling spaces 2024-10-08 22:03:30 +02:00
Pavol Rusnak
2a5a439921
modernize 2024-10-08 22:03:09 +02:00
25 changed files with 76 additions and 143 deletions

View File

@ -93,9 +93,7 @@ def announceLoop(destination_1, destination_2):
# Send the announce including the app data
destination_1.announce(app_data=fruit.encode("utf-8"))
RNS.log(
"Sent announce from "+
RNS.prettyhexrep(destination_1.hash)+
" ("+destination_1.name+")"
f"Sent announce from {RNS.prettyhexrep(destination_1.hash)} ({destination_1.name})"
)
# Randomly select a noble gas
@ -104,9 +102,7 @@ def announceLoop(destination_1, destination_2):
# Send the announce including the app data
destination_2.announce(app_data=noble_gas.encode("utf-8"))
RNS.log(
"Sent announce from "+
RNS.prettyhexrep(destination_2.hash)+
" ("+destination_2.name+")"
f"Sent announce from {RNS.prettyhexrep(destination_2.hash)} ({destination_2.name})"
)
# We will need to define an announce handler class that
@ -131,8 +127,7 @@ class ExampleAnnounceHandler:
if app_data:
RNS.log(
"The announce contained the following app data: "+
app_data.decode("utf-8")
f"The announce contained the following app data: {app_data.decode('utf-8')}"
)
##########################################################

View File

@ -54,9 +54,7 @@ def packet_callback(data, packet):
def broadcastLoop(destination):
# Let the user know that everything is ready
RNS.log(
"Broadcast example "+
RNS.prettyhexrep(destination.hash)+
" running, enter text and hit enter to broadcast (Ctrl-C to quit)"
f"Broadcast example {RNS.prettyhexrep(destination.hash)} running, enter text and hit enter to broadcast (Ctrl-C to quit)"
)
# We enter a loop that runs until the users exits.

View File

@ -61,9 +61,7 @@ def server(configpath):
def server_loop(destination):
# Let the user know that everything is ready
RNS.log(
"Link buffer example "+
RNS.prettyhexrep(destination.hash)+
" running, waiting for a connection."
f"Link buffer example {RNS.prettyhexrep(destination.hash)} running, waiting for a connection."
)
RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)")

View File

@ -124,9 +124,7 @@ def server(configpath):
def server_loop(destination):
# Let the user know that everything is ready
RNS.log(
"Link example "+
RNS.prettyhexrep(destination.hash)+
" running, waiting for a connection."
f"Link example {RNS.prettyhexrep(destination.hash)} running, waiting for a connection."
)
RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)")
@ -280,14 +278,11 @@ def client_loop():
channel.send(message)
else:
RNS.log(
"Cannot send this packet, the data size of "+
str(packed_size)+" bytes exceeds the link packet MDU of "+
str(channel.MDU)+" bytes",
f"Cannot send this packet, the data size of {packed_size} bytes exceeds the link packet MDU of {channel.MDU} bytes",
RNS.LOG_ERROR
)
else:
RNS.log("Channel is not ready to send, please wait for " +
"pending messages to complete.", RNS.LOG_ERROR)
RNS.log(f"Channel is not ready to send, please wait for pending messages to complete.", RNS.LOG_ERROR)
except Exception as e:
RNS.log(f"Error while sending data over the link: {e}")

View File

@ -64,9 +64,7 @@ def server(configpath):
def announceLoop(destination):
# Let the user know that everything is ready
RNS.log(
"Echo server "+
RNS.prettyhexrep(destination.hash)+
" running, hit enter to manually send an announce (Ctrl-C to quit)"
f"Echo server {RNS.prettyhexrep(destination.hash)} running, hit enter to manually send an announce (Ctrl-C to quit)"
)
# We enter a loop that runs until the users exits.
@ -142,9 +140,7 @@ def client(destination_hexhash, configpath, timeout=None):
# Tell the user that the client is ready!
RNS.log(
"Echo client ready, hit enter to send echo request to "+
destination_hexhash+
" (Ctrl-C to quit)"
f"Echo client ready, hit enter to send echo request to {destination_hexhash} (Ctrl-C to quit)"
)
# We enter a loop that runs until the user exits.
@ -247,10 +243,7 @@ def packet_delivered(receipt):
reception_stats += f" [SNR {receipt.proof_packet.snr} dB]"
RNS.log(
"Valid reply received from "+
RNS.prettyhexrep(receipt.destination.hash)+
", round-trip time is "+rttstring+
reception_stats
f"Valid reply received from {RNS.prettyhexrep(receipt.destination.hash)}, round-trip time is {rttstring}{reception_stats}"
)
# This function is called if a packet times out.

View File

@ -53,9 +53,7 @@ def server(configpath):
def server_loop(destination):
# Let the user know that everything is ready
RNS.log(
"Link identification example "+
RNS.prettyhexrep(destination.hash)+
" running, waiting for a connection."
f"Link identification example {RNS.prettyhexrep(destination.hash)} running, waiting for a connection."
)
RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)")
@ -209,9 +207,7 @@ def client_loop():
RNS.Packet(server_link, data).send()
else:
RNS.log(
"Cannot send this packet, the data size of "+
str(len(data))+" bytes exceeds the link packet MDU of "+
str(RNS.Link.MDU)+" bytes",
f"Cannot send this packet, the data size of {len(data)} bytes exceeds the link packet MDU of {RNS.Link.MDU} bytes",
RNS.LOG_ERROR
)

View File

@ -53,9 +53,7 @@ def server(configpath):
def server_loop(destination):
# Let the user know that everything is ready
RNS.log(
"Link example "+
RNS.prettyhexrep(destination.hash)+
" running, waiting for a connection."
f"Link example {RNS.prettyhexrep(destination.hash)} running, waiting for a connection."
)
RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)")
@ -189,9 +187,7 @@ def client_loop():
RNS.Packet(server_link, data).send()
else:
RNS.log(
"Cannot send this packet, the data size of "+
str(len(data))+" bytes exceeds the link packet MDU of "+
str(RNS.Link.MDU)+" bytes",
f"Cannot send this packet, the data size of {len(data)} bytes exceeds the link packet MDU of {RNS.Link.MDU} bytes",
RNS.LOG_ERROR
)

View File

@ -51,9 +51,7 @@ def program_setup(configpath):
def announceLoop(destination):
# Let the user know that everything is ready
RNS.log(
"Minimal example "+
RNS.prettyhexrep(destination.hash)+
" running, hit enter to manually send an announce (Ctrl-C to quit)"
f"Minimal example {RNS.prettyhexrep(destination.hash)} running, hit enter to manually send an announce (Ctrl-C to quit)"
)
# We enter a loop that runs until the users exits.

View File

@ -75,9 +75,7 @@ def server(configpath):
def announceLoop(destination):
# Let the user know that everything is ready
RNS.log(
"Ratcheted echo server "+
RNS.prettyhexrep(destination.hash)+
" running, hit enter to manually send an announce (Ctrl-C to quit)"
f"Ratcheted echo server {RNS.prettyhexrep(destination.hash)} running, hit enter to manually send an announce (Ctrl-C to quit)"
)
# We enter a loop that runs until the users exits.
@ -153,9 +151,7 @@ def client(destination_hexhash, configpath, timeout=None):
# Tell the user that the client is ready!
RNS.log(
"Echo client ready, hit enter to send echo request to "+
destination_hexhash+
" (Ctrl-C to quit)"
f"Echo client ready, hit enter to send echo request to {destination_hexhash} (Ctrl-C to quit)"
)
# We enter a loop that runs until the user exits.
@ -259,10 +255,7 @@ def packet_delivered(receipt):
reception_stats += f" [SNR {receipt.proof_packet.snr} dB]"
RNS.log(
"Valid reply received from "+
RNS.prettyhexrep(receipt.destination.hash)+
", round-trip time is "+rttstring+
reception_stats
f"Valid reply received from {RNS.prettyhexrep(receipt.destination.hash)}, round-trip time is {rttstring}{reception_stats}"
)
# This function is called if a packet times out.

View File

@ -67,9 +67,7 @@ def server(configpath):
def server_loop(destination):
# Let the user know that everything is ready
RNS.log(
"Request example "+
RNS.prettyhexrep(destination.hash)+
" running, waiting for a connection."
f"Request example {RNS.prettyhexrep(destination.hash)} running, waiting for a connection."
)
RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)")

View File

@ -62,9 +62,7 @@ def server(configpath):
def server_loop(destination):
# Let the user know that everything is ready
RNS.log(
"Speedtest "+
RNS.prettyhexrep(destination.hash)+
" running, waiting for a connection."
f"Speedtest {RNS.prettyhexrep(destination.hash)} running, waiting for a connection."
)
RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)")

View File

@ -60,13 +60,11 @@ class HMAC:
if hasattr(self._inner, 'block_size'):
blocksize = self._inner.block_size
if blocksize < 16:
_warnings.warn('block_size of %d seems too small; using our '
'default of %d.' % (blocksize, self.blocksize),
_warnings.warn(f'block_size of {int(blocksize)} seems too small; using our default of {int(self.blocksize)}.',
RuntimeWarning, 2)
blocksize = self.blocksize
else:
_warnings.warn('No block_size attribute on given digest object; '
'Assuming %d.' % (self.blocksize),
_warnings.warn(f'No block_size attribute on given digest object; Assuming {int(self.blocksize)}.',
RuntimeWarning, 2)
blocksize = self.blocksize

View File

@ -1256,7 +1256,7 @@ class RNodeInterface(Interface):
RNS.log(f"{self} hardware error (code {RNS.hexrep(byte)}): Memory exhausted", RNS.LOG_ERROR)
self.hw_errors.append({"error": KISS.ERROR_MEMORY_LOW, "description": "Memory exhausted on connected device"})
elif (byte == KISS.ERROR_MODEM_TIMEOUT):
RNS.log(str(self)+" hardware error (code "+RNS.hexrep(byte)+"): Modem communication timed out", RNS.LOG_ERROR)
RNS.log(f"{self} hardware error (code {RNS.hexrep(byte)}): Modem communication timed out", RNS.LOG_ERROR)
self.hw_errors.append({"error": KISS.ERROR_MODEM_TIMEOUT, "description": "Modem communication timed out on connected device"})
else:
RNS.log(f"{self} hardware error (code {RNS.hexrep(byte)})", RNS.LOG_ERROR)

View File

@ -169,7 +169,7 @@ class AutoInterface(Interface):
self.group_hash = RNS.Identity.full_hash(self.group_id)
g = self.group_hash
#gt = "{:02x}".format(g[1]+(g[0]<<8))
#gt = f"{g[1] + (g[0] << 8):02x}"
gt = "0"
gt += f":{g[3] + (g[2] << 8):02x}"
gt += f":{g[5] + (g[4] << 8):02x}"

View File

@ -852,7 +852,7 @@ class RNodeInterface(Interface):
RNS.log(f"{self} hardware error (code {RNS.hexrep(byte)}): Memory exhausted", RNS.LOG_ERROR)
self.hw_errors.append({"error": KISS.ERROR_MEMORY_LOW, "description": "Memory exhausted on connected device"})
elif (byte == KISS.ERROR_MODEM_TIMEOUT):
RNS.log(str(self)+" hardware error (code "+RNS.hexrep(byte)+"): Modem communication timed out", RNS.LOG_ERROR)
RNS.log(f"{self} hardware error (code {RNS.hexrep(byte)}): Modem communication timed out", RNS.LOG_ERROR)
self.hw_errors.append({"error": KISS.ERROR_MODEM_TIMEOUT, "description": "Modem communication timed out on connected device"})
else:
RNS.log(f"{self} hardware error (code {RNS.hexrep(byte)})", RNS.LOG_ERROR)

View File

@ -492,7 +492,7 @@ class RNode():
command_buffer = command_buffer+bytes([byte])
if (len(command_buffer) == 4):
self.r_bt_pin = command_buffer[0] << 24 | command_buffer[1] << 16 | command_buffer[2] << 8 | command_buffer[3]
RNS.log("Bluetooth pairing PIN is: {:06d}".format(self.r_bt_pin))
RNS.log(f"Bluetooth pairing PIN is: {self.r_bt_pin:06d}")
elif (command == KISS.CMD_DEV_HASH):
if (byte == KISS.FESC):

View File

@ -178,11 +178,7 @@ def program_setup(configdir, destination_hexhash, size=None, full_name = None, v
reception_stats += f" [SNR {receipt.proof_packet.snr} dB]"
print(
"Valid reply from "+
RNS.prettyhexrep(receipt.destination.hash)+
"\nRound-trip time is "+rttstring+
" over "+str(hops)+" hop"+ms+
reception_stats+"\n"
f"Valid reply from {RNS.prettyhexrep(receipt.destination.hash)}\nRound-trip time is {rttstring} over {hops} hop{ms}{reception_stats}\n"
)
else:

View File

@ -42,12 +42,12 @@ def size_str(num, suffix='B'):
for unit in units:
if abs(num) < 1000.0:
if unit == "":
return "{:.0f} {}{}".format(num, unit, suffix)
return f"{num:.0f} {unit}{suffix}"
else:
return "{:.2f} {}{}".format(num, unit, suffix)
return f"{num:.2f} {unit}{suffix}"
num /= 1000.0
return "{:.2f}{}{}".format(num, last_unit, suffix)
return f"{num:.2f}{last_unit}{suffix}"
request_result = None
request_concluded = False
@ -144,7 +144,7 @@ def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json=
try:
dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2
if len(remote) != dest_len:
raise ValueError("Destination length is invalid, must be {hex} hexadecimal characters ({byte} bytes).".format(hex=dest_len, byte=dest_len//2))
raise ValueError(f"Destination length is invalid, must be {dest_len} hexadecimal characters ({dest_len // 2} bytes).")
try:
identity_hash = bytes.fromhex(remote)
destination_hash = RNS.Destination.hash_from_name_and_identity("rnstransport.remote.management", identity_hash)
@ -281,13 +281,13 @@ def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json=
if "ifac_netname" in ifstat and ifstat["ifac_netname"] != None:
print(" Network : {nn}".format(nn=ifstat["ifac_netname"]))
print(" Status : {ss}".format(ss=ss))
print(f" Status : {ss}")
if clients != None and clients_string != "":
print(" "+clients_string)
if not (name.startswith("Shared Instance[") or name.startswith("TCPInterface[Client") or name.startswith("LocalInterface[")):
print(" Mode : {mode}".format(mode=modestr))
print(f" Mode : {modestr}")
if "bitrate" in ifstat and ifstat["bitrate"] != None:
print(" Rate : {ss}".format(ss=speed_str(ifstat["bitrate"])))
@ -295,7 +295,7 @@ def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json=
if "battery_percent" in ifstat and ifstat["battery_percent"] != None:
try:
bpi = int(ifstat["battery_percent"])
print(" Battery : {bp}%".format(bp=bpi))
print(f" Battery : {bpi}%")
except:
pass
@ -488,10 +488,10 @@ def speed_str(num, suffix='bps'):
for unit in units:
if abs(num) < 1000.0:
return "{:3.2f} {}{}".format(num, unit, suffix)
return f"{num:3.2f} {unit}{suffix}"
num /= 1000.0
return "{:.2f} {}{}".format(num, last_unit, suffix)
return f"{num:.2f} {last_unit}{suffix}"
if __name__ == "__main__":
main()

View File

@ -159,12 +159,12 @@ def hexrep(data, delimit=True):
delimiter = ":"
if not delimit:
delimiter = ""
hexrep = delimiter.join("{:02x}".format(c) for c in data)
hexrep = delimiter.join(f"{c:02x}" for c in data)
return hexrep
def prettyhexrep(data):
delimiter = ""
hexrep = "<"+delimiter.join("{:02x}".format(c) for c in data)+">"
hexrep = "<"+delimiter.join(f"{c:02x}" for c in data)+">"
return hexrep
def prettyspeed(num, suffix="b"):
@ -182,12 +182,12 @@ def prettysize(num, suffix='B'):
for unit in units:
if abs(num) < 1000.0:
if unit == "":
return "{:.0f} {}{}".format(num, unit, suffix)
return f"{num:.0f} {unit}{suffix}"
else:
return "{:.2f} {}{}".format(num, unit, suffix)
return f"{num:.2f} {unit}{suffix}"
num /= 1000.0
return "{:.2f}{}{}".format(num, last_unit, suffix)
return f"{num:.2f}{last_unit}{suffix}"
def prettyfrequency(hz, suffix="Hz"):
num = hz*1e6
@ -196,10 +196,10 @@ def prettyfrequency(hz, suffix="Hz"):
for unit in units:
if abs(num) < 1000.0:
return "{:.2f} {}{}".format(num, unit, suffix)
return f"{num:.2f} {unit}{suffix}"
num /= 1000.0
return "{:.2f}{}{}".format(num, last_unit, suffix)
return f"{num:.2f}{last_unit}{suffix}"
def prettydistance(m, suffix="m"):
num = m*1e6
@ -212,10 +212,10 @@ def prettydistance(m, suffix="m"):
if unit == "c": divisor = 100
if abs(num) < divisor:
return "{:.2f} {}{}".format(num, unit, suffix)
return f"{num:.2f} {unit}{suffix}"
num /= divisor
return "{:.2f} {}{}".format(num, last_unit, suffix)
return f"{num:.2f} {last_unit}{suffix}"
def prettytime(time, verbose=False, compact=False):
days = int(time // (24 * 3600))

View File

@ -745,7 +745,7 @@ class Section(dict):
return self[key]
except MissingInterpolationOption:
return dict.__getitem__(self, key)
return '{%s}' % ', '.join([('{}: {}'.format(repr(key), repr(_getval(key))))
return '{%s}' % ', '.join([f'{key!r}: {_getval(key)!r}'
for key in (self.scalars + self.sections)])
__str__ = __repr__
@ -1363,9 +1363,7 @@ class ConfigObj(Section):
return self[key]
except MissingInterpolationOption:
return dict.__getitem__(self, key)
return ('ConfigObj({%s})' %
', '.join([('{}: {}'.format(repr(key), repr(_getval(key))))
for key in (self.scalars + self.sections)]))
return ('ConfigObj({%s})' % ', '.join([f'{key!r}: {_getval(key)!r}' for key in (self.scalars + self.sections)]))
def _handle_bom(self, infile):
@ -1986,20 +1984,12 @@ class ConfigObj(Section):
val = self._decode_element(self._quote(this_entry))
else:
val = repr(this_entry)
return '{}{}{}{}{}'.format(indent_string,
self._decode_element(self._quote(entry, multiline=False)),
self._a_to_u(' = '),
val,
self._decode_element(comment))
return f"{indent_string}{self._decode_element(self._quote(entry, multiline=False))}{self._a_to_u(' = ')}{val}{self._decode_element(comment)}"
def _write_marker(self, indent_string, depth, entry, comment):
"""Write a section marker line"""
return '{}{}{}{}{}'.format(indent_string,
self._a_to_u('[' * depth),
self._quote(self._decode_element(entry), multiline=False),
self._a_to_u(']' * depth),
self._decode_element(comment))
return f"{indent_string}{self._a_to_u('[' * depth)}{self._quote(self._decode_element(entry), multiline=False)}{self._a_to_u(']' * depth)}{self._decode_element(comment)}"
def _handle_comment(self, comment):

View File

@ -54,13 +54,11 @@ def hello(min_version, max_version):
return f"HELLO VERSION MIN={min_version} MAX={max_version}\n".encode()
def session_create(style, session_id, destination, options=""):
return "SESSION CREATE STYLE={} ID={} DESTINATION={} {}\n".format(
style, session_id, destination, options).encode()
return f"SESSION CREATE STYLE={style} ID={session_id} DESTINATION={destination} {options}\n".encode()
def stream_connect(session_id, destination, silent="false"):
return "STREAM CONNECT ID={} DESTINATION={} SILENT={}\n".format(
session_id, destination, silent).encode()
return f"STREAM CONNECT ID={session_id} DESTINATION={destination} SILENT={silent}\n".encode()
def stream_accept(session_id, silent="false"):
return f"STREAM ACCEPT ID={session_id} SILENT={silent}\n".encode()

View File

@ -141,8 +141,7 @@ class ServerTunnel(I2PTunnel):
# data and dest may come in one chunk
dest, data = incoming.split(b"\n", 1)
remote_destination = sam.Destination(dest.decode())
logger.debug("{} client connected: {}.b32.i2p".format(
self.session_name, remote_destination.base32))
logger.debug(f"{self.session_name} client connected: {remote_destination.base32}.b32.i2p")
except Exception as e:
self.status["exception"] = e

View File

@ -58,9 +58,7 @@ class Adapter:
self.index = index
def __repr__(self) -> str:
return "Adapter(name={name}, nice_name={nice_name}, ips={ips}, index={index})".format(
name=repr(self.name), nice_name=repr(self.nice_name), ips=repr(self.ips), index=repr(self.index)
)
return f"Adapter(name={self.name!r}, nice_name={self.nice_name!r}, ips={self.ips!r}, index={self.index!r})"
# Type of an IPv4 address (a string in "xxx.xxx.xxx.xxx" format)
@ -112,9 +110,7 @@ class IP:
return isinstance(self.ip, tuple)
def __repr__(self) -> str:
return "IP(ip={ip}, network_prefix={network_prefix}, nice_name={nice_name})".format(
ip=repr(self.ip), network_prefix=repr(self.network_prefix), nice_name=repr(self.nice_name)
)
return f"IP(ip={self.ip!r}, network_prefix={self.network_prefix!r}, nice_name={self.nice_name!r})"
if platform.system() == "Darwin" or "BSD" in platform.system():

4
RNS/vendor/six.py vendored
View File

@ -964,9 +964,7 @@ def python_2_unicode_compatible(klass):
"""
if PY2:
if '__str__' not in klass.__dict__:
raise ValueError("@python_2_unicode_compatible cannot be applied "
"to %s because it doesn't define __str__()." %
klass.__name__)
raise ValueError(f"@python_2_unicode_compatible cannot be applied to {klass.__name__} because it doesn't define __str__().")
klass.__unicode__ = klass.__str__
klass.__str__ = lambda self: self.__unicode__().encode('utf-8')
return klass

View File

@ -100,7 +100,7 @@ class Ext:
if not isinstance(type, int):
raise TypeError("ext type is not type integer")
elif not (-2**7 <= type <= 2**7 - 1):
raise ValueError(f"ext type value {type:d} is out of range (-128 to 127)")
raise ValueError(f"ext type value {type} is out of range (-128 to 127)")
# Check data is type bytes or str
elif sys.version_info[0] == 3 and not isinstance(data, bytes):
raise TypeError("ext data is not type \'bytes\'")
@ -127,7 +127,7 @@ class Ext:
"""
String representation of this Ext object.
"""
s = f"Ext Object (Type: {self.type:d}, Data: "
s = f"Ext Object (Type: {self.type}, Data: "
s += " ".join([f"0x{ord(self.data[i:i + 1]):02}"
for i in xrange(min(len(self.data), 8))])
if len(self.data) > 8:
@ -177,11 +177,11 @@ def ext_serializable(ext_type):
if not isinstance(ext_type, int):
raise TypeError("Ext type is not type integer")
elif not (-2**7 <= ext_type <= 2**7 - 1):
raise ValueError(f"Ext type value {ext_type:d} is out of range of -128 to 127")
raise ValueError(f"Ext type value {ext_type} is out of range of -128 to 127")
elif ext_type in _ext_type_to_class:
raise ValueError(f"Ext type {ext_type:d} already registered with class {repr(_ext_type_to_class[ext_type]):s}")
raise ValueError(f"Ext type {ext_type} already registered with class {_ext_type_to_class[ext_type]!r}")
elif cls in _ext_class_to_type:
raise ValueError(f"Class {repr(cls):s} already registered with Ext type {ext_type:d}")
raise ValueError(f"Class {cls!r} already registered with Ext type {ext_type}")
_ext_type_to_class[ext_type] = cls
_ext_class_to_type[cls] = ext_type
@ -495,7 +495,7 @@ def _pack2(obj, fp, **options):
try:
_pack_ext(Ext(_ext_class_to_type[obj.__class__], obj.packb()), fp, options)
except AttributeError:
raise NotImplementedError(f"Ext serializable class {repr(obj.__class__):s} is missing implementation of packb()")
raise NotImplementedError(f"Ext serializable class {obj.__class__!r} is missing implementation of packb()")
elif isinstance(obj, bool):
_pack_boolean(obj, fp, options)
elif isinstance(obj, (int, long)):
@ -525,7 +525,7 @@ def _pack2(obj, fp, **options):
_pack_ext(ext_handlers[t](obj), fp, options)
else:
raise UnsupportedTypeException(
f"unsupported type: {str(type(obj)):s}")
f"unsupported type: {type(obj)}")
elif _ext_class_to_type:
# Linear search for superclass
t = next((t for t in _ext_class_to_type if isinstance(obj, t)), None)
@ -533,11 +533,11 @@ def _pack2(obj, fp, **options):
try:
_pack_ext(Ext(_ext_class_to_type[t], obj.packb()), fp, options)
except AttributeError:
raise NotImplementedError(f"Ext serializable class {repr(t):s} is missing implementation of packb()")
raise NotImplementedError(f"Ext serializable class {t!r} is missing implementation of packb()")
else:
raise UnsupportedTypeException(f"unsupported type: {str(type(obj)):s}")
raise UnsupportedTypeException(f"unsupported type: {type(obj)}")
else:
raise UnsupportedTypeException(f"unsupported type: {str(type(obj)):s}")
raise UnsupportedTypeException(f"unsupported type: {type(obj)}")
# Pack for Python 3, with unicode 'str' type, 'bytes' type, and no 'long' type
@ -582,7 +582,7 @@ def _pack3(obj, fp, **options):
try:
_pack_ext(Ext(_ext_class_to_type[obj.__class__], obj.packb()), fp, options)
except AttributeError:
raise NotImplementedError(f"Ext serializable class {repr(obj.__class__):s} is missing implementation of packb()")
raise NotImplementedError(f"Ext serializable class {obj.__class__!r} is missing implementation of packb()")
elif isinstance(obj, bool):
_pack_boolean(obj, fp, options)
elif isinstance(obj, int):
@ -612,7 +612,7 @@ def _pack3(obj, fp, **options):
_pack_ext(ext_handlers[t](obj), fp, options)
else:
raise UnsupportedTypeException(
f"unsupported type: {str(type(obj)):s}")
f"unsupported type: {type(obj)}")
elif _ext_class_to_type:
# Linear search for superclass
t = next((t for t in _ext_class_to_type if isinstance(obj, t)), None)
@ -620,12 +620,12 @@ def _pack3(obj, fp, **options):
try:
_pack_ext(Ext(_ext_class_to_type[t], obj.packb()), fp, options)
except AttributeError:
raise NotImplementedError(f"Ext serializable class {repr(t):s} is missing implementation of packb()")
raise NotImplementedError(f"Ext serializable class {t!r} is missing implementation of packb()")
else:
raise UnsupportedTypeException(f"unsupported type: {str(type(obj)):s}")
raise UnsupportedTypeException(f"unsupported type: {type(obj)}")
else:
raise UnsupportedTypeException(
f"unsupported type: {str(type(obj)):s}")
f"unsupported type: {type(obj)}")
def _packb2(obj, **options):
@ -842,7 +842,7 @@ def _unpack_ext(code, fp, options):
try:
return _ext_type_to_class[ext_type].unpackb(ext_data)
except AttributeError:
raise NotImplementedError(f"Ext serializable class {repr(_ext_type_to_class[ext_type]):s} is missing implementation of unpackb()")
raise NotImplementedError(f"Ext serializable class {_ext_type_to_class[ext_type]!r} is missing implementation of unpackb()")
# Timestamp extension
if ext_type == -1:
@ -868,7 +868,7 @@ def _unpack_ext_timestamp(ext_data, options):
microseconds = struct.unpack(">I", ext_data[0:4])[0] // 1000
else:
raise UnsupportedTimestampException(
f"unsupported timestamp with data length {len(ext_data):d}")
f"unsupported timestamp with data length {len(ext_data)}")
return _epoch + datetime.timedelta(seconds=seconds,
microseconds=microseconds)
@ -916,10 +916,10 @@ def _unpack_map(code, fp, options):
k = _deep_list_to_tuple(k)
elif not isinstance(k, Hashable):
raise UnhashableKeyException(
f"encountered unhashable key: \"{str(k):s}\" ({str(type(k)):s})")
f"encountered unhashable key: \"{k}\" ({type(k)})")
elif k in d:
raise DuplicateKeyException(
f"encountered duplicate key: \"{str(k):s}\" ({str(type(k)):s})")
f"encountered duplicate key: \"{k}\" ({type(k)})")
# Unpack value
v = _unpack(fp, options)
@ -928,7 +928,7 @@ def _unpack_map(code, fp, options):
d[k] = v
except TypeError:
raise UnhashableKeyException(
f"encountered unhashable key: \"{str(k):s}\"")
f"encountered unhashable key: \"{k}\"")
return d