Compare commits

...

8 Commits

Author SHA1 Message Date
Mark Qvist
0b66649158 Avoid nRF52 hard reset after EEPROM wipe 2024-05-18 00:18:54 +02:00
markqvist
e28dd6e14a
Merge pull request #502 from jacobeva/master
Extend RAK4631 support
2024-05-18 00:15:48 +02:00
markqvist
0a15b4c6c1
Merge branch 'master' into master 2024-05-18 00:15:13 +02:00
markqvist
62db09571d
Merge pull request #504 from jacobeva/hash-feature
Add ability to get target and calculated firmware hash from device
2024-05-18 00:04:24 +02:00
Mark Qvist
444ae0206b Added better handling on Windows of interfaces that are non-adoptable for AutoInterface 2024-05-17 23:54:48 +02:00
Mark Qvist
4b07e30b9d Updated version 2024-05-17 23:54:04 +02:00
jacob.eva
746a38f818
Add ability to get target and calculated firmware hash from device 2024-05-13 22:55:49 +01:00
jacob.eva
c230eceaa6
Extend RAK4631 support 2024-05-13 21:49:57 +01:00
4 changed files with 334 additions and 130 deletions

View File

@ -181,78 +181,90 @@ class AutoInterface(Interface):
suitable_interfaces = 0
for ifname in self.list_interfaces():
if RNS.vendor.platformutils.is_darwin() and ifname in AutoInterface.DARWIN_IGNORE_IFS and not ifname in self.allowed_interfaces:
RNS.log(str(self)+" skipping Darwin AWDL or tethering interface "+str(ifname), RNS.LOG_EXTREME)
elif RNS.vendor.platformutils.is_darwin() and ifname == "lo0":
RNS.log(str(self)+" skipping Darwin loopback interface "+str(ifname), RNS.LOG_EXTREME)
elif RNS.vendor.platformutils.is_android() and ifname in AutoInterface.ANDROID_IGNORE_IFS and not ifname in self.allowed_interfaces:
RNS.log(str(self)+" skipping Android system interface "+str(ifname), RNS.LOG_EXTREME)
elif ifname in self.ignored_interfaces:
RNS.log(str(self)+" ignoring disallowed interface "+str(ifname), RNS.LOG_EXTREME)
elif ifname in AutoInterface.ALL_IGNORE_IFS:
RNS.log(str(self)+" skipping interface "+str(ifname), RNS.LOG_EXTREME)
else:
if len(self.allowed_interfaces) > 0 and not ifname in self.allowed_interfaces:
RNS.log(str(self)+" ignoring interface "+str(ifname)+" since it was not allowed", RNS.LOG_EXTREME)
try:
if RNS.vendor.platformutils.is_darwin() and ifname in AutoInterface.DARWIN_IGNORE_IFS and not ifname in self.allowed_interfaces:
RNS.log(str(self)+" skipping Darwin AWDL or tethering interface "+str(ifname), RNS.LOG_EXTREME)
elif RNS.vendor.platformutils.is_darwin() and ifname == "lo0":
RNS.log(str(self)+" skipping Darwin loopback interface "+str(ifname), RNS.LOG_EXTREME)
elif RNS.vendor.platformutils.is_android() and ifname in AutoInterface.ANDROID_IGNORE_IFS and not ifname in self.allowed_interfaces:
RNS.log(str(self)+" skipping Android system interface "+str(ifname), RNS.LOG_EXTREME)
elif ifname in self.ignored_interfaces:
RNS.log(str(self)+" ignoring disallowed interface "+str(ifname), RNS.LOG_EXTREME)
elif ifname in AutoInterface.ALL_IGNORE_IFS:
RNS.log(str(self)+" skipping interface "+str(ifname), RNS.LOG_EXTREME)
else:
addresses = self.list_addresses(ifname)
if self.netinfo.AF_INET6 in addresses:
link_local_addr = None
for address in addresses[self.netinfo.AF_INET6]:
if "addr" in address:
if address["addr"].startswith("fe80:"):
link_local_addr = self.descope_linklocal(address["addr"])
self.link_local_addresses.append(link_local_addr)
self.adopted_interfaces[ifname] = link_local_addr
self.multicast_echoes[ifname] = time.time()
RNS.log(str(self)+" Selecting link-local address "+str(link_local_addr)+" for interface "+str(ifname), RNS.LOG_EXTREME)
if link_local_addr == None:
RNS.log(str(self)+" No link-local IPv6 address configured for "+str(ifname)+", skipping interface", RNS.LOG_EXTREME)
else:
mcast_addr = self.mcast_discovery_address
RNS.log(str(self)+" Creating multicast discovery listener on "+str(ifname)+" with address "+str(mcast_addr), RNS.LOG_EXTREME)
# Struct with interface index
if_struct = struct.pack("I", self.interface_name_to_index(ifname))
# Set up multicast socket
discovery_socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
discovery_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, "SO_REUSEPORT"):
discovery_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
discovery_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, if_struct)
# Join multicast group
mcast_group = socket.inet_pton(socket.AF_INET6, mcast_addr) + if_struct
discovery_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mcast_group)
# Bind socket
if RNS.vendor.platformutils.is_windows():
# window throws "[WinError 10049] The requested address is not valid in its context"
# when trying to use the multicast address as host, or when providing interface index
# passing an empty host appears to work, but probably not exactly how we want it to...
discovery_socket.bind(('', self.discovery_port))
if len(self.allowed_interfaces) > 0 and not ifname in self.allowed_interfaces:
RNS.log(str(self)+" ignoring interface "+str(ifname)+" since it was not allowed", RNS.LOG_EXTREME)
else:
addresses = self.list_addresses(ifname)
if self.netinfo.AF_INET6 in addresses:
link_local_addr = None
for address in addresses[self.netinfo.AF_INET6]:
if "addr" in address:
if address["addr"].startswith("fe80:"):
link_local_addr = self.descope_linklocal(address["addr"])
self.link_local_addresses.append(link_local_addr)
self.adopted_interfaces[ifname] = link_local_addr
self.multicast_echoes[ifname] = time.time()
nice_name = self.netinfo.interface_name_to_nice_name(ifname)
if nice_name != None and nice_name != ifname:
RNS.log(f"{self} Selecting link-local address {link_local_addr} for interface {nice_name} / {ifname}", RNS.LOG_EXTREME)
else:
RNS.log(f"{self} Selecting link-local address {link_local_addr} for interface {ifname}", RNS.LOG_EXTREME)
if link_local_addr == None:
RNS.log(str(self)+" No link-local IPv6 address configured for "+str(ifname)+", skipping interface", RNS.LOG_EXTREME)
else:
mcast_addr = self.mcast_discovery_address
RNS.log(str(self)+" Creating multicast discovery listener on "+str(ifname)+" with address "+str(mcast_addr), RNS.LOG_EXTREME)
# Struct with interface index
if_struct = struct.pack("I", self.interface_name_to_index(ifname))
# Set up multicast socket
discovery_socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
discovery_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, "SO_REUSEPORT"):
discovery_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
discovery_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, if_struct)
# Join multicast group
mcast_group = socket.inet_pton(socket.AF_INET6, mcast_addr) + if_struct
discovery_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mcast_group)
# Bind socket
if RNS.vendor.platformutils.is_windows():
# window throws "[WinError 10049] The requested address is not valid in its context"
# when trying to use the multicast address as host, or when providing interface index
# passing an empty host appears to work, but probably not exactly how we want it to...
discovery_socket.bind(('', self.discovery_port))
if self.discovery_scope == AutoInterface.SCOPE_LINK:
addr_info = socket.getaddrinfo(mcast_addr+"%"+ifname, self.discovery_port, socket.AF_INET6, socket.SOCK_DGRAM)
else:
addr_info = socket.getaddrinfo(mcast_addr, self.discovery_port, socket.AF_INET6, socket.SOCK_DGRAM)
discovery_socket.bind(addr_info[0][4])
if self.discovery_scope == AutoInterface.SCOPE_LINK:
addr_info = socket.getaddrinfo(mcast_addr+"%"+ifname, self.discovery_port, socket.AF_INET6, socket.SOCK_DGRAM)
else:
addr_info = socket.getaddrinfo(mcast_addr, self.discovery_port, socket.AF_INET6, socket.SOCK_DGRAM)
# Set up thread for discovery packets
def discovery_loop():
self.discovery_handler(discovery_socket, ifname)
discovery_socket.bind(addr_info[0][4])
thread = threading.Thread(target=discovery_loop)
thread.daemon = True
thread.start()
# Set up thread for discovery packets
def discovery_loop():
self.discovery_handler(discovery_socket, ifname)
suitable_interfaces += 1
thread = threading.Thread(target=discovery_loop)
thread.daemon = True
thread.start()
suitable_interfaces += 1
except Exception as e:
nice_name = self.netinfo.interface_name_to_nice_name(ifname)
if nice_name != None and nice_name != ifname:
RNS.log(f"Could not configure the system interface {nice_name} / {ifname} for use with {self}, skipping it. The contained exception was: {e}", RNS.LOG_ERROR)
else:
RNS.log(f"Could not configure the system interface {ifname} for use with {self}, skipping it. The contained exception was: {e}", RNS.LOG_ERROR)
if suitable_interfaces == 0:
RNS.log(str(self)+" could not autoconfigure. This interface currently provides no connectivity.", RNS.LOG_WARNING)

View File

@ -128,6 +128,10 @@ class ROM():
MCU_ESP32 = 0x81
MCU_NRF52 = 0x71
PRODUCT_RAK4631 = 0x10
MODEL_11 = 0x11
MODEL_12 = 0x12
PRODUCT_RNODE = 0x03
MODEL_A1 = 0xA1
MODEL_A6 = 0xA6
@ -196,7 +200,7 @@ class ROM():
BOARD_GENERIC_ESP32 = 0x35
BOARD_LORA32_V2_0 = 0x36
BOARD_LORA32_V2_1 = 0x37
BOARD_RAK4630 = 0x51
BOARD_RAK4631 = 0x51
mapped_product = ROM.PRODUCT_RNODE
products = {
@ -208,22 +212,25 @@ products = {
ROM.PRODUCT_T32_21: "LilyGO LoRa32 v2.1",
ROM.PRODUCT_H32_V2: "Heltec LoRa32 v2",
ROM.PRODUCT_H32_V3: "Heltec LoRa32 v3",
ROM.PRODUCT_RAK4631: "RAK4631",
}
platforms = {
ROM.PLATFORM_AVR: "AVR",
ROM.PLATFORM_ESP32:"ESP32",
ROM.PLATFORM_NRF52:"NRF52",
ROM.PLATFORM_NRF52: "NRF52",
}
mcus = {
ROM.MCU_1284P: "ATmega1284P",
ROM.MCU_2560:"ATmega2560",
ROM.MCU_ESP32:"Espressif Systems ESP32",
ROM.MCU_NRF52:"Nordic nRF52840",
ROM.MCU_NRF52: "Nordic Semiconductor nRF52840",
}
models = {
0x11: [430000000, 510000000, 22, "430 - 510 MHz", "rnode_firmware_rak4631.zip", "SX1262"],
0x12: [779000000, 928000000, 22, "779 - 928 MHz", "rnode_firmware_rak4631.zip", "SX1262"],
0xA4: [410000000, 525000000, 14, "410 - 525 MHz", "rnode_firmware.hex", "SX1278"],
0xA9: [820000000, 1020000000, 17, "820 - 1020 MHz", "rnode_firmware.hex", "SX1276"],
0xA1: [410000000, 525000000, 22, "410 - 525 MHz", "rnode_firmware_t3s3.zip", "SX1268"],
@ -306,6 +313,7 @@ class RNode():
self.bandwidth = None
self.detected = None
self.usb_serial_id = None
self.platform = None
self.mcu = None
@ -324,6 +332,7 @@ class RNode():
self.checksum = None
self.device_hash = None
self.firmware_hash = None
self.firmware_hash_target = None
self.signature = None
self.signature_valid = False
self.locally_signed = False
@ -471,6 +480,8 @@ class RNode():
escape = False
command_buffer = command_buffer+bytes([byte])
if (len(command_buffer) == 33):
if command_buffer[0] == 0x01:
self.firmware_hash_target = command_buffer[1:]
if command_buffer[0] == 0x02:
self.firmware_hash = command_buffer[1:]
@ -588,7 +599,7 @@ class RNode():
self.version = str(self.major_version)+"."+minstr
def detect(self):
kiss_command = bytes([KISS.FEND, KISS.CMD_DETECT, KISS.DETECT_REQ, KISS.FEND, KISS.CMD_FW_VERSION, 0x00, KISS.FEND, KISS.CMD_PLATFORM, 0x00, KISS.FEND, KISS.CMD_MCU, 0x00, KISS.FEND, KISS.CMD_BOARD, 0x00, KISS.FEND, KISS.CMD_DEV_HASH, 0x01, KISS.FEND, KISS.CMD_HASHES, 0x02, KISS.FEND])
kiss_command = bytes([KISS.FEND, KISS.CMD_DETECT, KISS.DETECT_REQ, KISS.FEND, KISS.CMD_FW_VERSION, 0x00, KISS.FEND, KISS.CMD_PLATFORM, 0x00, KISS.FEND, KISS.CMD_MCU, 0x00, KISS.FEND, KISS.CMD_BOARD, 0x00, KISS.FEND, KISS.CMD_DEV_HASH, 0x01, KISS.FEND, KISS.CMD_HASHES, 0x01, KISS.FEND, KISS.CMD_HASHES, 0x02, KISS.FEND])
written = self.serial.write(kiss_command)
if written != len(kiss_command):
raise IOError("An IO error occurred while detecting hardware for "+self(str))
@ -735,6 +746,10 @@ class RNode():
if written != len(kiss_command):
raise IOError("An IO error occurred while wiping EEPROM")
sleep(13);
# Due to the current janky emulated EEPROM implementation for the
# RAK4631, extra time must be given to allow for writing.
if self.board == ROM.BOARD_RAK4631:
sleep(10)
def hard_reset(self):
kiss_command = bytes([KISS.FEND, KISS.CMD_RESET, 0xf8, KISS.FEND])
@ -1249,6 +1264,8 @@ def main():
parser.add_argument("-k", "--key", action="store_true", help="Generate a new signing key and exit") #
parser.add_argument("-S", "--sign", action="store_true", help="Display public part of signing key")
parser.add_argument("-H", "--firmware-hash", action="store", help="Display installed firmware hash")
parser.add_argument("-K", "--get-target-firmware-hash", action="store_true", help=argparse.SUPPRESS) # Get target firmware hash from device
parser.add_argument("-L", "--get-firmware-hash", action="store_true", help=argparse.SUPPRESS) # Get calculated firmware hash from device
parser.add_argument("--platform", action="store", metavar="platform", type=str, default=None, help="Platform specification for device bootstrap")
parser.add_argument("--product", action="store", metavar="product", type=str, default=None, help="Product specification for device bootstrap") #
parser.add_argument("--model", action="store", metavar="model", type=str, default=None, help="Model code for device bootstrap")
@ -1400,6 +1417,7 @@ def main():
graceful_exit()
rnode = RNode(rnode_serial)
rnode.usb_serial_id = port_serialno
thread = threading.Thread(target=rnode.readLoop, daemon=True).start()
try:
rnode.device_probe()
@ -1412,54 +1430,58 @@ def main():
else:
RNS.log("Could not detect a connected RNode")
if rnode.provisioned:
if not rnode.signature_valid:
print("\nThe device signature in this RNode is unknown and cannot be verified. It is still")
print("possible to extract the firmware from it, but you should make absolutely sure that")
print("it comes from a trusted source. It is possible that someone could have modified the")
print("firmware. If that is the case, these modifications will propagate to any new RNodes")
print("descendent from this one!")
print("\nHit enter if you are sure you want to continue.")
input()
if rnode.platform == ROM.PLATFORM_ESP32:
if rnode.provisioned:
if not rnode.signature_valid:
print("\nThe device signature in this RNode is unknown and cannot be verified. It is still")
print("possible to extract the firmware from it, but you should make absolutely sure that")
print("it comes from a trusted source. It is possible that someone could have modified the")
print("firmware. If that is the case, these modifications will propagate to any new RNodes")
print("descendent from this one!")
print("\nHit enter if you are sure you want to continue.")
input()
if rnode.firmware_hash != None:
extracted_hash = rnode.firmware_hash
extracted_version = rnode.version
rnode.disconnect()
v_str = str(extracted_version)+" "+RNS.hexrep(extracted_hash, delimit=False)
print("\nFound RNode Firmvare v"+v_str)
if rnode.firmware_hash != None:
extracted_hash = rnode.firmware_hash
extracted_version = rnode.version
rnode.disconnect()
v_str = str(extracted_version)+" "+RNS.hexrep(extracted_hash, delimit=False)
print("\nFound RNode Firmvare v"+v_str)
print("\nReady to extract firmware images from the RNode")
print("Press enter to start the extraction process")
input()
extract_recovery_esptool()
print("\nReady to extract firmware images from the RNode")
print("Press enter to start the extraction process")
input()
extract_recovery_esptool()
hash_f = open(EXT_DIR+"/extracted_rnode_firmware.version", "wb")
hash_f.write(v_str.encode("utf-8"))
hash_f.close()
hash_f = open(EXT_DIR+"/extracted_rnode_firmware.version", "wb")
hash_f.write(v_str.encode("utf-8"))
hash_f.close()
extraction_parts = [
("bootloader", "python \""+CNF_DIR+"/recovery_esptool.py\" --chip esp32 --port "+port_path+" --baud "+args.baud_flash+" --before default_reset --after hard_reset read_flash 0x1000 0x4650 \""+EXT_DIR+"/extracted_rnode_firmware.bootloader\""),
("partition table", "python \""+CNF_DIR+"/recovery_esptool.py\" --chip esp32 --port "+port_path+" --baud "+args.baud_flash+" --before default_reset --after hard_reset read_flash 0x8000 0xC00 \""+EXT_DIR+"/extracted_rnode_firmware.partitions\""),
("app boot", "python \""+CNF_DIR+"/recovery_esptool.py\" --chip esp32 --port "+port_path+" --baud "+args.baud_flash+" --before default_reset --after hard_reset read_flash 0xe000 0x2000 \""+EXT_DIR+"/extracted_rnode_firmware.boot_app0\""),
("application image", "python \""+CNF_DIR+"/recovery_esptool.py\" --chip esp32 --port "+port_path+" --baud "+args.baud_flash+" --before default_reset --after hard_reset read_flash 0x10000 0x200000 \""+EXT_DIR+"/extracted_rnode_firmware.bin\""),
("console image", "python \""+CNF_DIR+"/recovery_esptool.py\" --chip esp32 --port "+port_path+" --baud "+args.baud_flash+" --before default_reset --after hard_reset read_flash 0x210000 0x1F0000 \""+EXT_DIR+"/extracted_console_image.bin\""),
]
import subprocess, shlex
for part, command in extraction_parts:
print("Extracting "+part+"...")
if subprocess.call(shlex.split(command), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) != 0:
print("The extraction failed, the following command did not complete successfully:\n"+command)
graceful_exit(182)
extraction_parts = [
("bootloader", "python \""+CNF_DIR+"/recovery_esptool.py\" --chip esp32 --port "+port_path+" --baud "+args.baud_flash+" --before default_reset --after hard_reset read_flash 0x1000 0x4650 \""+EXT_DIR+"/extracted_rnode_firmware.bootloader\""),
("partition table", "python \""+CNF_DIR+"/recovery_esptool.py\" --chip esp32 --port "+port_path+" --baud "+args.baud_flash+" --before default_reset --after hard_reset read_flash 0x8000 0xC00 \""+EXT_DIR+"/extracted_rnode_firmware.partitions\""),
("app boot", "python \""+CNF_DIR+"/recovery_esptool.py\" --chip esp32 --port "+port_path+" --baud "+args.baud_flash+" --before default_reset --after hard_reset read_flash 0xe000 0x2000 \""+EXT_DIR+"/extracted_rnode_firmware.boot_app0\""),
("application image", "python \""+CNF_DIR+"/recovery_esptool.py\" --chip esp32 --port "+port_path+" --baud "+args.baud_flash+" --before default_reset --after hard_reset read_flash 0x10000 0x200000 \""+EXT_DIR+"/extracted_rnode_firmware.bin\""),
("console image", "python \""+CNF_DIR+"/recovery_esptool.py\" --chip esp32 --port "+port_path+" --baud "+args.baud_flash+" --before default_reset --after hard_reset read_flash 0x210000 0x1F0000 \""+EXT_DIR+"/extracted_console_image.bin\""),
]
import subprocess, shlex
for part, command in extraction_parts:
print("Extracting "+part+"...")
if subprocess.call(shlex.split(command), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) != 0:
print("The extraction failed, the following command did not complete successfully:\n"+command)
exit(182)
print("\nFirmware successfully extracted!")
print("\nYou can now use this firmware to update or autoinstall other RNodes")
graceful_exit()
else:
print("Could not read firmware information from device")
print("\nFirmware successfully extracted!")
print("\nYou can now use this firmware to update or autoinstall other RNodes")
exit()
else:
print("Could not read firmware information from device")
print("\nRNode firmware extraction failed")
graceful_exit(180)
print("\nRNode firmware extraction failed")
graceful_exit(180)
else:
print("\nFirmware extraction is currently only supported on ESP32-based RNodes.")
graceful_exit(170)
if args.autoinstall:
clear()
@ -1533,6 +1555,7 @@ def main():
graceful_exit()
rnode = RNode(rnode_serial)
rnode.usb_serial_id = port_serialno
thread = threading.Thread(target=rnode.readLoop, daemon=True).start()
try:
rnode.device_probe()
@ -1577,6 +1600,7 @@ def main():
print("[7] Heltec LoRa32 v2")
print("[8] Heltec LoRa32 v3")
#print("[9] LilyGO LoRa T3S3")
print("[10] RAK4631")
print(" .")
print(" / \\ Select one of these options if you want to easily turn")
print(" | a supported development board into an RNode.")
@ -1588,7 +1612,7 @@ def main():
try:
c_dev = int(input())
c_mod = False
if c_dev < 1 or c_dev > 9:
if c_dev < 1 or c_dev > 10:
raise ValueError()
elif c_dev == 1:
selected_product = ROM.PRODUCT_RNODE
@ -1717,6 +1741,15 @@ def main():
print("")
print("Please note that Bluetooth is currently not implemented on this board.")
print("")
elif c_dev == 10:
selected_product = ROM.PRODUCT_RAK4631
clear()
print("")
print("---------------------------------------------------------------------------")
print(" RAK4631 RNode Installer")
print("")
print("Important! Using RNode firmware on RAKwireless devices should currently be")
print("considered experimental. It is not intended for production or critical use.")
print("The currently supplied firmware is provided AS-IS as a courtesey to those")
print("who would like to experiment with it. Hit enter to continue.")
print("---------------------------------------------------------------------------")
@ -1969,11 +2002,6 @@ def main():
elif selected_product == ROM.PRODUCT_H32_V3:
selected_mcu = ROM.MCU_ESP32
print("\nWhat band is this Heltec LoRa32 V3 for?\n")
print("[1] 433 MHz")
print("[2] 868 MHz")
print("[3] 915 MHz")
print("[4] 923 MHz")
print("\n? ", end="")
try:
c_model = int(input())
if c_model < 1 or c_model > 4:
@ -1984,6 +2012,27 @@ def main():
elif c_model > 1:
selected_model = ROM.MODEL_CA
selected_platform = ROM.PLATFORM_ESP32
except Exception as e:
print("That band does not exist, exiting now.")
exit()
elif selected_product == ROM.PRODUCT_RAK4631:
selected_mcu = ROM.MCU_NRF52
print("\nWhat band is this RAK4631 for?\n")
print("[1] 433 MHz")
print("[2] 868 MHz")
print("[3] 915 MHz")
print("[4] 923 MHz")
print("\n? ", end="")
try:
c_model = int(input())
if c_model < 1 or c_model > 4:
raise ValueError()
elif c_model == 1:
selected_model = ROM.MODEL_11
selected_platform = ROM.PLATFORM_NRF52
elif c_model > 1:
selected_model = ROM.MODEL_12
selected_platform = ROM.PLATFORM_NRF52
except Exception as e:
print("That band does not exist, exiting now.")
graceful_exit()
@ -2173,16 +2222,26 @@ def main():
if not args.autoinstall:
graceful_exit()
def get_partition_hash(partition_file):
def get_partition_hash(platform, partition_file):
try:
firmware_data = open(partition_file, "rb").read()
calc_hash = hashlib.sha256(firmware_data[0:-32]).digest()
part_hash = firmware_data[-32:]
if platform == ROM.PLATFORM_ESP32 or platform == ROM.PLATFORM_AVR:
firmware_data = open(partition_file, "rb").read()
# Calculate the digest manually and see if it matches the
# SHA256 digest included in the ESP32 image.
calc_hash = hashlib.sha256(firmware_data[0:-32]).digest()
part_hash = firmware_data[-32:]
if calc_hash == part_hash:
return part_hash
else:
return None
if calc_hash == part_hash:
return part_hash
else:
return None
elif platform == ROM.PLATFORM_NRF52:
# Calculate digest manually, as it is not included in the image.
firmware_data = open(partition_file, "rb")
hash = hashlib.file_digest(firmware_data, 'sha256').digest()
firmware_data.close()
return hash
except Exception as e:
RNS.log("Could not calculate firmware partition hash. The contained exception was:")
RNS.log(str(e))
@ -2684,6 +2743,21 @@ def main():
RNS.log("Please install \""+flasher+"\" and try again.")
graceful_exit()
elif platform == ROM.PLATFORM_NRF52:
flasher = "adafruit-nrfutil"
if which(flasher) is not None:
return [flasher, "dfu", "serial", "--package", UPD_DIR+"/"+selected_version+"/"+fw_filename, "-p", args.port, "-b", "115200", "-t", "1200"]
else:
RNS.log("")
RNS.log("You do not currently have the \""+flasher+"\" program installed on your system.")
RNS.log("Unfortunately, that means we can't proceed, since it is needed to flash your")
RNS.log("board. You can install it via your package manager, for example:")
RNS.log("")
RNS.log(" pip3 install --user adafruit-nrfutil")
RNS.log("")
RNS.log("Please install \""+flasher+"\" and try again.")
graceful_exit()
if args.port:
wants_fw_provision = False
if args.flash:
@ -2745,6 +2819,11 @@ def main():
wants_fw_provision = True
RNS.log("Waiting for ESP32 reset...")
time.sleep(7)
if args.platform == ROM.PLATFORM_NRF52:
wants_fw_provision = True
RNS.log("Waiting for NRF52 reset...")
# Don't need to wait as long this time.
time.sleep(5)
else:
RNS.log("Error from flasher ("+str(flash_status)+") while writing.")
RNS.log("Some boards have trouble flashing at high speeds, and you can")
@ -2770,6 +2849,11 @@ def main():
graceful_exit()
rnode = RNode(rnode_serial)
ports = list_ports.comports()
for port in ports:
if port.device == args.port:
rnode.usb_serial_id = port.serial_number
break
thread = threading.Thread(target=rnode.readLoop, daemon=True).start()
try:
@ -2788,7 +2872,10 @@ def main():
if args.eeprom_wipe:
RNS.log("WARNING: EEPROM is being wiped! Power down device NOW if you do not want this!")
rnode.wipe_eeprom()
rnode.hard_reset()
if rnode.platform != ROM.PLATFORM_NRF52:
rnode.hard_reset()
graceful_exit()
RNS.log("Reading EEPROM...")
@ -2811,8 +2898,12 @@ def main():
fw_filename = "rnode_firmware_featheresp32.zip"
elif rnode.board == ROM.BOARD_GENERIC_ESP32:
fw_filename = "rnode_firmware_esp32_generic.zip"
elif rnode.platform == ROM.PLATFORM_NRF52:
if rnode.board == ROM.BOARD_RAK4631:
fw_filename = "rnode_firmware_rak4631.zip"
else:
fw_filename = None
if args.update:
RNS.log("ERROR: No firmware found for this board. Cannot update.")
graceful_exit()
@ -2893,12 +2984,16 @@ def main():
partition_full_path = EXT_DIR+"/extracted_rnode_firmware.bin"
else:
partition_full_path = UPD_DIR+"/"+selected_version+"/"+partition_filename
partition_hash = get_partition_hash(partition_full_path)
partition_hash = get_partition_hash(rnode.platform, partition_full_path)
if partition_hash != None:
rnode.set_firmware_hash(partition_hash)
rnode.indicate_firmware_update()
sleep(1)
if rnode.platform == ROM.PLATFORM_NRF52:
# Allow extra time for writing to EEPROM on NRF52. Current implementation is slow.
sleep(14)
rnode.disconnect()
flash_status = call(get_flasher_call(rnode.platform, fw_filename))
if flash_status == 0:
@ -3090,6 +3185,27 @@ def main():
if rnode.platform == ROM.PLATFORM_ESP32:
RNS.log("Waiting for ESP32 reset...")
time.sleep(6)
elif rnode.platform == ROM.PLATFORM_NRF52:
rnode_serial.close()
RNS.log("Waiting for NRF52 reset...")
time.sleep(14)
selected_port = None
ports = list_ports.comports()
for port in ports:
if port.serial_number == rnode.usb_serial_id:
selected_port = port
break
if selected_port is None:
RNS.log("Could not detect new port for NRF52...")
else:
try:
rnode_serial = rnode_open_serial(selected_port.device)
rnode.serial = rnode_serial
thread = threading.Thread(target=rnode.readLoop, daemon=True).start()
except Exception as e:
RNS.log("Could not open the specified serial port. The contained exception was:")
RNS.log(str(e))
exit()
else:
time.sleep(3)
@ -3114,6 +3230,8 @@ def main():
if args.product != None:
if args.product == "03":
mapped_product = ROM.PRODUCT_RNODE
elif args.product == "10":
mapped_product = ROM.PRODUCT_RAK4631
elif args.product == "f0":
mapped_product = ROM.PRODUCT_HMBRW
elif args.product == "e0":
@ -3130,7 +3248,11 @@ def main():
else:
model = mapped_model
else:
if args.model == "a4":
if args.model == "11":
model = ROM.MODEL_11
elif args.model == "12":
model = ROM.MODEL_12
elif args.model == "a4":
model = ROM.MODEL_A4
elif args.model == "a9":
model = ROM.MODEL_A9
@ -3243,7 +3365,9 @@ def main():
time.sleep(0.006)
rnode.write_eeprom(ROM.ADDR_INFO_LOCK, ROM.INFO_LOCK_BYTE)
if rnode.platform == ROM.PLATFORM_NRF52:
# Allow extra time for writing to EEPROM on NRF52. Current implementation is slow.
sleep(3)
RNS.log("EEPROM written! Validating...")
if wants_fw_provision:
@ -3257,18 +3381,58 @@ def main():
vf.close()
else:
partition_filename = fw_filename.replace(".zip", ".bin")
partition_hash = get_partition_hash(UPD_DIR+"/"+selected_version+"/"+partition_filename)
partition_hash = get_partition_hash(rnode.platform, UPD_DIR+"/"+selected_version+"/"+partition_filename)
if partition_hash != None:
time.sleep(0.75)
RNS.log("Setting firmware checksum...")
rnode.set_firmware_hash(partition_hash)
rnode.hard_reset()
if rnode.platform == ROM.PLATFORM_ESP32:
rnode.hard_reset()
RNS.log("Waiting for ESP32 reset...")
time.sleep(6.5)
elif rnode.platform == ROM.PLATFORM_NRF52:
rnode.hard_reset()
# The hard reset on this platform is different
# to that of the ESP32 platform, it causes
# disruption to the serial connection.
# Therefore, we have to reestablish the serial
# connection after the reset.
rnode_serial.close()
RNS.log("Waiting for NRF52 reset...")
# Give plenty of time for to allow for
# potential e-ink display refresh too.
time.sleep(14)
# After the hard reset, the port number will
# change. We need to find the new port number,
# which can be done non-interactively by
# comparing the USB serial numbers of the
# original port and the one we are currently
# iterating.
selected_port = None
ports = list_ports.comports()
for port in ports:
if port.serial_number == rnode.usb_serial_id:
selected_port = port
break
if selected_port is None:
RNS.log("Could not detect new port for NRF52...")
else:
try:
rnode_serial = rnode_open_serial(selected_port.device)
rnode.serial = rnode_serial
thread = threading.Thread(target=rnode.readLoop, daemon=True).start()
except Exception as e:
RNS.log("Could not open the specified serial port. The contained exception was:")
RNS.log(str(e))
exit()
else:
rnode.hard_reset()
rnode.download_eeprom()
if rnode.provisioned:
RNS.log("EEPROM Bootstrapping successful!")
@ -3367,6 +3531,22 @@ def main():
RNS.log("This device has not been provisioned yet, cannot set firmware hash")
graceful_exit(77)
if args.get_target_firmware_hash:
if rnode.provisioned:
RNS.log(f"The target firmware hash is: {rnode.firmware_hash_target.hex()}")
else:
RNS.log("This device has not been provisioned yet, cannot get firmware hash")
exit(77)
if args.get_firmware_hash:
if rnode.provisioned:
RNS.log(f"The actual firmware hash is: {rnode.firmware_hash.hex()}")
else:
RNS.log("This device has not been provisioned yet, cannot get firmware hash")
exit(77)
if rnode.provisioned:
if args.normal:
rnode.setNormalMode()

View File

@ -1 +1 @@
__version__ = "0.7.4"
__version__ = "0.7.5"

View File

@ -18,6 +18,18 @@ def interface_names_to_indexes() -> dict:
results[adapter.name] = adapter.index
return results
def interface_name_to_nice_name(ifname) -> str:
try:
adapters = RNS.vendor.ifaddr.get_adapters(include_unconfigured=True)
for adapter in adapters:
if adapter.name == ifname:
if hasattr(adapter, "nice_name"):
return adapter.nice_name
except:
return None
return None
def ifaddresses(ifname) -> dict:
adapters = RNS.vendor.ifaddr.get_adapters(include_unconfigured=True)
ifa = {}