import asyncio import serial_asyncio from rich.console import Console from rich import inspect from datetime import datetime import mtrreader import mtrlog class OutputProtocol(asyncio.Protocol): def connection_made(self, transport): self.transport = transport print('port opened', transport) transport.serial.rts = False # You can manipulate Serial object via transport transport.write(b'Hello, World!\n') # Write serial data via transport def data_received(self, data): print('data received', repr(data)) msg = mtrreader.MtrDataMessage(data) inspect(msg) print(mtrlog.MtrLogFormatter.format(self, msg, datetime.now())) if b'\n' in data: self.transport.close() def connection_lost(self, exc): print('port closed') self.transport.loop.stop() def pause_writing(self): print('pause writing') print(self.transport.get_write_buffer_size()) def resume_writing(self): print(self.transport.get_write_buffer_size()) print('resume writing') loop = asyncio.get_event_loop() coro = serial_asyncio.create_serial_connection(loop, OutputProtocol, '/dev/ttyUSB0', baudrate=9600) transport, protocol = loop.run_until_complete(coro) loop.run_forever() loop.close()