from __future__ import annotations import sys from threading import RLock from RNS.vendor import umsgpack from RNS.Channel import Channel, MessageBase, SystemMessageTypes import RNS from io import RawIOBase, BufferedRWPair, BufferedReader, BufferedWriter from typing import Callable from contextlib import AbstractContextManager class StreamDataMessage(MessageBase): MSGTYPE = SystemMessageTypes.SMT_STREAM_DATA """ Message type for ``Channel``. ``StreamDataMessage`` uses a system-reserved message type. """ STREAM_ID_MAX = 65535 """ While not essential for the current message packing method (umsgpack), the stream id is clamped to the size of a UInt16 for future struct packing. """ OVERHEAD = 0 """ The number of bytes used by this messa When the Buffer package is imported, this value is calculated based on the value of RNS.Link.MDU. """ MAX_DATA_LEN = 0 """ When the Buffer package is imported, this value is calculcated based on the value of OVERHEAD """ def __init__(self, stream_id: int = None, data: bytes = None, eof: bool = False): """ This class is used to encapsulate binary stream data to be sent over a ``Channel``. :param stream_id: id of stream relative to receiver :param data: binary data :param eof: set to True if signalling End of File """ super().__init__() if stream_id is not None and stream_id > self.STREAM_ID_MAX: raise ValueError("stream_id must be 0-65535") self.stream_id = stream_id self.data = data or bytes() self.eof = eof def pack(self) -> bytes: if self.stream_id is None: raise ValueError("stream_id") return umsgpack.packb((self.stream_id, self.eof, bytes(self.data))) def unpack(self, raw): self.stream_id, self.eof, self.data = umsgpack.unpackb(raw) _link_sized_bytes = ("\0"*RNS.Link.MDU).encode("utf-8") StreamDataMessage.OVERHEAD = len(StreamDataMessage(stream_id=StreamDataMessage.STREAM_ID_MAX, data=_link_sized_bytes, eof=True).pack()) - len(_link_sized_bytes) - 6 # 6 is envelope overhead StreamDataMessage.MAX_DATA_LEN = RNS.Link.MDU - StreamDataMessage.OVERHEAD _link_sized_bytes = None class RawChannelReader(RawIOBase, AbstractContextManager): """ An implementation of RawIOBase that receives binary stream data sent over a ``Channel``. This class generally need not be instantiated directly. Use :func:`RNS.Buffer.create_reader`, :func:`RNS.Buffer.create_writer`, and :func:`RNS.Buffer.create_bidirectional_buffer` functions to create buffered streams with optional callbacks. For additional information on the API of this object, see the Python documentation for ``RawIOBase``. """ def __init__(self, stream_id: int, channel: Channel): """ Create a raw channel reader. :param stream_id: local stream id to receive at :param channel: ``Channel`` object to receive from """ self._stream_id = stream_id self._channel = channel self._lock = RLock() self._buffer = bytearray() self._eof = False self._channel._register_message_type(StreamDataMessage, is_system_type=True) self._channel.add_message_handler(self._handle_message) self._listeners: [Callable[[int], None]] = [] def add_ready_callback(self, cb: Callable[[int], None]): """ Add a function to be called when new data is available. The function should have the signature ``(ready_bytes: int) -> None`` :param cb: function to call """ with self._lock: self._listeners.append(cb) def remove_ready_callback(self, cb: Callable[[int], None]): """ Remove a function added with :func:`RNS.RawChannelReader.add_ready_callback()` :param cb: function to remove """ with self._lock: self._listeners.remove(cb) def _handle_message(self, message: MessageBase): if isinstance(message, StreamDataMessage): if message.stream_id == self._stream_id: with self._lock: if message.data is not None: self._buffer.extend(message.data) if message.eof: self._eof = True for listener in self._listeners: try: listener(len(self._buffer)) except Exception as ex: RNS.log("Error calling RawChannelReader(" + str(self._stream_id) + ") callback: " + str(ex)) return True return False def _read(self, __size: int) -> bytes | None: with self._lock: result = self._buffer[:__size] self._buffer = self._buffer[__size:] return result if len(result) > 0 or self._eof else None def readinto(self, __buffer: bytearray) -> int | None: ready = self._read(len(__buffer)) if ready: __buffer[:len(ready)] = ready return len(ready) if ready else None def writable(self) -> bool: return False def seekable(self) -> bool: return False def readable(self) -> bool: return True def close(self): with self._lock: self._channel.remove_message_handler(self._handle_message) self._listeners.clear() def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() return False class RawChannelWriter(RawIOBase, AbstractContextManager): """ An implementation of RawIOBase that receives binary stream data sent over a channel. This class generally need not be instantiated directly. Use :func:`RNS.Buffer.create_reader`, :func:`RNS.Buffer.create_writer`, and :func:`RNS.Buffer.create_bidirectional_buffer` functions to create buffered streams with optional callbacks. For additional information on the API of this object, see the Python documentation for ``RawIOBase``. """ def __init__(self, stream_id: int, channel: Channel): """ Create a raw channel writer. :param stream_id: remote stream id to sent do :param channel: ``Channel`` object to send on """ self._stream_id = stream_id self._channel = channel self._eof = False def write(self, __b: bytes) -> int | None: try: if self._channel.is_ready_to_send(): chunk = __b[:StreamDataMessage.MAX_DATA_LEN] message = StreamDataMessage(self._stream_id, chunk, self._eof) self._channel.send(message) return len(chunk) except RNS.Channel.ChannelException as cex: if cex.type != RNS.Channel.CEType.ME_LINK_NOT_READY: raise return 0 def close(self): self._eof = True self.write(bytes()) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() return False def seekable(self) -> bool: return False def readable(self) -> bool: return False def writable(self) -> bool: return True class Buffer: """ Static functions for creating buffered streams that send and receive over a ``Channel``. These functions use ``BufferedReader``, ``BufferedWriter``, and ``BufferedRWPair`` to add buffering to ``RawChannelReader`` and ``RawChannelWriter``. """ @staticmethod def create_reader(stream_id: int, channel: Channel, ready_callback: Callable[[int], None] | None = None) -> BufferedReader: """ Create a buffered reader that reads binary data sent over a ``Channel``, with an optional callback when new data is available. Callback signature: ``(ready_bytes: int) -> None`` For more information on the reader-specific functions of this object, see the Python documentation for ``BufferedReader`` :param stream_id: the local stream id to receive from :param channel: the channel to receive on :param ready_callback: function to call when new data is available :return: a BufferedReader object """ reader = RawChannelReader(stream_id, channel) if ready_callback: reader.add_ready_callback(ready_callback) return BufferedReader(reader) @staticmethod def create_writer(stream_id: int, channel: Channel) -> BufferedWriter: """ Create a buffered writer that writes binary data over a ``Channel``. For more information on the writer-specific functions of this object, see the Python documentation for ``BufferedWriter`` :param stream_id: the remote stream id to send to :param channel: the channel to send on :return: a BufferedWriter object """ writer = RawChannelWriter(stream_id, channel) return BufferedWriter(writer) @staticmethod def create_bidirectional_buffer(receive_stream_id: int, send_stream_id: int, channel: Channel, ready_callback: Callable[[int], None] | None = None) -> BufferedRWPair: """ Create a buffered reader/writer pair that reads and writes binary data over a ``Channel``, with an optional callback when new data is available. Callback signature: ``(ready_bytes: int) -> None`` For more information on the reader-specific functions of this object, see the Python documentation for ``BufferedRWPair`` :param receive_stream_id: the local stream id to receive at :param send_stream_id: the remote stream id to send to :param channel: the channel to send and receive on :param ready_callback: function to call when new data is available :return: a BufferedRWPair object """ reader = RawChannelReader(receive_stream_id, channel) if ready_callback: reader.add_ready_callback(ready_callback) writer = RawChannelWriter(send_stream_id, channel) return BufferedRWPair(reader, writer)