From 067c275c46d97df148a2a160bdd8bdc2da069b56 Mon Sep 17 00:00:00 2001 From: Aaron Heise <5148966+acehoss@users.noreply.github.com> Date: Thu, 2 Mar 2023 17:13:55 -0600 Subject: [PATCH] Buffer: send and receive binary data over Channel (also some minor fixes in channel) --- Examples/Buffer.py | 323 ++++++++++++++++++++++++++++++++++++++ Examples/Channel.py | 9 +- RNS/Buffer.py | 305 +++++++++++++++++++++++++++++++++++ RNS/Channel.py | 2 + RNS/__init__.py | 1 + docs/source/reference.rst | 46 +++++- tests/channel.py | 115 +++++++++++++- tests/link.py | 51 ++++++ 8 files changed, 837 insertions(+), 15 deletions(-) create mode 100644 Examples/Buffer.py create mode 100644 RNS/Buffer.py diff --git a/Examples/Buffer.py b/Examples/Buffer.py new file mode 100644 index 0000000..be9d23a --- /dev/null +++ b/Examples/Buffer.py @@ -0,0 +1,323 @@ +########################################################## +# This RNS example demonstrates how to set up a link to # +# a destination, and pass binary data over it using a # +# using a channel buffer. # +########################################################## +from __future__ import annotations +import os +import sys +import time +import argparse +from datetime import datetime + +import RNS +from RNS.vendor import umsgpack + +# Let's define an app name. We'll use this for all +# destinations we create. Since this echo example +# is part of a range of example utilities, we'll put +# them all within the app namespace "example_utilities" +APP_NAME = "example_utilities" + + +########################################################## +#### Server Part ######################################### +########################################################## + +# A reference to the latest client link that connected +latest_client_link = None + +# A reference to the latest buffer object +latest_buffer = None + +# This initialisation is executed when the users chooses +# to run as a server +def server(configpath): + # We must first initialise Reticulum + reticulum = RNS.Reticulum(configpath) + + # Randomly create a new identity for our link example + server_identity = RNS.Identity() + + # We create a destination that clients can connect to. We + # want clients to create links to this destination, so we + # need to create a "single" destination type. + server_destination = RNS.Destination( + server_identity, + RNS.Destination.IN, + RNS.Destination.SINGLE, + APP_NAME, + "bufferexample" + ) + + # We configure a function that will get called every time + # a new client creates a link to this destination. + server_destination.set_link_established_callback(client_connected) + + # Everything's ready! + # Let's Wait for client requests or user input + server_loop(server_destination) + +def server_loop(destination): + # Let the user know that everything is ready + RNS.log( + "Link example "+ + RNS.prettyhexrep(destination.hash)+ + " running, waiting for a connection." + ) + + RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)") + + # We enter a loop that runs until the users exits. + # If the user hits enter, we will announce our server + # destination on the network, which will let clients + # know how to create messages directed towards it. + while True: + entered = input() + destination.announce() + RNS.log("Sent announce from "+RNS.prettyhexrep(destination.hash)) + +# When a client establishes a link to our server +# destination, this function will be called with +# a reference to the link. +def client_connected(link): + global latest_client_link, latest_buffer + latest_client_link = link + + RNS.log("Client connected") + link.set_link_closed_callback(client_disconnected) + + # If a new connection is received, the old reader + # needs to be disconnected. + if latest_buffer: + latest_buffer.close() + + + # Create buffer objects. + # The stream_id parameter to these functions is + # a bit like a file descriptor, except that it + # is unique to the *receiver*. + # + # In this example, both the reader and the writer + # use stream_id = 0, but there are actually two + # separate unidirectional streams flowing in + # opposite directions. + # + channel = link.get_channel() + latest_buffer = RNS.Buffer.create_bidirectional_buffer(0, 0, channel, server_buffer_ready) + +def client_disconnected(link): + RNS.log("Client disconnected") + +def server_buffer_ready(ready_bytes: int): + """ + Callback from buffer when buffer has data available + + :param ready_bytes: The number of bytes ready to read + """ + global latest_buffer + + data = latest_buffer.read(ready_bytes) + data = data.decode("utf-8") + + RNS.log("Received data on the link: " + data) + + reply_message = "I received \""+data+"\" over the buffer" + reply_message = reply_message.encode("utf-8") + latest_buffer.write(reply_message) + latest_buffer.flush() + + + + +########################################################## +#### Client Part ######################################### +########################################################## + +# A reference to the server link +server_link = None + +# A reference to the buffer object, needed to share the +# object from the link connected callback to the client +# loop. +buffer = None + +# This initialisation is executed when the users chooses +# to run as a client +def client(destination_hexhash, configpath): + # We need a binary representation of the destination + # hash that was entered on the command line + try: + dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2 + if len(destination_hexhash) != dest_len: + raise ValueError( + "Destination length is invalid, must be {hex} hexadecimal characters ({byte} bytes).".format(hex=dest_len, byte=dest_len//2) + ) + + destination_hash = bytes.fromhex(destination_hexhash) + except: + RNS.log("Invalid destination entered. Check your input!\n") + exit() + + # We must first initialise Reticulum + reticulum = RNS.Reticulum(configpath) + + # Check if we know a path to the destination + if not RNS.Transport.has_path(destination_hash): + RNS.log("Destination is not yet known. Requesting path and waiting for announce to arrive...") + RNS.Transport.request_path(destination_hash) + while not RNS.Transport.has_path(destination_hash): + time.sleep(0.1) + + # Recall the server identity + server_identity = RNS.Identity.recall(destination_hash) + + # Inform the user that we'll begin connecting + RNS.log("Establishing link with server...") + + # When the server identity is known, we set + # up a destination + server_destination = RNS.Destination( + server_identity, + RNS.Destination.OUT, + RNS.Destination.SINGLE, + APP_NAME, + "bufferexample" + ) + + # And create a link + link = RNS.Link(server_destination) + + # We'll also set up functions to inform the + # user when the link is established or closed + link.set_link_established_callback(link_established) + link.set_link_closed_callback(link_closed) + + # Everything is set up, so let's enter a loop + # for the user to interact with the example + client_loop() + +def client_loop(): + global server_link + + # Wait for the link to become active + while not server_link: + time.sleep(0.1) + + should_quit = False + while not should_quit: + try: + print("> ", end=" ") + text = input() + + # Check if we should quit the example + if text == "quit" or text == "q" or text == "exit": + should_quit = True + server_link.teardown() + else: + # Otherwise, encode the text and write it to the buffer. + text = text.encode("utf-8") + buffer.write(text) + # Flush the buffer to force the data to be sent. + buffer.flush() + + + except Exception as e: + RNS.log("Error while sending data over the link: "+str(e)) + should_quit = True + server_link.teardown() + +# This function is called when a link +# has been established with the server +def link_established(link): + # We store a reference to the link + # instance for later use + global server_link, buffer + server_link = link + + # Create buffer, see server_client_connected() for + # more detail about setting up the buffer. + channel = link.get_channel() + buffer = RNS.Buffer.create_bidirectional_buffer(0, 0, channel, client_buffer_ready) + + # Inform the user that the server is + # connected + RNS.log("Link established with server, enter some text to send, or \"quit\" to quit") + +# When a link is closed, we'll inform the +# user, and exit the program +def link_closed(link): + if link.teardown_reason == RNS.Link.TIMEOUT: + RNS.log("The link timed out, exiting now") + elif link.teardown_reason == RNS.Link.DESTINATION_CLOSED: + RNS.log("The link was closed by the server, exiting now") + else: + RNS.log("Link closed, exiting now") + + RNS.Reticulum.exit_handler() + time.sleep(1.5) + os._exit(0) + +# When the buffer has new data, read it and write it to the terminal. +def client_buffer_ready(ready_bytes: int): + global buffer + data = buffer.read(ready_bytes) + RNS.log("Received data on the link: " + data.decode("utf-8")) + print("> ", end=" ") + sys.stdout.flush() + + +########################################################## +#### Program Startup ##################################### +########################################################## + +# This part of the program runs at startup, +# and parses input of from the user, and then +# starts up the desired program mode. +if __name__ == "__main__": + try: + parser = argparse.ArgumentParser(description="Simple buffer example") + + parser.add_argument( + "-s", + "--server", + action="store_true", + help="wait for incoming link requests from clients" + ) + + parser.add_argument( + "--config", + action="store", + default=None, + help="path to alternative Reticulum config directory", + type=str + ) + + parser.add_argument( + "destination", + nargs="?", + default=None, + help="hexadecimal hash of the server destination", + type=str + ) + + args = parser.parse_args() + + if args.config: + configarg = args.config + else: + configarg = None + + if args.server: + server(configarg) + else: + if (args.destination == None): + print("") + parser.print_help() + print("") + else: + client(args.destination, configarg) + + except KeyboardInterrupt: + print("") + exit() \ No newline at end of file diff --git a/Examples/Channel.py b/Examples/Channel.py index 53b878c..c4ce315 100644 --- a/Examples/Channel.py +++ b/Examples/Channel.py @@ -243,11 +243,6 @@ def client(destination_hexhash, configpath): # And create a link link = RNS.Link(server_destination) - # We set a callback that will get executed - # every time a packet is received over the - # link - link.set_packet_callback(client_message_received) - # We'll also set up functions to inform the # user when the link is established or closed link.set_link_established_callback(link_established) @@ -330,7 +325,7 @@ def link_closed(link): time.sleep(1.5) os._exit(0) -# When a packet is received over the link, we +# When a packet is received over the channel, we # simply print out the data. def client_message_received(message): if isinstance(message, StringMessage): @@ -348,7 +343,7 @@ def client_message_received(message): # starts up the desired program mode. if __name__ == "__main__": try: - parser = argparse.ArgumentParser(description="Simple link example") + parser = argparse.ArgumentParser(description="Simple channel example") parser.add_argument( "-s", diff --git a/RNS/Buffer.py b/RNS/Buffer.py new file mode 100644 index 0000000..75989c5 --- /dev/null +++ b/RNS/Buffer.py @@ -0,0 +1,305 @@ +from __future__ import annotations +import sys +from threading import RLock +from RNS.vendor import umsgpack +from RNS.Channel import Channel, MessageBase, SystemMessageTypes +import RNS +from io import RawIOBase, BufferedRWPair, BufferedReader, BufferedWriter +from typing import Callable +from contextlib import AbstractContextManager + + +class StreamDataMessage(MessageBase): + MSGTYPE = SystemMessageTypes.SMT_STREAM_DATA + """ + Message type for ``Channel``. ``StreamDataMessage`` + uses a system-reserved message type. + """ + + STREAM_ID_MAX = 65535 + """ + While not essential for the current message packing + method (umsgpack), the stream id is clamped to the + size of a UInt16 for future struct packing. + """ + + OVERHEAD = 0 + """ + The number of bytes used by this messa + + When the Buffer package is imported, this value is + calculated based on the value of RNS.Link.MDU. + """ + + MAX_DATA_LEN = 0 + """ + When the Buffer package is imported, this value is + calculcated based on the value of OVERHEAD + """ + + def __init__(self, stream_id: int = None, data: bytes = None, eof: bool = False): + """ + This class is used to encapsulate binary stream + data to be sent over a ``Channel``. + + :param stream_id: id of stream relative to receiver + :param data: binary data + :param eof: set to True if signalling End of File + """ + super().__init__() + if stream_id is not None and stream_id > self.STREAM_ID_MAX: + raise ValueError("stream_id must be 0-65535") + self.stream_id = stream_id + self.data = data or bytes() + self.eof = eof + + def pack(self) -> bytes: + if self.stream_id is None: + raise ValueError("stream_id") + return umsgpack.packb((self.stream_id, self.eof, bytes(self.data))) + + def unpack(self, raw): + self.stream_id, self.eof, self.data = umsgpack.unpackb(raw) + + +_link_sized_bytes = ("\0"*RNS.Link.MDU).encode("utf-8") +StreamDataMessage.OVERHEAD = len(StreamDataMessage(stream_id=StreamDataMessage.STREAM_ID_MAX, + data=_link_sized_bytes, + eof=True).pack()) - len(_link_sized_bytes) - 6 # 6 is envelope overhead +StreamDataMessage.MAX_DATA_LEN = RNS.Link.MDU - StreamDataMessage.OVERHEAD +_link_sized_bytes = None + + +class RawChannelReader(RawIOBase, AbstractContextManager): + """ + An implementation of RawIOBase that receives + binary stream data sent over a ``Channel``. + + This class generally need not be instantiated directly. + Use :func:`RNS.Buffer.create_reader`, + :func:`RNS.Buffer.create_writer`, and + :func:`RNS.Buffer.create_bidirectional_buffer` functions + to create buffered streams with optional callbacks. + + For additional information on the API of this + object, see the Python documentation for + ``RawIOBase``. + """ + def __init__(self, stream_id: int, channel: Channel): + """ + Create a raw channel reader. + + :param stream_id: local stream id to receive at + :param channel: ``Channel`` object to receive from + """ + self._stream_id = stream_id + self._channel = channel + self._lock = RLock() + self._buffer = bytearray() + self._eof = False + self._channel._register_message_type(StreamDataMessage, is_system_type=True) + self._channel.add_message_handler(self._handle_message) + self._listeners: [Callable[[int], None]] = [] + + def add_ready_callback(self, cb: Callable[[int], None]): + """ + Add a function to be called when new data is available. + The function should have the signature ``(ready_bytes: int) -> None`` + + :param cb: function to call + """ + with self._lock: + self._listeners.append(cb) + + def remove_ready_callback(self, cb: Callable[[int], None]): + """ + Remove a function added with :func:`RNS.RawChannelReader.add_ready_callback()` + + :param cb: function to remove + """ + with self._lock: + self._listeners.remove(cb) + + def _handle_message(self, message: MessageBase): + if isinstance(message, StreamDataMessage): + if message.stream_id == self._stream_id: + with self._lock: + if message.data is not None: + self._buffer.extend(message.data) + if message.eof: + self._eof = True + for listener in self._listeners: + try: + listener(len(self._buffer)) + except Exception as ex: + RNS.log("Error calling RawChannelReader(" + str(self._stream_id) + ") callback: " + str(ex)) + return True + return False + + def _read(self, __size: int) -> bytes | None: + with self._lock: + result = self._buffer[:__size] + self._buffer = self._buffer[__size:] + return result if len(result) > 0 or self._eof else None + + def readinto(self, __buffer: bytearray) -> int | None: + ready = self._read(len(__buffer)) + if ready: + __buffer[:len(ready)] = ready + return len(ready) if ready else None + + def writable(self) -> bool: + return False + + def seekable(self) -> bool: + return False + + def readable(self) -> bool: + return True + + def close(self): + with self._lock: + self._channel.remove_message_handler(self._handle_message) + self._listeners.clear() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + return False + + +class RawChannelWriter(RawIOBase, AbstractContextManager): + """ + An implementation of RawIOBase that receives + binary stream data sent over a channel. + + This class generally need not be instantiated directly. + Use :func:`RNS.Buffer.create_reader`, + :func:`RNS.Buffer.create_writer`, and + :func:`RNS.Buffer.create_bidirectional_buffer` functions + to create buffered streams with optional callbacks. + + For additional information on the API of this + object, see the Python documentation for + ``RawIOBase``. + """ + def __init__(self, stream_id: int, channel: Channel): + """ + Create a raw channel writer. + + :param stream_id: remote stream id to sent do + :param channel: ``Channel`` object to send on + """ + self._stream_id = stream_id + self._channel = channel + self._eof = False + + def write(self, __b: bytes) -> int | None: + try: + if self._channel.is_ready_to_send(): + chunk = __b[:StreamDataMessage.MAX_DATA_LEN] + message = StreamDataMessage(self._stream_id, chunk, self._eof) + self._channel.send(message) + return len(chunk) + except RNS.Channel.ChannelException as cex: + if cex.type != RNS.Channel.CEType.ME_LINK_NOT_READY: + raise + return 0 + + def close(self): + self._eof = True + self.write(bytes()) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + return False + + def seekable(self) -> bool: + return False + + def readable(self) -> bool: + return False + + def writable(self) -> bool: + return True + +class Buffer: + """ + Static functions for creating buffered streams that send + and receive over a ``Channel``. + + These functions use ``BufferedReader``, ``BufferedWriter``, + and ``BufferedRWPair`` to add buffering to + ``RawChannelReader`` and ``RawChannelWriter``. + """ + @staticmethod + def create_reader(stream_id: int, channel: Channel, + ready_callback: Callable[[int], None] | None = None) -> BufferedReader: + """ + Create a buffered reader that reads binary data sent + over a ``Channel``, with an optional callback when + new data is available. + + Callback signature: ``(ready_bytes: int) -> None`` + + For more information on the reader-specific functions + of this object, see the Python documentation for + ``BufferedReader`` + + :param stream_id: the local stream id to receive from + :param channel: the channel to receive on + :param ready_callback: function to call when new data is available + :return: a BufferedReader object + """ + reader = RawChannelReader(stream_id, channel) + if ready_callback: + reader.add_ready_callback(ready_callback) + return BufferedReader(reader) + + @staticmethod + def create_writer(stream_id: int, channel: Channel) -> BufferedWriter: + """ + Create a buffered writer that writes binary data over + a ``Channel``. + + For more information on the writer-specific functions + of this object, see the Python documentation for + ``BufferedWriter`` + + :param stream_id: the remote stream id to send to + :param channel: the channel to send on + :return: a BufferedWriter object + """ + writer = RawChannelWriter(stream_id, channel) + return BufferedWriter(writer) + + @staticmethod + def create_bidirectional_buffer(receive_stream_id: int, send_stream_id: int, channel: Channel, + ready_callback: Callable[[int], None] | None = None) -> BufferedRWPair: + """ + Create a buffered reader/writer pair that reads and + writes binary data over a ``Channel``, with an + optional callback when new data is available. + + Callback signature: ``(ready_bytes: int) -> None`` + + For more information on the reader-specific functions + of this object, see the Python documentation for + ``BufferedRWPair`` + + :param receive_stream_id: the local stream id to receive at + :param send_stream_id: the remote stream id to send to + :param channel: the channel to send and receive on + :param ready_callback: function to call when new data is available + :return: a BufferedRWPair object + """ + reader = RawChannelReader(receive_stream_id, channel) + if ready_callback: + reader.add_ready_callback(ready_callback) + writer = RawChannelWriter(send_stream_id, channel) + return BufferedRWPair(reader, writer) diff --git a/RNS/Channel.py b/RNS/Channel.py index 839bf27..2192646 100644 --- a/RNS/Channel.py +++ b/RNS/Channel.py @@ -34,6 +34,8 @@ import RNS from abc import ABC, abstractmethod TPacket = TypeVar("TPacket") +class SystemMessageTypes(enum.IntEnum): + SMT_STREAM_DATA = 0xff00 class ChannelOutletBase(ABC, Generic[TPacket]): """ diff --git a/RNS/__init__.py b/RNS/__init__.py index 0ec2140..a3c8acf 100755 --- a/RNS/__init__.py +++ b/RNS/__init__.py @@ -33,6 +33,7 @@ from .Reticulum import Reticulum from .Identity import Identity from .Link import Link, RequestReceipt from .Channel import MessageBase +from .Buffer import Buffer, RawChannelReader, RawChannelWriter from .Transport import Transport from .Destination import Destination from .Packet import Packet diff --git a/docs/source/reference.rst b/docs/source/reference.rst index 8d519a8..d789201 100644 --- a/docs/source/reference.rst +++ b/docs/source/reference.rst @@ -130,7 +130,7 @@ This chapter lists and explains all classes exposed by the Reticulum Network Sta .. only:: latex Channel - ------ + ------- .. autoclass:: RNS.Channel.Channel() :members: @@ -144,11 +144,53 @@ This chapter lists and explains all classes exposed by the Reticulum Network Sta .. only:: latex MessageBase - ------ + ----------- .. autoclass:: RNS.MessageBase() :members: +.. _api-buffer: + +.. only:: html + + |start-h3| Buffer |end-h3| + +.. only:: latex + + Buffer + ------ + +.. autoclass:: RNS.Buffer + :members: + +.. _api-rawchannelreader: + +.. only:: html + + |start-h3| RawChannelReader |end-h3| + +.. only:: latex + + RawChannelReader + ---------------- + +.. autoclass:: RNS.RawChannelReader + :members: __init__, add_ready_callback, remove_ready_callback + +.. _api-rawchannelwriter: + +.. only:: html + + |start-h3| RawChannelWriter |end-h3| + +.. only:: latex + + RawChannelWriter + ---------------- + +.. autoclass:: RNS.RawChannelWriter + :members: __init__ + .. _api-transport: .. only:: html diff --git a/tests/channel.py b/tests/channel.py index 3a00cbe..a5c287d 100644 --- a/tests/channel.py +++ b/tests/channel.py @@ -2,6 +2,7 @@ from __future__ import annotations import threading import RNS from RNS.Channel import MessageState, ChannelOutletBase, Channel, MessageBase +import RNS.Buffer from RNS.vendor import umsgpack from typing import Callable import contextlib @@ -91,17 +92,20 @@ class ChannelOutletTest(ChannelOutletBase): self._rtt = rtt self._usable = True self.packets = [] + self.lock = threading.RLock() self.packet_callback: Callable[[ChannelOutletBase, bytes], None] | None = None def send(self, raw: bytes) -> Packet: - packet = Packet(raw) - packet.send() - self.packets.append(packet) - return packet + with self.lock: + packet = Packet(raw) + packet.send() + self.packets.append(packet) + return packet def resend(self, packet: Packet) -> Packet: - packet.send() - return packet + with self.lock: + packet.send() + return packet @property def mdu(self): @@ -370,6 +374,105 @@ class TestChannel(unittest.TestCase): self.eat_own_dog_food(message, check) + def test_buffer_small_bidirectional(self): + data = "Hello\n" + with RNS.Buffer.create_bidirectional_buffer(0, 0, self.h.channel) as buffer: + count = buffer.write(data.encode("utf-8")) + buffer.flush() + + self.assertEqual(len(data), count) + self.assertEqual(1, len(self.h.outlet.packets)) + + packet = self.h.outlet.packets[0] + self.h.channel._receive(packet.raw) + result = buffer.readline() + + self.assertIsNotNone(result) + self.assertEqual(len(result), len(data)) + + decoded = result.decode("utf-8") + + self.assertEqual(data, decoded) + + def test_buffer_big(self): + writer = RNS.Buffer.create_writer(15, self.h.channel) + reader = RNS.Buffer.create_reader(15, self.h.channel) + data = "01234556789"*1024 # 10 KB + count = 0 + write_finished = False + + def write_thread(): + nonlocal count, write_finished + count = writer.write(data.encode("utf-8")) + writer.flush() + writer.close() + write_finished = True + threading.Thread(target=write_thread, name="Write Thread", daemon=True).start() + + while not write_finished or next(filter(lambda x: x.state != MessageState.MSGSTATE_DELIVERED, + self.h.outlet.packets), None) is not None: + with self.h.outlet.lock: + for packet in self.h.outlet.packets: + if packet.state != MessageState.MSGSTATE_DELIVERED: + self.h.channel._receive(packet.raw) + packet.delivered() + time.sleep(0.0001) + + self.assertEqual(len(data), count) + + read_finished = False + result = bytes() + + def read_thread(): + nonlocal read_finished, result + result = reader.read() + read_finished = True + threading.Thread(target=read_thread, name="Read Thread", daemon=True).start() + + timeout_at = time.time() + 7 + while not read_finished and time.time() < timeout_at: + time.sleep(0.001) + + self.assertTrue(read_finished) + self.assertEqual(len(data), len(result)) + + decoded = result.decode("utf-8") + + self.assertSequenceEqual(data, decoded) + + def test_buffer_small_with_callback(self): + callbacks = 0 + last_cb_value = None + + def callback(ready: int): + nonlocal callbacks, last_cb_value + callbacks += 1 + last_cb_value = ready + + data = "Hello\n" + with RNS.RawChannelWriter(0, self.h.channel) as writer, RNS.RawChannelReader(0, self.h.channel) as reader: + reader.add_ready_callback(callback) + count = writer.write(data.encode("utf-8")) + writer.flush() + + self.assertEqual(len(data), count) + self.assertEqual(1, len(self.h.outlet.packets)) + + packet = self.h.outlet.packets[0] + self.h.channel._receive(packet.raw) + + self.assertEqual(1, callbacks) + self.assertEqual(len(data), last_cb_value) + + result = reader.readline() + + self.assertIsNotNone(result) + self.assertEqual(len(result), len(data)) + + decoded = result.decode("utf-8") + + self.assertEqual(data, decoded) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/tests/link.py b/tests/link.py index 203e982..818ac99 100644 --- a/tests/link.py +++ b/tests/link.py @@ -396,6 +396,48 @@ class TestLink(unittest.TestCase): self.assertEqual(l1.status, RNS.Link.CLOSED) self.assertEqual(0, len(l1._channel._rx_ring)) + def test_11_buffer_round_trip(self): + global c_rns + init_rns(self) + print("") + print("Buffer round trip test") + + # TODO: Load this from public bytes only + id1 = RNS.Identity.from_bytes(bytes.fromhex(fixed_keys[0][0])) + self.assertEqual(id1.hash, bytes.fromhex(fixed_keys[0][1])) + + dest = RNS.Destination(id1, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "link", "establish") + + self.assertEqual(dest.hash, bytes.fromhex("fb48da0e82e6e01ba0c014513f74540d")) + + l1 = RNS.Link(dest) + time.sleep(1) + self.assertEqual(l1.status, RNS.Link.ACTIVE) + buffer = None + + received = [] + def handle_data(ready_bytes: int): + data = buffer.read(ready_bytes) + received.append(data) + + channel = l1.get_channel() + buffer = RNS.Buffer.create_bidirectional_buffer(0, 0, channel, handle_data) + + buffer.write("Hi there".encode("utf-8")) + buffer.flush() + + time.sleep(0.5) + + self.assertEqual(1 , len(received)) + + rx_message = received[0].decode("utf-8") + + self.assertEqual("Hi there back at you", rx_message) + + l1.teardown() + time.sleep(0.5) + self.assertEqual(l1.status, RNS.Link.CLOSED) + def size_str(self, num, suffix='B'): units = ['','K','M','G','T','P','E','Z'] @@ -462,6 +504,15 @@ def targets(yp=False): channel.register_message_type(MessageTest) channel.add_message_handler(handle_message) + buffer = None + + def handle_buffer(ready_bytes: int): + data = buffer.read(ready_bytes) + buffer.write((data.decode("utf-8") + " back at you").encode("utf-8")) + buffer.flush() + + buffer = RNS.Buffer.create_bidirectional_buffer(0, 0, channel, handle_buffer) + m_rns = RNS.Reticulum("./tests/rnsconfig") id1 = RNS.Identity.from_bytes(bytes.fromhex(fixed_keys[0][0])) d1 = RNS.Destination(id1, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "link", "establish")