42 lines
1.3 KiB
Python
42 lines
1.3 KiB
Python
|
import asyncio
|
||
|
import serial_asyncio
|
||
|
from rich.console import Console
|
||
|
from rich import inspect
|
||
|
from datetime import datetime
|
||
|
|
||
|
import mtrreader
|
||
|
import mtrlog
|
||
|
|
||
|
class OutputProtocol(asyncio.Protocol):
|
||
|
def connection_made(self, transport):
|
||
|
self.transport = transport
|
||
|
print('port opened', transport)
|
||
|
transport.serial.rts = False # You can manipulate Serial object via transport
|
||
|
transport.write(b'Hello, World!\n') # Write serial data via transport
|
||
|
|
||
|
def data_received(self, data):
|
||
|
print('data received', repr(data))
|
||
|
msg = mtrreader.MtrDataMessage(data)
|
||
|
inspect(msg)
|
||
|
print(mtrlog.MtrLogFormatter.format(self, msg, datetime.now()))
|
||
|
if b'\n' in data:
|
||
|
self.transport.close()
|
||
|
|
||
|
def connection_lost(self, exc):
|
||
|
print('port closed')
|
||
|
self.transport.loop.stop()
|
||
|
|
||
|
def pause_writing(self):
|
||
|
print('pause writing')
|
||
|
print(self.transport.get_write_buffer_size())
|
||
|
|
||
|
def resume_writing(self):
|
||
|
print(self.transport.get_write_buffer_size())
|
||
|
print('resume writing')
|
||
|
|
||
|
loop = asyncio.get_event_loop()
|
||
|
coro = serial_asyncio.create_serial_connection(loop, OutputProtocol, '/dev/ttyUSB0', baudrate=9600)
|
||
|
transport, protocol = loop.run_until_complete(coro)
|
||
|
loop.run_forever()
|
||
|
loop.close()
|