# MIT License # # Copyright (c) 2016-2023 Mark Qvist / unsigned.io and contributors. # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. from __future__ import annotations import collections import enum import threading import time from types import TracebackType from typing import Type, Callable, TypeVar, Generic, NewType import abc import contextlib import struct import RNS from abc import ABC, abstractmethod TPacket = TypeVar("TPacket") class SystemMessageTypes(enum.IntEnum): SMT_STREAM_DATA = 0xff00 class ChannelOutletBase(ABC, Generic[TPacket]): """ An abstract transport layer interface used by Channel. DEPRECATED: This was created for testing; eventually Channel will use Link or a LinkBase interface directly. """ @abstractmethod def send(self, raw: bytes) -> TPacket: raise NotImplemented() @abstractmethod def resend(self, packet: TPacket) -> TPacket: raise NotImplemented() @property @abstractmethod def mdu(self): raise NotImplemented() @property @abstractmethod def rtt(self): raise NotImplemented() @property @abstractmethod def is_usable(self): raise NotImplemented() @abstractmethod def get_packet_state(self, packet: TPacket) -> MessageState: raise NotImplemented() @abstractmethod def timed_out(self): raise NotImplemented() @abstractmethod def __str__(self): raise NotImplemented() @abstractmethod def set_packet_timeout_callback(self, packet: TPacket, callback: Callable[[TPacket], None] | None, timeout: float | None = None): raise NotImplemented() @abstractmethod def set_packet_delivered_callback(self, packet: TPacket, callback: Callable[[TPacket], None] | None): raise NotImplemented() @abstractmethod def get_packet_id(self, packet: TPacket) -> any: raise NotImplemented() class CEType(enum.IntEnum): """ ChannelException type codes """ ME_NO_MSG_TYPE = 0 ME_INVALID_MSG_TYPE = 1 ME_NOT_REGISTERED = 2 ME_LINK_NOT_READY = 3 ME_ALREADY_SENT = 4 ME_TOO_BIG = 5 class ChannelException(Exception): """ An exception thrown by Channel, with a type code. """ def __init__(self, ce_type: CEType, *args): super().__init__(args) self.type = ce_type class MessageState(enum.IntEnum): """ Set of possible states for a Message """ MSGSTATE_NEW = 0 MSGSTATE_SENT = 1 MSGSTATE_DELIVERED = 2 MSGSTATE_FAILED = 3 class MessageBase(abc.ABC): """ Base type for any messages sent or received on a Channel. Subclasses must define the two abstract methods as well as the ``MSGTYPE`` class variable. """ # MSGTYPE must be unique within all classes sent over a # channel. Additionally, MSGTYPE > 0xf000 are reserved. MSGTYPE = None """ Defines a unique identifier for a message class. * Must be unique within all classes registered with a ``Channel`` * Must be less than ``0xf000``. Values greater than or equal to ``0xf000`` are reserved. """ @abstractmethod def pack(self) -> bytes: """ Create and return the binary representation of the message :return: binary representation of message """ raise NotImplemented() @abstractmethod def unpack(self, raw: bytes): """ Populate message from binary representation :param raw: binary representation """ raise NotImplemented() MessageCallbackType = NewType("MessageCallbackType", Callable[[MessageBase], bool]) class Envelope: """ Internal wrapper used to transport messages over a channel and track its state within the channel framework. """ def unpack(self, message_factories: dict[int, Type]) -> MessageBase: msgtype, self.sequence, length = struct.unpack(">HHH", self.raw[:6]) raw = self.raw[6:] ctor = message_factories.get(msgtype, None) if ctor is None: raise ChannelException(CEType.ME_NOT_REGISTERED, f"Unable to find constructor for Channel MSGTYPE {hex(msgtype)}") message = ctor() message.unpack(raw) return message def pack(self) -> bytes: if self.message.__class__.MSGTYPE is None: raise ChannelException(CEType.ME_NO_MSG_TYPE, f"{self.message.__class__} lacks MSGTYPE") data = self.message.pack() self.raw = struct.pack(">HHH", self.message.MSGTYPE, self.sequence, len(data)) + data return self.raw def __init__(self, outlet: ChannelOutletBase, message: MessageBase = None, raw: bytes = None, sequence: int = None): self.ts = time.time() self.id = id(self) self.message = message self.raw = raw self.packet: TPacket = None self.sequence = sequence self.outlet = outlet self.tries = 0 self.tracked = False class Channel(contextlib.AbstractContextManager): """ Provides reliable delivery of messages over a link. ``Channel`` differs from ``Request`` and ``Resource`` in some important ways: **Continuous** Messages can be sent or received as long as the ``Link`` is open. **Bi-directional** Messages can be sent in either direction on the ``Link``; neither end is the client or server. **Size-constrained** Messages must be encoded into a single packet. ``Channel`` is similar to ``Packet``, except that it provides reliable delivery (automatic retries) as well as a structure for exchanging several types of messages over the ``Link``. ``Channel`` is not instantiated directly, but rather obtained from a ``Link`` with ``get_channel()``. """ def __init__(self, outlet: ChannelOutletBase): """ @param outlet: """ self._outlet = outlet self._lock = threading.RLock() self._tx_ring: collections.deque[Envelope] = collections.deque() self._rx_ring: collections.deque[Envelope] = collections.deque() self._message_callbacks: [MessageCallbackType] = [] self._next_sequence = 0 self._message_factories: dict[int, Type[MessageBase]] = {} self._max_tries = 5 def __enter__(self) -> Channel: return self def __exit__(self, __exc_type: Type[BaseException] | None, __exc_value: BaseException | None, __traceback: TracebackType | None) -> bool | None: self._shutdown() return False def register_message_type(self, message_class: Type[MessageBase]): """ Register a message class for reception over a ``Channel``. Message classes must extend ``MessageBase``. :param message_class: Class to register """ self._register_message_type(message_class, is_system_type=False) def _register_message_type(self, message_class: Type[MessageBase], *, is_system_type: bool = False): with self._lock: if not issubclass(message_class, MessageBase): raise ChannelException(CEType.ME_INVALID_MSG_TYPE, f"{message_class} is not a subclass of {MessageBase}.") if message_class.MSGTYPE is None: raise ChannelException(CEType.ME_INVALID_MSG_TYPE, f"{message_class} has invalid MSGTYPE class attribute.") if message_class.MSGTYPE >= 0xf000 and not is_system_type: raise ChannelException(CEType.ME_INVALID_MSG_TYPE, f"{message_class} has system-reserved message type.") try: message_class() except Exception as ex: raise ChannelException(CEType.ME_INVALID_MSG_TYPE, f"{message_class} raised an exception when constructed with no arguments: {ex}") self._message_factories[message_class.MSGTYPE] = message_class def add_message_handler(self, callback: MessageCallbackType): """ Add a handler for incoming messages. A handler has the following signature: ``(message: MessageBase) -> bool`` Handlers are processed in the order they are added. If any handler returns True, processing of the message stops; handlers after the returning handler will not be called. :param callback: Function to call """ with self._lock: if callback not in self._message_callbacks: self._message_callbacks.append(callback) def remove_message_handler(self, callback: MessageCallbackType): """ Remove a handler added with ``add_message_handler``. :param callback: handler to remove """ with self._lock: if callback in self._message_callbacks: self._message_callbacks.remove(callback) def _shutdown(self): with self._lock: self._message_callbacks.clear() self._clear_rings() def _clear_rings(self): with self._lock: for envelope in self._tx_ring: if envelope.packet is not None: self._outlet.set_packet_timeout_callback(envelope.packet, None) self._outlet.set_packet_delivered_callback(envelope.packet, None) self._tx_ring.clear() self._rx_ring.clear() def _emplace_envelope(self, envelope: Envelope, ring: collections.deque[Envelope]) -> bool: with self._lock: i = 0 for existing in ring: if existing.sequence > envelope.sequence \ and not existing.sequence // 2 > envelope.sequence: # account for overflow ring.insert(i, envelope) return True if existing.sequence == envelope.sequence: RNS.log(f"Envelope: Emplacement of duplicate envelope sequence.", RNS.LOG_EXTREME) return False i += 1 envelope.tracked = True ring.append(envelope) return True def _prune_rx_ring(self): with self._lock: # Implementation for fixed window = 1 stale = list(sorted(self._rx_ring, key=lambda env: env.sequence, reverse=True))[1:] for env in stale: env.tracked = False self._rx_ring.remove(env) def _run_callbacks(self, message: MessageBase): with self._lock: cbs = self._message_callbacks.copy() for cb in cbs: try: if cb(message): return except Exception as ex: RNS.log(f"Channel: Error running message callback: {ex}", RNS.LOG_ERROR) def _receive(self, raw: bytes): try: envelope = Envelope(outlet=self._outlet, raw=raw) with self._lock: message = envelope.unpack(self._message_factories) prev_env = self._rx_ring[0] if len(self._rx_ring) > 0 else None if prev_env and envelope.sequence != prev_env.sequence + 1: RNS.log("Channel: Out of order packet received", RNS.LOG_DEBUG) return is_new = self._emplace_envelope(envelope, self._rx_ring) self._prune_rx_ring() if not is_new: RNS.log("Channel: Duplicate message received", RNS.LOG_DEBUG) return RNS.log(f"Message received: {message}", RNS.LOG_DEBUG) threading.Thread(target=self._run_callbacks, name="Message Callback", args=[message], daemon=True).start() except Exception as ex: RNS.log(f"Channel: Error receiving data: {ex}") def is_ready_to_send(self) -> bool: """ Check if ``Channel`` is ready to send. :return: True if ready """ if not self._outlet.is_usable: RNS.log("Channel: Link is not usable.", RNS.LOG_EXTREME) return False with self._lock: for envelope in self._tx_ring: if envelope.outlet == self._outlet and (not envelope.packet or self._outlet.get_packet_state(envelope.packet) == MessageState.MSGSTATE_SENT): # TODO: Check if this should be enabled with some kind of # rate limiting, since it currently floods log output when # messages are waiting. # RNS.log("Channel: Link has a pending message.", RNS.LOG_EXTREME) return False return True def _packet_tx_op(self, packet: TPacket, op: Callable[[TPacket], bool]): with self._lock: envelope = next(filter(lambda e: self._outlet.get_packet_id(e.packet) == self._outlet.get_packet_id(packet), self._tx_ring), None) if envelope and op(envelope): envelope.tracked = False if envelope in self._tx_ring: self._tx_ring.remove(envelope) else: RNS.log("Channel: Envelope not found in TX ring", RNS.LOG_DEBUG) if not envelope: RNS.log("Channel: Spurious message received.", RNS.LOG_EXTREME) def _packet_delivered(self, packet: TPacket): self._packet_tx_op(packet, lambda env: True) def _get_packet_timeout_time(self, tries: int) -> float: return pow(2, tries - 1) * max(self._outlet.rtt, 0.01) * 5 def _packet_timeout(self, packet: TPacket): def retry_envelope(envelope: Envelope) -> bool: if envelope.tries >= self._max_tries: RNS.log("Channel: Retry count exceeded, tearing down Link.", RNS.LOG_ERROR) self._shutdown() # start on separate thread? self._outlet.timed_out() return True envelope.tries += 1 self._outlet.resend(envelope.packet) self._outlet.set_packet_timeout_callback(envelope.packet, self._packet_timeout, self._get_packet_timeout_time(envelope.tries)) return False if self._outlet.get_packet_state(packet) != MessageState.MSGSTATE_DELIVERED: self._packet_tx_op(packet, retry_envelope) def send(self, message: MessageBase) -> Envelope: """ Send a message. If a message send is attempted and ``Channel`` is not ready, an exception is thrown. :param message: an instance of a ``MessageBase`` subclass """ envelope: Envelope | None = None with self._lock: if not self.is_ready_to_send(): raise ChannelException(CEType.ME_LINK_NOT_READY, f"Link is not ready") envelope = Envelope(self._outlet, message=message, sequence=self._next_sequence) self._next_sequence = (self._next_sequence + 1) % 0x10000 self._emplace_envelope(envelope, self._tx_ring) if envelope is None: raise BlockingIOError() envelope.pack() if len(envelope.raw) > self._outlet.mdu: raise ChannelException(CEType.ME_TOO_BIG, f"Packed message too big for packet: {len(envelope.raw)} > {self._outlet.mdu}") envelope.packet = self._outlet.send(envelope.raw) envelope.tries += 1 self._outlet.set_packet_delivered_callback(envelope.packet, self._packet_delivered) self._outlet.set_packet_timeout_callback(envelope.packet, self._packet_timeout, self._get_packet_timeout_time(envelope.tries)) return envelope @property def MDU(self): """ Maximum Data Unit: the number of bytes available for a message to consume in a single send. This value is adjusted from the ``Link`` MDU to accommodate message header information. :return: number of bytes available """ return self._outlet.mdu - 6 # sizeof(msgtype) + sizeof(length) + sizeof(sequence) class LinkChannelOutlet(ChannelOutletBase): """ An implementation of ChannelOutletBase for RNS.Link. Allows Channel to send packets over an RNS Link with Packets. :param link: RNS Link to wrap """ def __init__(self, link: RNS.Link): self.link = link def send(self, raw: bytes) -> RNS.Packet: packet = RNS.Packet(self.link, raw, context=RNS.Packet.CHANNEL) if self.link.status == RNS.Link.ACTIVE: packet.send() return packet def resend(self, packet: RNS.Packet) -> RNS.Packet: RNS.log("Resending packet " + RNS.prettyhexrep(packet.packet_hash), RNS.LOG_DEBUG) if not packet.resend(): RNS.log("Failed to resend packet", RNS.LOG_ERROR) return packet @property def mdu(self): return self.link.MDU @property def rtt(self): return self.link.rtt @property def is_usable(self): return True # had issues looking at Link.status def get_packet_state(self, packet: TPacket) -> MessageState: if packet.receipt == None: return MessageState.MSGSTATE_FAILED status = packet.receipt.get_status() if status == RNS.PacketReceipt.SENT: return MessageState.MSGSTATE_SENT if status == RNS.PacketReceipt.DELIVERED: return MessageState.MSGSTATE_DELIVERED if status == RNS.PacketReceipt.FAILED: return MessageState.MSGSTATE_FAILED else: raise Exception(f"Unexpected receipt state: {status}") def timed_out(self): self.link.teardown() def __str__(self): return f"{self.__class__.__name__}({self.link})" def set_packet_timeout_callback(self, packet: RNS.Packet, callback: Callable[[RNS.Packet], None] | None, timeout: float | None = None): if timeout and packet.receipt: packet.receipt.set_timeout(timeout) def inner(receipt: RNS.PacketReceipt): callback(packet) if packet and packet.receipt: packet.receipt.set_timeout_callback(inner if callback else None) def set_packet_delivered_callback(self, packet: RNS.Packet, callback: Callable[[RNS.Packet], None] | None): def inner(receipt: RNS.PacketReceipt): callback(packet) if packet and packet.receipt: packet.receipt.set_delivery_callback(inner if callback else None) def get_packet_id(self, packet: RNS.Packet) -> any: return packet.get_hash()