Added display read command to RNodeInterface

This commit is contained in:
Mark Qvist 2024-12-08 14:25:58 +01:00
parent 32b5c7a3af
commit ae7dffdfc0
2 changed files with 140 additions and 1 deletions

View File

@ -68,6 +68,7 @@ class KISS():
CMD_RANDOM = 0x40
CMD_FB_EXT = 0x41
CMD_FB_READ = 0x42
CMD_DISP_READ = 0x66
CMD_FB_WRITE = 0x43
CMD_BT_CTRL = 0x46
CMD_PLATFORM = 0x48
@ -262,6 +263,8 @@ class RNodeInterface(Interface):
BATTERY_STATE_CHARGING = 0x02
BATTERY_STATE_CHARGED = 0x03
DISPLAY_READ_INTERVAL = 1.0
@classmethod
def bluetooth_control(device_serial = None, port = None, enable_bluetooth = False, disable_bluetooth = False, pairing_mode = False):
if (port != None or device_serial != None) and (enable_bluetooth or disable_bluetooth or pairing_mode):
@ -468,6 +471,15 @@ class RNodeInterface(Interface):
self.r_premable_time_ms = None
self.r_battery_state = RNodeInterface.BATTERY_STATE_UNKNOWN
self.r_battery_percent = 0
self.r_framebuffer = b""
self.r_framebuffer_readtime = 0
self.r_framebuffer_latency = 0
self.r_disp = b""
self.r_disp_readtime = 0
self.r_disp_latency = 0
self.should_read_display = False
self.read_display_interval = RNodeInterface.DISPLAY_READ_INTERVAL
self.packet_queue = []
self.flow_control = flow_control
@ -809,6 +821,33 @@ class RNodeInterface(Interface):
if written != len(kiss_command):
raise IOError("An IO error occurred while writing framebuffer data device")
def read_framebuffer(self):
kiss_command = bytes([KISS.FEND])+bytes([KISS.CMD_FB_READ])+bytes([0x01])+bytes([KISS.FEND])
written = self.serial.write(kiss_command)
self.r_framebuffer_readtime = time.time()
if written != len(kiss_command):
raise IOError("An IO error occurred while sending framebuffer read command")
def read_display(self):
kiss_command = bytes([KISS.FEND])+bytes([KISS.CMD_DISP_READ])+bytes([0x01])+bytes([KISS.FEND])
written = self.serial.write(kiss_command)
self.r_disp_readtime = time.time()
if written != len(kiss_command):
raise IOError("An IO error occurred while sending display read command")
def _read_display_job(self):
while self.should_read_display:
self.read_display()
time.sleep(self.read_display_interval)
def start_display_updates(self):
if not self.should_read_display:
self.should_read_display = True
threading.Thread(target=self._read_display_job, daemon=True).start()
def stop_display_updates(self):
self.should_read_display = False
def hard_reset(self):
kiss_command = bytes([KISS.FEND, KISS.CMD_RESET, 0xf8, KISS.FEND])
written = self.write_mux(kiss_command)
@ -1285,6 +1324,37 @@ class RNodeInterface(Interface):
raise IOError("ESP32 reset")
elif (command == KISS.CMD_READY):
self.process_queue()
elif (command == KISS.CMD_FB_READ):
if (byte == KISS.FESC):
escape = True
else:
if (escape):
if (byte == KISS.TFEND):
byte = KISS.FEND
if (byte == KISS.TFESC):
byte = KISS.FESC
escape = False
command_buffer = command_buffer+bytes([byte])
if (len(command_buffer) == 512):
self.r_framebuffer_latency = time.time() - self.r_framebuffer_readtime
self.r_framebuffer = command_buffer
elif (command == KISS.CMD_DISP_READ):
if (byte == KISS.FESC):
escape = True
else:
if (escape):
if (byte == KISS.TFEND):
byte = KISS.FEND
if (byte == KISS.TFESC):
byte = KISS.FESC
escape = False
command_buffer = command_buffer+bytes([byte])
if (len(command_buffer) == 1024):
self.r_disp_latency = time.time() - self.r_disp_readtime
self.r_disp = command_buffer
elif (command == KISS.CMD_DETECT):
if byte == KISS.DETECT_RESP:
self.detected = True

View File

@ -59,6 +59,7 @@ class KISS():
CMD_RANDOM = 0x40
CMD_FB_EXT = 0x41
CMD_FB_READ = 0x42
CMD_DISP_READ = 0x66
CMD_FB_WRITE = 0x43
CMD_BT_CTRL = 0x46
CMD_PLATFORM = 0x48
@ -118,6 +119,8 @@ class RNodeInterface(Interface):
BATTERY_STATE_CHARGING = 0x02
BATTERY_STATE_CHARGED = 0x03
DISPLAY_READ_INTERVAL = 1.0
def __init__(self, owner, configuration):
if RNS.vendor.platformutils.is_android():
raise SystemError("Invalid interface type. The Android-specific RNode interface must be used on Android")
@ -236,6 +239,15 @@ class RNodeInterface(Interface):
self.r_premable_time_ms = None
self.r_battery_state = RNodeInterface.BATTERY_STATE_UNKNOWN
self.r_battery_percent = 0
self.r_framebuffer = b""
self.r_framebuffer_readtime = 0
self.r_framebuffer_latency = 0
self.r_disp = b""
self.r_disp_readtime = 0
self.r_disp_latency = 0
self.should_read_display = False
self.read_display_interval = RNodeInterface.DISPLAY_READ_INTERVAL
self.packet_queue = []
self.flow_control = flow_control
@ -453,7 +465,34 @@ class RNodeInterface(Interface):
written = self.serial.write(kiss_command)
if written != len(kiss_command):
raise IOError("An IO error occurred while writing framebuffer data device")
raise IOError("An IO error occurred while writing framebuffer data to device")
def read_framebuffer(self):
kiss_command = bytes([KISS.FEND])+bytes([KISS.CMD_FB_READ])+bytes([0x01])+bytes([KISS.FEND])
written = self.serial.write(kiss_command)
self.r_framebuffer_readtime = time.time()
if written != len(kiss_command):
raise IOError("An IO error occurred while sending framebuffer read command")
def read_display(self):
kiss_command = bytes([KISS.FEND])+bytes([KISS.CMD_DISP_READ])+bytes([0x01])+bytes([KISS.FEND])
written = self.serial.write(kiss_command)
self.r_disp_readtime = time.time()
if written != len(kiss_command):
raise IOError("An IO error occurred while sending display read command")
def _read_display_job(self):
while self.should_read_display:
self.read_display()
time.sleep(self.read_display_interval)
def start_display_updates(self):
if not self.should_read_display:
self.should_read_display = True
threading.Thread(target=self._read_display_job, daemon=True).start()
def stop_display_updates(self):
self.should_read_display = False
def hard_reset(self):
kiss_command = bytes([KISS.FEND, KISS.CMD_RESET, 0xf8, KISS.FEND])
@ -916,6 +955,36 @@ class RNodeInterface(Interface):
raise IOError("ESP32 reset")
elif (command == KISS.CMD_READY):
self.process_queue()
elif (command == KISS.CMD_FB_READ):
if (byte == KISS.FESC):
escape = True
else:
if (escape):
if (byte == KISS.TFEND):
byte = KISS.FEND
if (byte == KISS.TFESC):
byte = KISS.FESC
escape = False
command_buffer = command_buffer+bytes([byte])
if (len(command_buffer) == 512):
self.r_framebuffer_latency = time.time() - self.r_framebuffer_readtime
self.r_framebuffer = command_buffer
elif (command == KISS.CMD_DISP_READ):
if (byte == KISS.FESC):
escape = True
else:
if (escape):
if (byte == KISS.TFEND):
byte = KISS.FEND
if (byte == KISS.TFESC):
byte = KISS.FESC
escape = False
command_buffer = command_buffer+bytes([byte])
if (len(command_buffer) == 1024):
self.r_disp_latency = time.time() - self.r_disp_readtime
self.r_disp = command_buffer
elif (command == KISS.CMD_DETECT):
if byte == KISS.DETECT_RESP:
self.detected = True