mirror of
https://github.com/markqvist/Reticulum.git
synced 2024-11-26 15:30:18 +00:00
Compare commits
No commits in common. "e36312a3cbdeabcf09e61fee97934b19e0b9a172" and "58004d7c0568da7ca52521c912675ce03b268dc8" have entirely different histories.
e36312a3cb
...
58004d7c05
@ -1,323 +0,0 @@
|
|||||||
##########################################################
|
|
||||||
# This RNS example demonstrates how to set up a link to #
|
|
||||||
# a destination, and pass binary data over it using a #
|
|
||||||
# using a channel buffer. #
|
|
||||||
##########################################################
|
|
||||||
from __future__ import annotations
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
import time
|
|
||||||
import argparse
|
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
import RNS
|
|
||||||
from RNS.vendor import umsgpack
|
|
||||||
|
|
||||||
# Let's define an app name. We'll use this for all
|
|
||||||
# destinations we create. Since this echo example
|
|
||||||
# is part of a range of example utilities, we'll put
|
|
||||||
# them all within the app namespace "example_utilities"
|
|
||||||
APP_NAME = "example_utilities"
|
|
||||||
|
|
||||||
|
|
||||||
##########################################################
|
|
||||||
#### Server Part #########################################
|
|
||||||
##########################################################
|
|
||||||
|
|
||||||
# A reference to the latest client link that connected
|
|
||||||
latest_client_link = None
|
|
||||||
|
|
||||||
# A reference to the latest buffer object
|
|
||||||
latest_buffer = None
|
|
||||||
|
|
||||||
# This initialisation is executed when the users chooses
|
|
||||||
# to run as a server
|
|
||||||
def server(configpath):
|
|
||||||
# We must first initialise Reticulum
|
|
||||||
reticulum = RNS.Reticulum(configpath)
|
|
||||||
|
|
||||||
# Randomly create a new identity for our link example
|
|
||||||
server_identity = RNS.Identity()
|
|
||||||
|
|
||||||
# We create a destination that clients can connect to. We
|
|
||||||
# want clients to create links to this destination, so we
|
|
||||||
# need to create a "single" destination type.
|
|
||||||
server_destination = RNS.Destination(
|
|
||||||
server_identity,
|
|
||||||
RNS.Destination.IN,
|
|
||||||
RNS.Destination.SINGLE,
|
|
||||||
APP_NAME,
|
|
||||||
"bufferexample"
|
|
||||||
)
|
|
||||||
|
|
||||||
# We configure a function that will get called every time
|
|
||||||
# a new client creates a link to this destination.
|
|
||||||
server_destination.set_link_established_callback(client_connected)
|
|
||||||
|
|
||||||
# Everything's ready!
|
|
||||||
# Let's Wait for client requests or user input
|
|
||||||
server_loop(server_destination)
|
|
||||||
|
|
||||||
def server_loop(destination):
|
|
||||||
# Let the user know that everything is ready
|
|
||||||
RNS.log(
|
|
||||||
"Link example "+
|
|
||||||
RNS.prettyhexrep(destination.hash)+
|
|
||||||
" running, waiting for a connection."
|
|
||||||
)
|
|
||||||
|
|
||||||
RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)")
|
|
||||||
|
|
||||||
# We enter a loop that runs until the users exits.
|
|
||||||
# If the user hits enter, we will announce our server
|
|
||||||
# destination on the network, which will let clients
|
|
||||||
# know how to create messages directed towards it.
|
|
||||||
while True:
|
|
||||||
entered = input()
|
|
||||||
destination.announce()
|
|
||||||
RNS.log("Sent announce from "+RNS.prettyhexrep(destination.hash))
|
|
||||||
|
|
||||||
# When a client establishes a link to our server
|
|
||||||
# destination, this function will be called with
|
|
||||||
# a reference to the link.
|
|
||||||
def client_connected(link):
|
|
||||||
global latest_client_link, latest_buffer
|
|
||||||
latest_client_link = link
|
|
||||||
|
|
||||||
RNS.log("Client connected")
|
|
||||||
link.set_link_closed_callback(client_disconnected)
|
|
||||||
|
|
||||||
# If a new connection is received, the old reader
|
|
||||||
# needs to be disconnected.
|
|
||||||
if latest_buffer:
|
|
||||||
latest_buffer.close()
|
|
||||||
|
|
||||||
|
|
||||||
# Create buffer objects.
|
|
||||||
# The stream_id parameter to these functions is
|
|
||||||
# a bit like a file descriptor, except that it
|
|
||||||
# is unique to the *receiver*.
|
|
||||||
#
|
|
||||||
# In this example, both the reader and the writer
|
|
||||||
# use stream_id = 0, but there are actually two
|
|
||||||
# separate unidirectional streams flowing in
|
|
||||||
# opposite directions.
|
|
||||||
#
|
|
||||||
channel = link.get_channel()
|
|
||||||
latest_buffer = RNS.Buffer.create_bidirectional_buffer(0, 0, channel, server_buffer_ready)
|
|
||||||
|
|
||||||
def client_disconnected(link):
|
|
||||||
RNS.log("Client disconnected")
|
|
||||||
|
|
||||||
def server_buffer_ready(ready_bytes: int):
|
|
||||||
"""
|
|
||||||
Callback from buffer when buffer has data available
|
|
||||||
|
|
||||||
:param ready_bytes: The number of bytes ready to read
|
|
||||||
"""
|
|
||||||
global latest_buffer
|
|
||||||
|
|
||||||
data = latest_buffer.read(ready_bytes)
|
|
||||||
data = data.decode("utf-8")
|
|
||||||
|
|
||||||
RNS.log("Received data on the link: " + data)
|
|
||||||
|
|
||||||
reply_message = "I received \""+data+"\" over the buffer"
|
|
||||||
reply_message = reply_message.encode("utf-8")
|
|
||||||
latest_buffer.write(reply_message)
|
|
||||||
latest_buffer.flush()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
##########################################################
|
|
||||||
#### Client Part #########################################
|
|
||||||
##########################################################
|
|
||||||
|
|
||||||
# A reference to the server link
|
|
||||||
server_link = None
|
|
||||||
|
|
||||||
# A reference to the buffer object, needed to share the
|
|
||||||
# object from the link connected callback to the client
|
|
||||||
# loop.
|
|
||||||
buffer = None
|
|
||||||
|
|
||||||
# This initialisation is executed when the users chooses
|
|
||||||
# to run as a client
|
|
||||||
def client(destination_hexhash, configpath):
|
|
||||||
# We need a binary representation of the destination
|
|
||||||
# hash that was entered on the command line
|
|
||||||
try:
|
|
||||||
dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2
|
|
||||||
if len(destination_hexhash) != dest_len:
|
|
||||||
raise ValueError(
|
|
||||||
"Destination length is invalid, must be {hex} hexadecimal characters ({byte} bytes).".format(hex=dest_len, byte=dest_len//2)
|
|
||||||
)
|
|
||||||
|
|
||||||
destination_hash = bytes.fromhex(destination_hexhash)
|
|
||||||
except:
|
|
||||||
RNS.log("Invalid destination entered. Check your input!\n")
|
|
||||||
exit()
|
|
||||||
|
|
||||||
# We must first initialise Reticulum
|
|
||||||
reticulum = RNS.Reticulum(configpath)
|
|
||||||
|
|
||||||
# Check if we know a path to the destination
|
|
||||||
if not RNS.Transport.has_path(destination_hash):
|
|
||||||
RNS.log("Destination is not yet known. Requesting path and waiting for announce to arrive...")
|
|
||||||
RNS.Transport.request_path(destination_hash)
|
|
||||||
while not RNS.Transport.has_path(destination_hash):
|
|
||||||
time.sleep(0.1)
|
|
||||||
|
|
||||||
# Recall the server identity
|
|
||||||
server_identity = RNS.Identity.recall(destination_hash)
|
|
||||||
|
|
||||||
# Inform the user that we'll begin connecting
|
|
||||||
RNS.log("Establishing link with server...")
|
|
||||||
|
|
||||||
# When the server identity is known, we set
|
|
||||||
# up a destination
|
|
||||||
server_destination = RNS.Destination(
|
|
||||||
server_identity,
|
|
||||||
RNS.Destination.OUT,
|
|
||||||
RNS.Destination.SINGLE,
|
|
||||||
APP_NAME,
|
|
||||||
"bufferexample"
|
|
||||||
)
|
|
||||||
|
|
||||||
# And create a link
|
|
||||||
link = RNS.Link(server_destination)
|
|
||||||
|
|
||||||
# We'll also set up functions to inform the
|
|
||||||
# user when the link is established or closed
|
|
||||||
link.set_link_established_callback(link_established)
|
|
||||||
link.set_link_closed_callback(link_closed)
|
|
||||||
|
|
||||||
# Everything is set up, so let's enter a loop
|
|
||||||
# for the user to interact with the example
|
|
||||||
client_loop()
|
|
||||||
|
|
||||||
def client_loop():
|
|
||||||
global server_link
|
|
||||||
|
|
||||||
# Wait for the link to become active
|
|
||||||
while not server_link:
|
|
||||||
time.sleep(0.1)
|
|
||||||
|
|
||||||
should_quit = False
|
|
||||||
while not should_quit:
|
|
||||||
try:
|
|
||||||
print("> ", end=" ")
|
|
||||||
text = input()
|
|
||||||
|
|
||||||
# Check if we should quit the example
|
|
||||||
if text == "quit" or text == "q" or text == "exit":
|
|
||||||
should_quit = True
|
|
||||||
server_link.teardown()
|
|
||||||
else:
|
|
||||||
# Otherwise, encode the text and write it to the buffer.
|
|
||||||
text = text.encode("utf-8")
|
|
||||||
buffer.write(text)
|
|
||||||
# Flush the buffer to force the data to be sent.
|
|
||||||
buffer.flush()
|
|
||||||
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
RNS.log("Error while sending data over the link: "+str(e))
|
|
||||||
should_quit = True
|
|
||||||
server_link.teardown()
|
|
||||||
|
|
||||||
# This function is called when a link
|
|
||||||
# has been established with the server
|
|
||||||
def link_established(link):
|
|
||||||
# We store a reference to the link
|
|
||||||
# instance for later use
|
|
||||||
global server_link, buffer
|
|
||||||
server_link = link
|
|
||||||
|
|
||||||
# Create buffer, see server_client_connected() for
|
|
||||||
# more detail about setting up the buffer.
|
|
||||||
channel = link.get_channel()
|
|
||||||
buffer = RNS.Buffer.create_bidirectional_buffer(0, 0, channel, client_buffer_ready)
|
|
||||||
|
|
||||||
# Inform the user that the server is
|
|
||||||
# connected
|
|
||||||
RNS.log("Link established with server, enter some text to send, or \"quit\" to quit")
|
|
||||||
|
|
||||||
# When a link is closed, we'll inform the
|
|
||||||
# user, and exit the program
|
|
||||||
def link_closed(link):
|
|
||||||
if link.teardown_reason == RNS.Link.TIMEOUT:
|
|
||||||
RNS.log("The link timed out, exiting now")
|
|
||||||
elif link.teardown_reason == RNS.Link.DESTINATION_CLOSED:
|
|
||||||
RNS.log("The link was closed by the server, exiting now")
|
|
||||||
else:
|
|
||||||
RNS.log("Link closed, exiting now")
|
|
||||||
|
|
||||||
RNS.Reticulum.exit_handler()
|
|
||||||
time.sleep(1.5)
|
|
||||||
os._exit(0)
|
|
||||||
|
|
||||||
# When the buffer has new data, read it and write it to the terminal.
|
|
||||||
def client_buffer_ready(ready_bytes: int):
|
|
||||||
global buffer
|
|
||||||
data = buffer.read(ready_bytes)
|
|
||||||
RNS.log("Received data on the link: " + data.decode("utf-8"))
|
|
||||||
print("> ", end=" ")
|
|
||||||
sys.stdout.flush()
|
|
||||||
|
|
||||||
|
|
||||||
##########################################################
|
|
||||||
#### Program Startup #####################################
|
|
||||||
##########################################################
|
|
||||||
|
|
||||||
# This part of the program runs at startup,
|
|
||||||
# and parses input of from the user, and then
|
|
||||||
# starts up the desired program mode.
|
|
||||||
if __name__ == "__main__":
|
|
||||||
try:
|
|
||||||
parser = argparse.ArgumentParser(description="Simple buffer example")
|
|
||||||
|
|
||||||
parser.add_argument(
|
|
||||||
"-s",
|
|
||||||
"--server",
|
|
||||||
action="store_true",
|
|
||||||
help="wait for incoming link requests from clients"
|
|
||||||
)
|
|
||||||
|
|
||||||
parser.add_argument(
|
|
||||||
"--config",
|
|
||||||
action="store",
|
|
||||||
default=None,
|
|
||||||
help="path to alternative Reticulum config directory",
|
|
||||||
type=str
|
|
||||||
)
|
|
||||||
|
|
||||||
parser.add_argument(
|
|
||||||
"destination",
|
|
||||||
nargs="?",
|
|
||||||
default=None,
|
|
||||||
help="hexadecimal hash of the server destination",
|
|
||||||
type=str
|
|
||||||
)
|
|
||||||
|
|
||||||
args = parser.parse_args()
|
|
||||||
|
|
||||||
if args.config:
|
|
||||||
configarg = args.config
|
|
||||||
else:
|
|
||||||
configarg = None
|
|
||||||
|
|
||||||
if args.server:
|
|
||||||
server(configarg)
|
|
||||||
else:
|
|
||||||
if (args.destination == None):
|
|
||||||
print("")
|
|
||||||
parser.print_help()
|
|
||||||
print("")
|
|
||||||
else:
|
|
||||||
client(args.destination, configarg)
|
|
||||||
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
print("")
|
|
||||||
exit()
|
|
@ -243,6 +243,11 @@ def client(destination_hexhash, configpath):
|
|||||||
# And create a link
|
# And create a link
|
||||||
link = RNS.Link(server_destination)
|
link = RNS.Link(server_destination)
|
||||||
|
|
||||||
|
# We set a callback that will get executed
|
||||||
|
# every time a packet is received over the
|
||||||
|
# link
|
||||||
|
link.set_packet_callback(client_message_received)
|
||||||
|
|
||||||
# We'll also set up functions to inform the
|
# We'll also set up functions to inform the
|
||||||
# user when the link is established or closed
|
# user when the link is established or closed
|
||||||
link.set_link_established_callback(link_established)
|
link.set_link_established_callback(link_established)
|
||||||
@ -325,7 +330,7 @@ def link_closed(link):
|
|||||||
time.sleep(1.5)
|
time.sleep(1.5)
|
||||||
os._exit(0)
|
os._exit(0)
|
||||||
|
|
||||||
# When a packet is received over the channel, we
|
# When a packet is received over the link, we
|
||||||
# simply print out the data.
|
# simply print out the data.
|
||||||
def client_message_received(message):
|
def client_message_received(message):
|
||||||
if isinstance(message, StringMessage):
|
if isinstance(message, StringMessage):
|
||||||
@ -343,7 +348,7 @@ def client_message_received(message):
|
|||||||
# starts up the desired program mode.
|
# starts up the desired program mode.
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
try:
|
try:
|
||||||
parser = argparse.ArgumentParser(description="Simple channel example")
|
parser = argparse.ArgumentParser(description="Simple link example")
|
||||||
|
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"-s",
|
"-s",
|
||||||
|
305
RNS/Buffer.py
305
RNS/Buffer.py
@ -1,305 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
import sys
|
|
||||||
from threading import RLock
|
|
||||||
from RNS.vendor import umsgpack
|
|
||||||
from RNS.Channel import Channel, MessageBase, SystemMessageTypes
|
|
||||||
import RNS
|
|
||||||
from io import RawIOBase, BufferedRWPair, BufferedReader, BufferedWriter
|
|
||||||
from typing import Callable
|
|
||||||
from contextlib import AbstractContextManager
|
|
||||||
|
|
||||||
|
|
||||||
class StreamDataMessage(MessageBase):
|
|
||||||
MSGTYPE = SystemMessageTypes.SMT_STREAM_DATA
|
|
||||||
"""
|
|
||||||
Message type for ``Channel``. ``StreamDataMessage``
|
|
||||||
uses a system-reserved message type.
|
|
||||||
"""
|
|
||||||
|
|
||||||
STREAM_ID_MAX = 65535
|
|
||||||
"""
|
|
||||||
While not essential for the current message packing
|
|
||||||
method (umsgpack), the stream id is clamped to the
|
|
||||||
size of a UInt16 for future struct packing.
|
|
||||||
"""
|
|
||||||
|
|
||||||
OVERHEAD = 0
|
|
||||||
"""
|
|
||||||
The number of bytes used by this messa
|
|
||||||
|
|
||||||
When the Buffer package is imported, this value is
|
|
||||||
calculated based on the value of RNS.Link.MDU.
|
|
||||||
"""
|
|
||||||
|
|
||||||
MAX_DATA_LEN = 0
|
|
||||||
"""
|
|
||||||
When the Buffer package is imported, this value is
|
|
||||||
calculcated based on the value of OVERHEAD
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, stream_id: int = None, data: bytes = None, eof: bool = False):
|
|
||||||
"""
|
|
||||||
This class is used to encapsulate binary stream
|
|
||||||
data to be sent over a ``Channel``.
|
|
||||||
|
|
||||||
:param stream_id: id of stream relative to receiver
|
|
||||||
:param data: binary data
|
|
||||||
:param eof: set to True if signalling End of File
|
|
||||||
"""
|
|
||||||
super().__init__()
|
|
||||||
if stream_id is not None and stream_id > self.STREAM_ID_MAX:
|
|
||||||
raise ValueError("stream_id must be 0-65535")
|
|
||||||
self.stream_id = stream_id
|
|
||||||
self.data = data or bytes()
|
|
||||||
self.eof = eof
|
|
||||||
|
|
||||||
def pack(self) -> bytes:
|
|
||||||
if self.stream_id is None:
|
|
||||||
raise ValueError("stream_id")
|
|
||||||
return umsgpack.packb((self.stream_id, self.eof, bytes(self.data)))
|
|
||||||
|
|
||||||
def unpack(self, raw):
|
|
||||||
self.stream_id, self.eof, self.data = umsgpack.unpackb(raw)
|
|
||||||
|
|
||||||
|
|
||||||
_link_sized_bytes = ("\0"*RNS.Link.MDU).encode("utf-8")
|
|
||||||
StreamDataMessage.OVERHEAD = len(StreamDataMessage(stream_id=StreamDataMessage.STREAM_ID_MAX,
|
|
||||||
data=_link_sized_bytes,
|
|
||||||
eof=True).pack()) - len(_link_sized_bytes) - 6 # 6 is envelope overhead
|
|
||||||
StreamDataMessage.MAX_DATA_LEN = RNS.Link.MDU - StreamDataMessage.OVERHEAD
|
|
||||||
_link_sized_bytes = None
|
|
||||||
|
|
||||||
|
|
||||||
class RawChannelReader(RawIOBase, AbstractContextManager):
|
|
||||||
"""
|
|
||||||
An implementation of RawIOBase that receives
|
|
||||||
binary stream data sent over a ``Channel``.
|
|
||||||
|
|
||||||
This class generally need not be instantiated directly.
|
|
||||||
Use :func:`RNS.Buffer.create_reader`,
|
|
||||||
:func:`RNS.Buffer.create_writer`, and
|
|
||||||
:func:`RNS.Buffer.create_bidirectional_buffer` functions
|
|
||||||
to create buffered streams with optional callbacks.
|
|
||||||
|
|
||||||
For additional information on the API of this
|
|
||||||
object, see the Python documentation for
|
|
||||||
``RawIOBase``.
|
|
||||||
"""
|
|
||||||
def __init__(self, stream_id: int, channel: Channel):
|
|
||||||
"""
|
|
||||||
Create a raw channel reader.
|
|
||||||
|
|
||||||
:param stream_id: local stream id to receive at
|
|
||||||
:param channel: ``Channel`` object to receive from
|
|
||||||
"""
|
|
||||||
self._stream_id = stream_id
|
|
||||||
self._channel = channel
|
|
||||||
self._lock = RLock()
|
|
||||||
self._buffer = bytearray()
|
|
||||||
self._eof = False
|
|
||||||
self._channel._register_message_type(StreamDataMessage, is_system_type=True)
|
|
||||||
self._channel.add_message_handler(self._handle_message)
|
|
||||||
self._listeners: [Callable[[int], None]] = []
|
|
||||||
|
|
||||||
def add_ready_callback(self, cb: Callable[[int], None]):
|
|
||||||
"""
|
|
||||||
Add a function to be called when new data is available.
|
|
||||||
The function should have the signature ``(ready_bytes: int) -> None``
|
|
||||||
|
|
||||||
:param cb: function to call
|
|
||||||
"""
|
|
||||||
with self._lock:
|
|
||||||
self._listeners.append(cb)
|
|
||||||
|
|
||||||
def remove_ready_callback(self, cb: Callable[[int], None]):
|
|
||||||
"""
|
|
||||||
Remove a function added with :func:`RNS.RawChannelReader.add_ready_callback()`
|
|
||||||
|
|
||||||
:param cb: function to remove
|
|
||||||
"""
|
|
||||||
with self._lock:
|
|
||||||
self._listeners.remove(cb)
|
|
||||||
|
|
||||||
def _handle_message(self, message: MessageBase):
|
|
||||||
if isinstance(message, StreamDataMessage):
|
|
||||||
if message.stream_id == self._stream_id:
|
|
||||||
with self._lock:
|
|
||||||
if message.data is not None:
|
|
||||||
self._buffer.extend(message.data)
|
|
||||||
if message.eof:
|
|
||||||
self._eof = True
|
|
||||||
for listener in self._listeners:
|
|
||||||
try:
|
|
||||||
listener(len(self._buffer))
|
|
||||||
except Exception as ex:
|
|
||||||
RNS.log("Error calling RawChannelReader(" + str(self._stream_id) + ") callback: " + str(ex))
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _read(self, __size: int) -> bytes | None:
|
|
||||||
with self._lock:
|
|
||||||
result = self._buffer[:__size]
|
|
||||||
self._buffer = self._buffer[__size:]
|
|
||||||
return result if len(result) > 0 or self._eof else None
|
|
||||||
|
|
||||||
def readinto(self, __buffer: bytearray) -> int | None:
|
|
||||||
ready = self._read(len(__buffer))
|
|
||||||
if ready:
|
|
||||||
__buffer[:len(ready)] = ready
|
|
||||||
return len(ready) if ready else None
|
|
||||||
|
|
||||||
def writable(self) -> bool:
|
|
||||||
return False
|
|
||||||
|
|
||||||
def seekable(self) -> bool:
|
|
||||||
return False
|
|
||||||
|
|
||||||
def readable(self) -> bool:
|
|
||||||
return True
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
with self._lock:
|
|
||||||
self._channel.remove_message_handler(self._handle_message)
|
|
||||||
self._listeners.clear()
|
|
||||||
|
|
||||||
def __enter__(self):
|
|
||||||
return self
|
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
||||||
self.close()
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
class RawChannelWriter(RawIOBase, AbstractContextManager):
|
|
||||||
"""
|
|
||||||
An implementation of RawIOBase that receives
|
|
||||||
binary stream data sent over a channel.
|
|
||||||
|
|
||||||
This class generally need not be instantiated directly.
|
|
||||||
Use :func:`RNS.Buffer.create_reader`,
|
|
||||||
:func:`RNS.Buffer.create_writer`, and
|
|
||||||
:func:`RNS.Buffer.create_bidirectional_buffer` functions
|
|
||||||
to create buffered streams with optional callbacks.
|
|
||||||
|
|
||||||
For additional information on the API of this
|
|
||||||
object, see the Python documentation for
|
|
||||||
``RawIOBase``.
|
|
||||||
"""
|
|
||||||
def __init__(self, stream_id: int, channel: Channel):
|
|
||||||
"""
|
|
||||||
Create a raw channel writer.
|
|
||||||
|
|
||||||
:param stream_id: remote stream id to sent do
|
|
||||||
:param channel: ``Channel`` object to send on
|
|
||||||
"""
|
|
||||||
self._stream_id = stream_id
|
|
||||||
self._channel = channel
|
|
||||||
self._eof = False
|
|
||||||
|
|
||||||
def write(self, __b: bytes) -> int | None:
|
|
||||||
try:
|
|
||||||
if self._channel.is_ready_to_send():
|
|
||||||
chunk = __b[:StreamDataMessage.MAX_DATA_LEN]
|
|
||||||
message = StreamDataMessage(self._stream_id, chunk, self._eof)
|
|
||||||
self._channel.send(message)
|
|
||||||
return len(chunk)
|
|
||||||
except RNS.Channel.ChannelException as cex:
|
|
||||||
if cex.type != RNS.Channel.CEType.ME_LINK_NOT_READY:
|
|
||||||
raise
|
|
||||||
return 0
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
self._eof = True
|
|
||||||
self.write(bytes())
|
|
||||||
|
|
||||||
def __enter__(self):
|
|
||||||
return self
|
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
||||||
self.close()
|
|
||||||
return False
|
|
||||||
|
|
||||||
def seekable(self) -> bool:
|
|
||||||
return False
|
|
||||||
|
|
||||||
def readable(self) -> bool:
|
|
||||||
return False
|
|
||||||
|
|
||||||
def writable(self) -> bool:
|
|
||||||
return True
|
|
||||||
|
|
||||||
class Buffer:
|
|
||||||
"""
|
|
||||||
Static functions for creating buffered streams that send
|
|
||||||
and receive over a ``Channel``.
|
|
||||||
|
|
||||||
These functions use ``BufferedReader``, ``BufferedWriter``,
|
|
||||||
and ``BufferedRWPair`` to add buffering to
|
|
||||||
``RawChannelReader`` and ``RawChannelWriter``.
|
|
||||||
"""
|
|
||||||
@staticmethod
|
|
||||||
def create_reader(stream_id: int, channel: Channel,
|
|
||||||
ready_callback: Callable[[int], None] | None = None) -> BufferedReader:
|
|
||||||
"""
|
|
||||||
Create a buffered reader that reads binary data sent
|
|
||||||
over a ``Channel``, with an optional callback when
|
|
||||||
new data is available.
|
|
||||||
|
|
||||||
Callback signature: ``(ready_bytes: int) -> None``
|
|
||||||
|
|
||||||
For more information on the reader-specific functions
|
|
||||||
of this object, see the Python documentation for
|
|
||||||
``BufferedReader``
|
|
||||||
|
|
||||||
:param stream_id: the local stream id to receive from
|
|
||||||
:param channel: the channel to receive on
|
|
||||||
:param ready_callback: function to call when new data is available
|
|
||||||
:return: a BufferedReader object
|
|
||||||
"""
|
|
||||||
reader = RawChannelReader(stream_id, channel)
|
|
||||||
if ready_callback:
|
|
||||||
reader.add_ready_callback(ready_callback)
|
|
||||||
return BufferedReader(reader)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def create_writer(stream_id: int, channel: Channel) -> BufferedWriter:
|
|
||||||
"""
|
|
||||||
Create a buffered writer that writes binary data over
|
|
||||||
a ``Channel``.
|
|
||||||
|
|
||||||
For more information on the writer-specific functions
|
|
||||||
of this object, see the Python documentation for
|
|
||||||
``BufferedWriter``
|
|
||||||
|
|
||||||
:param stream_id: the remote stream id to send to
|
|
||||||
:param channel: the channel to send on
|
|
||||||
:return: a BufferedWriter object
|
|
||||||
"""
|
|
||||||
writer = RawChannelWriter(stream_id, channel)
|
|
||||||
return BufferedWriter(writer)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def create_bidirectional_buffer(receive_stream_id: int, send_stream_id: int, channel: Channel,
|
|
||||||
ready_callback: Callable[[int], None] | None = None) -> BufferedRWPair:
|
|
||||||
"""
|
|
||||||
Create a buffered reader/writer pair that reads and
|
|
||||||
writes binary data over a ``Channel``, with an
|
|
||||||
optional callback when new data is available.
|
|
||||||
|
|
||||||
Callback signature: ``(ready_bytes: int) -> None``
|
|
||||||
|
|
||||||
For more information on the reader-specific functions
|
|
||||||
of this object, see the Python documentation for
|
|
||||||
``BufferedRWPair``
|
|
||||||
|
|
||||||
:param receive_stream_id: the local stream id to receive at
|
|
||||||
:param send_stream_id: the remote stream id to send to
|
|
||||||
:param channel: the channel to send and receive on
|
|
||||||
:param ready_callback: function to call when new data is available
|
|
||||||
:return: a BufferedRWPair object
|
|
||||||
"""
|
|
||||||
reader = RawChannelReader(receive_stream_id, channel)
|
|
||||||
if ready_callback:
|
|
||||||
reader.add_ready_callback(ready_callback)
|
|
||||||
writer = RawChannelWriter(send_stream_id, channel)
|
|
||||||
return BufferedRWPair(reader, writer)
|
|
@ -34,8 +34,6 @@ import RNS
|
|||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
TPacket = TypeVar("TPacket")
|
TPacket = TypeVar("TPacket")
|
||||||
|
|
||||||
class SystemMessageTypes(enum.IntEnum):
|
|
||||||
SMT_STREAM_DATA = 0xff00
|
|
||||||
|
|
||||||
class ChannelOutletBase(ABC, Generic[TPacket]):
|
class ChannelOutletBase(ABC, Generic[TPacket]):
|
||||||
"""
|
"""
|
||||||
|
@ -33,7 +33,6 @@ from .Reticulum import Reticulum
|
|||||||
from .Identity import Identity
|
from .Identity import Identity
|
||||||
from .Link import Link, RequestReceipt
|
from .Link import Link, RequestReceipt
|
||||||
from .Channel import MessageBase
|
from .Channel import MessageBase
|
||||||
from .Buffer import Buffer, RawChannelReader, RawChannelWriter
|
|
||||||
from .Transport import Transport
|
from .Transport import Transport
|
||||||
from .Destination import Destination
|
from .Destination import Destination
|
||||||
from .Packet import Packet
|
from .Packet import Packet
|
||||||
|
@ -104,16 +104,6 @@ data between peers of a ``Link``.
|
|||||||
|
|
||||||
This example can also be found at `<https://github.com/markqvist/Reticulum/blob/master/Examples/Channel.py>`_.
|
This example can also be found at `<https://github.com/markqvist/Reticulum/blob/master/Examples/Channel.py>`_.
|
||||||
|
|
||||||
Buffer
|
|
||||||
======
|
|
||||||
|
|
||||||
The *Buffer* example explores using buffered readers and writers to send
|
|
||||||
binary data between peers of a ``Link``.
|
|
||||||
|
|
||||||
.. literalinclude:: ../../Examples/Buffer.py
|
|
||||||
|
|
||||||
This example can also be found at `<https://github.com/markqvist/Reticulum/blob/master/Examples/Buffer.py>`_.
|
|
||||||
|
|
||||||
.. _example-filetransfer:
|
.. _example-filetransfer:
|
||||||
|
|
||||||
Filetransfer
|
Filetransfer
|
||||||
|
@ -149,48 +149,6 @@ This chapter lists and explains all classes exposed by the Reticulum Network Sta
|
|||||||
.. autoclass:: RNS.MessageBase()
|
.. autoclass:: RNS.MessageBase()
|
||||||
:members:
|
:members:
|
||||||
|
|
||||||
.. _api-buffer:
|
|
||||||
|
|
||||||
.. only:: html
|
|
||||||
|
|
||||||
|start-h3| Buffer |end-h3|
|
|
||||||
|
|
||||||
.. only:: latex
|
|
||||||
|
|
||||||
Buffer
|
|
||||||
------
|
|
||||||
|
|
||||||
.. autoclass:: RNS.Buffer
|
|
||||||
:members:
|
|
||||||
|
|
||||||
.. _api-rawchannelreader:
|
|
||||||
|
|
||||||
.. only:: html
|
|
||||||
|
|
||||||
|start-h3| RawChannelReader |end-h3|
|
|
||||||
|
|
||||||
.. only:: latex
|
|
||||||
|
|
||||||
RawChannelReader
|
|
||||||
----------------
|
|
||||||
|
|
||||||
.. autoclass:: RNS.RawChannelReader
|
|
||||||
:members: __init__, add_ready_callback, remove_ready_callback
|
|
||||||
|
|
||||||
.. _api-rawchannelwriter:
|
|
||||||
|
|
||||||
.. only:: html
|
|
||||||
|
|
||||||
|start-h3| RawChannelWriter |end-h3|
|
|
||||||
|
|
||||||
.. only:: latex
|
|
||||||
|
|
||||||
RawChannelWriter
|
|
||||||
----------------
|
|
||||||
|
|
||||||
.. autoclass:: RNS.RawChannelWriter
|
|
||||||
:members: __init__
|
|
||||||
|
|
||||||
.. _api-transport:
|
.. _api-transport:
|
||||||
|
|
||||||
.. only:: html
|
.. only:: html
|
||||||
|
103
tests/channel.py
103
tests/channel.py
@ -2,7 +2,6 @@ from __future__ import annotations
|
|||||||
import threading
|
import threading
|
||||||
import RNS
|
import RNS
|
||||||
from RNS.Channel import MessageState, ChannelOutletBase, Channel, MessageBase
|
from RNS.Channel import MessageState, ChannelOutletBase, Channel, MessageBase
|
||||||
import RNS.Buffer
|
|
||||||
from RNS.vendor import umsgpack
|
from RNS.vendor import umsgpack
|
||||||
from typing import Callable
|
from typing import Callable
|
||||||
import contextlib
|
import contextlib
|
||||||
@ -92,18 +91,15 @@ class ChannelOutletTest(ChannelOutletBase):
|
|||||||
self._rtt = rtt
|
self._rtt = rtt
|
||||||
self._usable = True
|
self._usable = True
|
||||||
self.packets = []
|
self.packets = []
|
||||||
self.lock = threading.RLock()
|
|
||||||
self.packet_callback: Callable[[ChannelOutletBase, bytes], None] | None = None
|
self.packet_callback: Callable[[ChannelOutletBase, bytes], None] | None = None
|
||||||
|
|
||||||
def send(self, raw: bytes) -> Packet:
|
def send(self, raw: bytes) -> Packet:
|
||||||
with self.lock:
|
|
||||||
packet = Packet(raw)
|
packet = Packet(raw)
|
||||||
packet.send()
|
packet.send()
|
||||||
self.packets.append(packet)
|
self.packets.append(packet)
|
||||||
return packet
|
return packet
|
||||||
|
|
||||||
def resend(self, packet: Packet) -> Packet:
|
def resend(self, packet: Packet) -> Packet:
|
||||||
with self.lock:
|
|
||||||
packet.send()
|
packet.send()
|
||||||
return packet
|
return packet
|
||||||
|
|
||||||
@ -374,105 +370,6 @@ class TestChannel(unittest.TestCase):
|
|||||||
|
|
||||||
self.eat_own_dog_food(message, check)
|
self.eat_own_dog_food(message, check)
|
||||||
|
|
||||||
def test_buffer_small_bidirectional(self):
|
|
||||||
data = "Hello\n"
|
|
||||||
with RNS.Buffer.create_bidirectional_buffer(0, 0, self.h.channel) as buffer:
|
|
||||||
count = buffer.write(data.encode("utf-8"))
|
|
||||||
buffer.flush()
|
|
||||||
|
|
||||||
self.assertEqual(len(data), count)
|
|
||||||
self.assertEqual(1, len(self.h.outlet.packets))
|
|
||||||
|
|
||||||
packet = self.h.outlet.packets[0]
|
|
||||||
self.h.channel._receive(packet.raw)
|
|
||||||
result = buffer.readline()
|
|
||||||
|
|
||||||
self.assertIsNotNone(result)
|
|
||||||
self.assertEqual(len(result), len(data))
|
|
||||||
|
|
||||||
decoded = result.decode("utf-8")
|
|
||||||
|
|
||||||
self.assertEqual(data, decoded)
|
|
||||||
|
|
||||||
def test_buffer_big(self):
|
|
||||||
writer = RNS.Buffer.create_writer(15, self.h.channel)
|
|
||||||
reader = RNS.Buffer.create_reader(15, self.h.channel)
|
|
||||||
data = "01234556789"*1024 # 10 KB
|
|
||||||
count = 0
|
|
||||||
write_finished = False
|
|
||||||
|
|
||||||
def write_thread():
|
|
||||||
nonlocal count, write_finished
|
|
||||||
count = writer.write(data.encode("utf-8"))
|
|
||||||
writer.flush()
|
|
||||||
writer.close()
|
|
||||||
write_finished = True
|
|
||||||
threading.Thread(target=write_thread, name="Write Thread", daemon=True).start()
|
|
||||||
|
|
||||||
while not write_finished or next(filter(lambda x: x.state != MessageState.MSGSTATE_DELIVERED,
|
|
||||||
self.h.outlet.packets), None) is not None:
|
|
||||||
with self.h.outlet.lock:
|
|
||||||
for packet in self.h.outlet.packets:
|
|
||||||
if packet.state != MessageState.MSGSTATE_DELIVERED:
|
|
||||||
self.h.channel._receive(packet.raw)
|
|
||||||
packet.delivered()
|
|
||||||
time.sleep(0.0001)
|
|
||||||
|
|
||||||
self.assertEqual(len(data), count)
|
|
||||||
|
|
||||||
read_finished = False
|
|
||||||
result = bytes()
|
|
||||||
|
|
||||||
def read_thread():
|
|
||||||
nonlocal read_finished, result
|
|
||||||
result = reader.read()
|
|
||||||
read_finished = True
|
|
||||||
threading.Thread(target=read_thread, name="Read Thread", daemon=True).start()
|
|
||||||
|
|
||||||
timeout_at = time.time() + 7
|
|
||||||
while not read_finished and time.time() < timeout_at:
|
|
||||||
time.sleep(0.001)
|
|
||||||
|
|
||||||
self.assertTrue(read_finished)
|
|
||||||
self.assertEqual(len(data), len(result))
|
|
||||||
|
|
||||||
decoded = result.decode("utf-8")
|
|
||||||
|
|
||||||
self.assertSequenceEqual(data, decoded)
|
|
||||||
|
|
||||||
def test_buffer_small_with_callback(self):
|
|
||||||
callbacks = 0
|
|
||||||
last_cb_value = None
|
|
||||||
|
|
||||||
def callback(ready: int):
|
|
||||||
nonlocal callbacks, last_cb_value
|
|
||||||
callbacks += 1
|
|
||||||
last_cb_value = ready
|
|
||||||
|
|
||||||
data = "Hello\n"
|
|
||||||
with RNS.RawChannelWriter(0, self.h.channel) as writer, RNS.RawChannelReader(0, self.h.channel) as reader:
|
|
||||||
reader.add_ready_callback(callback)
|
|
||||||
count = writer.write(data.encode("utf-8"))
|
|
||||||
writer.flush()
|
|
||||||
|
|
||||||
self.assertEqual(len(data), count)
|
|
||||||
self.assertEqual(1, len(self.h.outlet.packets))
|
|
||||||
|
|
||||||
packet = self.h.outlet.packets[0]
|
|
||||||
self.h.channel._receive(packet.raw)
|
|
||||||
|
|
||||||
self.assertEqual(1, callbacks)
|
|
||||||
self.assertEqual(len(data), last_cb_value)
|
|
||||||
|
|
||||||
result = reader.readline()
|
|
||||||
|
|
||||||
self.assertIsNotNone(result)
|
|
||||||
self.assertEqual(len(result), len(data))
|
|
||||||
|
|
||||||
decoded = result.decode("utf-8")
|
|
||||||
|
|
||||||
self.assertEqual(data, decoded)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main(verbosity=2)
|
unittest.main(verbosity=2)
|
||||||
|
@ -396,48 +396,6 @@ class TestLink(unittest.TestCase):
|
|||||||
self.assertEqual(l1.status, RNS.Link.CLOSED)
|
self.assertEqual(l1.status, RNS.Link.CLOSED)
|
||||||
self.assertEqual(0, len(l1._channel._rx_ring))
|
self.assertEqual(0, len(l1._channel._rx_ring))
|
||||||
|
|
||||||
def test_11_buffer_round_trip(self):
|
|
||||||
global c_rns
|
|
||||||
init_rns(self)
|
|
||||||
print("")
|
|
||||||
print("Buffer round trip test")
|
|
||||||
|
|
||||||
# TODO: Load this from public bytes only
|
|
||||||
id1 = RNS.Identity.from_bytes(bytes.fromhex(fixed_keys[0][0]))
|
|
||||||
self.assertEqual(id1.hash, bytes.fromhex(fixed_keys[0][1]))
|
|
||||||
|
|
||||||
dest = RNS.Destination(id1, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "link", "establish")
|
|
||||||
|
|
||||||
self.assertEqual(dest.hash, bytes.fromhex("fb48da0e82e6e01ba0c014513f74540d"))
|
|
||||||
|
|
||||||
l1 = RNS.Link(dest)
|
|
||||||
time.sleep(1)
|
|
||||||
self.assertEqual(l1.status, RNS.Link.ACTIVE)
|
|
||||||
buffer = None
|
|
||||||
|
|
||||||
received = []
|
|
||||||
def handle_data(ready_bytes: int):
|
|
||||||
data = buffer.read(ready_bytes)
|
|
||||||
received.append(data)
|
|
||||||
|
|
||||||
channel = l1.get_channel()
|
|
||||||
buffer = RNS.Buffer.create_bidirectional_buffer(0, 0, channel, handle_data)
|
|
||||||
|
|
||||||
buffer.write("Hi there".encode("utf-8"))
|
|
||||||
buffer.flush()
|
|
||||||
|
|
||||||
time.sleep(0.5)
|
|
||||||
|
|
||||||
self.assertEqual(1 , len(received))
|
|
||||||
|
|
||||||
rx_message = received[0].decode("utf-8")
|
|
||||||
|
|
||||||
self.assertEqual("Hi there back at you", rx_message)
|
|
||||||
|
|
||||||
l1.teardown()
|
|
||||||
time.sleep(0.5)
|
|
||||||
self.assertEqual(l1.status, RNS.Link.CLOSED)
|
|
||||||
|
|
||||||
|
|
||||||
def size_str(self, num, suffix='B'):
|
def size_str(self, num, suffix='B'):
|
||||||
units = ['','K','M','G','T','P','E','Z']
|
units = ['','K','M','G','T','P','E','Z']
|
||||||
@ -504,15 +462,6 @@ def targets(yp=False):
|
|||||||
channel.register_message_type(MessageTest)
|
channel.register_message_type(MessageTest)
|
||||||
channel.add_message_handler(handle_message)
|
channel.add_message_handler(handle_message)
|
||||||
|
|
||||||
buffer = None
|
|
||||||
|
|
||||||
def handle_buffer(ready_bytes: int):
|
|
||||||
data = buffer.read(ready_bytes)
|
|
||||||
buffer.write((data.decode("utf-8") + " back at you").encode("utf-8"))
|
|
||||||
buffer.flush()
|
|
||||||
|
|
||||||
buffer = RNS.Buffer.create_bidirectional_buffer(0, 0, channel, handle_buffer)
|
|
||||||
|
|
||||||
m_rns = RNS.Reticulum("./tests/rnsconfig")
|
m_rns = RNS.Reticulum("./tests/rnsconfig")
|
||||||
id1 = RNS.Identity.from_bytes(bytes.fromhex(fixed_keys[0][0]))
|
id1 = RNS.Identity.from_bytes(bytes.fromhex(fixed_keys[0][0]))
|
||||||
d1 = RNS.Destination(id1, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "link", "establish")
|
d1 = RNS.Destination(id1, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "link", "establish")
|
||||||
|
Loading…
Reference in New Issue
Block a user