otime-testing/mtrreader.py
2022-03-17 21:57:10 +01:00

277 lines
9.9 KiB
Python

# From http://ttime.no/rs232.pdf
#
# MESSAGE DESCRIPTION:
# ====================
#
# MTR--datamessage
# ---------------
# Fieldname # bytes
# Preamble 4 FFFFFFFF(hex) (4 "FF"'s never occur "inside" a message).
# (Can be used to "resynchronize" logic if a connection is
# broken)
# Package-size 1 number of bytes excluding preamble (=230)
# Package-type 1 'M' as "MTR-datamessage"
# MTR-id 2 Serial number of MTR2; Least significant byte first
# Timestamp 6 Binary Year, Month, Day, Hour, Minute, Second
# TS-milliseconds 2 Milliseconds NOT YET USED, WILL BE 0 IN THIS VERSION
# Package# 4 Binary Counter, from 1 and up; Least sign byte first
# Card-id 3 Binary, Least sign byte first
# Producweek 1 0-53 ; 0 when package is retrived from "history"
# Producyear 1 94-99,0-..X ; 0 when package is retrived from "history"
# ECardHeadSum 1 Headchecksum from card; 0 when package is retrived from
# "history"
# The following fields are repeated 50 times:
# CodeN 1 ControlCode; unused positions have 0
# TimeN 2 Time binary seconds. Least sign. first, Most sign. last;
# unused:0
# ASCII-string 56 Various info depending on ECard-type; 20h (all spaces)
# when retr. from "history" (See ASCII-String)
# Checksum 1 Binary SUM (MOD 256) of all bytes including Preamble
# NULL-Filler 1 Binary 0 (to avoid potential 5 FF's. Making it easier to
# hunt PREAMBLE
# ---------------------------------------
# Size 234
#
# Status-message
# --------------
# Fieldname # bytes
# Preamble 4 FFFFFFFF(hex) (FFFFFFFF never occur elsewhere within
# a frame).
# Package-size 1 number of bytes excluding preamble (=55)
# Package-type 1 'S' as "Status-message" (0x53)
# MTR-id 2 Serial number of MTR2.
# CurrentTime 6 Binary Year, Month, Day, Hour, Minute, Second
# CurrentMilliseconds 2 Milliseconds NOT YET USED, WILL BE 0 IN THIS VERSION
# BatteryStatus 1 1 if battery low. 0 if battery OK.
# RecentPackage# 4 if this is 0, then ALL following # should be ignored!
# OldestPackage# 4 note: If RecentPack==0 then this is still 1! meaning:
# Number of packages in MTR is
# "RecentPackage# - OldestPackage# + 1"
# CurrentSessionStart# 4 Current session is from here to RecentPackage
# (if NOT = 0)
# Prev1SessStart# 4 Prev session was from Prev1SessStart# to
# CurrentSessionStart# - 1
# Prev2SessStart# 4
# Prev3SessStart# 4
# Prev4SessStart# 4
# Prev5SessStart# 4
# Prev6SessStart# 4
# Prev7SessStart# 4
# Checksum 1 Binary SUM (MOD 256) of all bytes including Preamble
# NULL-Filler 1 Binary 0 (to avoid potential 5 FF's. Making it easier
# to hunt PREAMBLE
# ---------------------------------------
# Size 59
import logging
logger = logging.getLogger()
def extend_with(old_items, new_items):
old_items.extend(new_items)
return new_items
def checksum_of(message_bytes):
return sum(message_bytes) % 256
class MtrReader:
def __init__(self, serial_port):
self.serial_port = serial_port
def send_status_command(self):
self.serial_port.write(b'/ST')
def send_spool_all_command(self):
self.serial_port.write(b'/SA')
def receive(self):
messages = []
timed_out = False
PREAMBLE = b'\xFF\xFF\xFF\xFF'
while not timed_out:
preamble_buffer = bytearray()
while preamble_buffer != PREAMBLE:
if len(preamble_buffer) == len(PREAMBLE):
# make room for incoming byte
preamble_buffer.pop(0)
bytes_read_waiting = self.serial_port.read()
timed_out = len(bytes_read_waiting) == 0
if timed_out:
logger.debug(
'Timed out, returning %d messages', len(messages))
return messages
preamble_buffer.extend(bytes_read_waiting)
logger.debug(
'Byte read waiting (hex): %s '
'(current preamble buffer: %s)',
bytes_read_waiting.hex(),
preamble_buffer.hex())
logger.debug('Saw preable, start package parsing')
message_bytes = bytearray()
message_bytes.extend(preamble_buffer)
package_size_numbytes = 1
package_type_numbytes = 1
package_size = int.from_bytes(
extend_with(
message_bytes,
self.serial_port.read(package_size_numbytes)),
'little')
package_type = int.from_bytes(
extend_with(
message_bytes,
self.serial_port.read(package_type_numbytes)),
'little')
num_remaining_bytes_expected = (
package_size
- package_size_numbytes
- package_type_numbytes)
remaining_bytes = self.serial_port.read(
num_remaining_bytes_expected)
if len(remaining_bytes) < num_remaining_bytes_expected:
logger.warning('Did not receive expected number of bytes')
continue
message_bytes.extend(remaining_bytes)
msg = None
if (package_type == ord('M')):
msg = MtrDataMessage(message_bytes)
elif package_type == ord('S'):
msg = MtrStatusMessage(message_bytes)
else:
logger.warning('Got unsupported package type %d', package_type)
continue
logger.info(
"Got message number %d (hex): %s",
len(messages) + 1, message_bytes.hex())
if not msg.is_checksum_valid():
logger.warning("Message has incorrect checksum")
continue
messages.append(msg)
return messages
class MtrStatusMessage:
def __init__(self, message_bytes):
self.message_bytes = message_bytes
def mtr_id(self):
return int.from_bytes(self.message_bytes[6:8], 'little')
def timestamp_year(self):
return int.from_bytes(self.message_bytes[8:9], 'little')
def timestamp_month(self):
return int.from_bytes(self.message_bytes[9:10], 'little')
def timestamp_day(self):
return int.from_bytes(self.message_bytes[10:11], 'little')
def timestamp_hours(self):
return int.from_bytes(self.message_bytes[11:12], 'little')
def timestamp_minutes(self):
return int.from_bytes(self.message_bytes[12:13], 'little')
def timestamp_seconds(self):
return int.from_bytes(self.message_bytes[13:14], 'little')
def timestamp_milliseconds(self):
return int.from_bytes(self.message_bytes[14:16], 'little')
def battery_status(self):
return int.from_bytes(self.message_bytes[16:17], 'little')
# package number fields not supported (yet)
def is_checksum_valid(self):
checksum = int.from_bytes(self.message_bytes[57:58], 'little')
# calculate checksum for message bytes up until checksum
calculated_checksum = checksum_of(self.message_bytes[:57])
logger.debug(
"Calculated checksum %d, read %d",
calculated_checksum, checksum)
return checksum == calculated_checksum
class MtrDataMessage:
def __init__(self, message_bytes):
self.message_bytes = message_bytes
def mtr_id(self):
return int.from_bytes(self.message_bytes[6:8], 'little')
def timestamp_year(self):
return int.from_bytes(self.message_bytes[8:9], 'little')
def timestamp_month(self):
return int.from_bytes(self.message_bytes[9:10], 'little')
def timestamp_day(self):
return int.from_bytes(self.message_bytes[10:11], 'little')
def timestamp_hours(self):
return int.from_bytes(self.message_bytes[11:12], 'little')
def timestamp_minutes(self):
return int.from_bytes(self.message_bytes[12:13], 'little')
def timestamp_seconds(self):
return int.from_bytes(self.message_bytes[13:14], 'little')
def timestamp_milliseconds(self):
return int.from_bytes(self.message_bytes[14:16], 'little')
def packet_num(self):
return int.from_bytes(self.message_bytes[16:20], 'little')
def card_id(self):
return int.from_bytes(self.message_bytes[20:23], 'little')
# product week (1 byte)
# product year (1 byte)
# ecard head checksum (1 byte)
def splits(self):
splits = []
splits_offset = 26
code_numbytes = 1
time_numbytes = 2
split_numbytes = code_numbytes + time_numbytes
for split_index in range(50):
code_offset = splits_offset + split_index * split_numbytes
time_offset = code_offset + code_numbytes
code = int.from_bytes(
self.message_bytes[code_offset:code_offset+code_numbytes],
'little')
time = int.from_bytes(
self.message_bytes[time_offset:time_offset+time_numbytes],
'little')
splits.append((code, time))
return splits
def ascii_string(self):
return self.message_bytes[176:232].decode('ascii')
def is_checksum_valid(self):
checksum = int.from_bytes(self.message_bytes[232:233], 'little')
# calculate checksum for message bytes up until checksum
calculated_checksum = checksum_of(self.message_bytes[:232])
logger.debug(
"Calculated checksum %d, read %d",
calculated_checksum, checksum)
return checksum == calculated_checksum