Added ifaddr module

This commit is contained in:
Mark Qvist 2023-05-04 17:46:56 +02:00
parent 29693c6fe2
commit b740e36985
8 changed files with 558 additions and 1 deletions

View File

@ -341,8 +341,8 @@ projects:
- [Curve25519.py](https://gist.github.com/nickovs/cc3c22d15f239a2640c185035c06f8a3#file-curve25519-py) by [Nicko van Someren](https://gist.github.com/nickovs), *Public Domain*
- [I2Plib](https://github.com/l-n-s/i2plib) by [Viktor Villainov](https://github.com/l-n-s)
- [PySerial](https://github.com/pyserial/pyserial) by Chris Liechti, *BSD License*
- [Netifaces](https://github.com/al45tair/netifaces) by [Alastair Houghton](https://github.com/al45tair), *MIT License*
- [Configobj](https://github.com/DiffSK/configobj) by Michael Foord, Nicola Larosa, Rob Dennis & Eli Courtwright, *BSD License*
- [Six](https://github.com/benjaminp/six) by [Benjamin Peterson](https://github.com/benjaminp), *MIT License*
- [Umsgpack.py](https://github.com/vsergeev/u-msgpack-python) by [Ivan A. Sergeev](https://github.com/vsergeev)
- [ifaddr](https://github.com/pydron/ifaddr) by [Pydron](https://github.com/pydron), *MIT License*
- [Python](https://www.python.org)

33
RNS/vendor/ifaddr/__init__.py vendored Normal file
View File

@ -0,0 +1,33 @@
# Copyright (c) 2014 Stefan C. Mueller
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
import os
from ifaddr._shared import Adapter, IP
if os.name == "nt":
from ifaddr._win32 import get_adapters
elif os.name == "posix":
from ifaddr._posix import get_adapters
else:
raise RuntimeError("Unsupported Operating System: %s" % os.name)
__all__ = ['Adapter', 'IP', 'get_adapters']

96
RNS/vendor/ifaddr/_posix.py vendored Normal file
View File

@ -0,0 +1,96 @@
# Copyright (c) 2014 Stefan C. Mueller
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
import os
import ctypes.util
import ipaddress
import collections
import socket
from typing import Iterable, Optional
import ifaddr._shared as shared
# from ifaddr._shared import sockaddr, Interface, sockaddr_to_ip, ipv6_prefixlength
class ifaddrs(ctypes.Structure):
pass
ifaddrs._fields_ = [
('ifa_next', ctypes.POINTER(ifaddrs)),
('ifa_name', ctypes.c_char_p),
('ifa_flags', ctypes.c_uint),
('ifa_addr', ctypes.POINTER(shared.sockaddr)),
('ifa_netmask', ctypes.POINTER(shared.sockaddr)),
]
libc = ctypes.CDLL(ctypes.util.find_library("socket" if os.uname()[0] == "SunOS" else "c"), use_errno=True) # type: ignore
def get_adapters(include_unconfigured: bool = False) -> Iterable[shared.Adapter]:
addr0 = addr = ctypes.POINTER(ifaddrs)()
retval = libc.getifaddrs(ctypes.byref(addr))
if retval != 0:
eno = ctypes.get_errno()
raise OSError(eno, os.strerror(eno))
ips = collections.OrderedDict()
def add_ip(adapter_name: str, ip: Optional[shared.IP]) -> None:
if adapter_name not in ips:
index = None # type: Optional[int]
try:
# Mypy errors on this when the Windows CI runs:
# error: Module has no attribute "if_nametoindex"
index = socket.if_nametoindex(adapter_name) # type: ignore
except (OSError, AttributeError):
pass
ips[adapter_name] = shared.Adapter(adapter_name, adapter_name, [], index=index)
if ip is not None:
ips[adapter_name].ips.append(ip)
while addr:
name = addr[0].ifa_name.decode(encoding='UTF-8')
ip_addr = shared.sockaddr_to_ip(addr[0].ifa_addr)
if ip_addr:
if addr[0].ifa_netmask and not addr[0].ifa_netmask[0].sa_familiy:
addr[0].ifa_netmask[0].sa_familiy = addr[0].ifa_addr[0].sa_familiy
netmask = shared.sockaddr_to_ip(addr[0].ifa_netmask)
if isinstance(netmask, tuple):
netmaskStr = str(netmask[0])
prefixlen = shared.ipv6_prefixlength(ipaddress.IPv6Address(netmaskStr))
else:
assert netmask is not None, f'sockaddr_to_ip({addr[0].ifa_netmask}) returned None'
netmaskStr = str('0.0.0.0/' + netmask)
prefixlen = ipaddress.IPv4Network(netmaskStr).prefixlen
ip = shared.IP(ip_addr, prefixlen, name)
add_ip(name, ip)
else:
if include_unconfigured:
add_ip(name, None)
addr = addr[0].ifa_next
libc.freeifaddrs(addr0)
return ips.values()

199
RNS/vendor/ifaddr/_shared.py vendored Normal file
View File

@ -0,0 +1,199 @@
# Copyright (c) 2014 Stefan C. Mueller
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
import ctypes
import socket
import ipaddress
import platform
from typing import List, Optional, Tuple, Union
class Adapter(object):
"""
Represents a network interface device controller (NIC), such as a
network card. An adapter can have multiple IPs.
On Linux aliasing (multiple IPs per physical NIC) is implemented
by creating 'virtual' adapters, each represented by an instance
of this class. Each of those 'virtual' adapters can have both
a IPv4 and an IPv6 IP address.
"""
def __init__(self, name: str, nice_name: str, ips: List['IP'], index: Optional[int] = None) -> None:
#: Unique name that identifies the adapter in the system.
#: On Linux this is of the form of `eth0` or `eth0:1`, on
#: Windows it is a UUID in string representation, such as
#: `{846EE342-7039-11DE-9D20-806E6F6E6963}`.
self.name = name
#: Human readable name of the adpater. On Linux this
#: is currently the same as :attr:`name`. On Windows
#: this is the name of the device.
self.nice_name = nice_name
#: List of :class:`ifaddr.IP` instances in the order they were
#: reported by the system.
self.ips = ips
#: Adapter index as used by some API (e.g. IPv6 multicast group join).
self.index = index
def __repr__(self) -> str:
return "Adapter(name={name}, nice_name={nice_name}, ips={ips}, index={index})".format(
name=repr(self.name), nice_name=repr(self.nice_name), ips=repr(self.ips), index=repr(self.index)
)
# Type of an IPv4 address (a string in "xxx.xxx.xxx.xxx" format)
_IPv4Address = str
# Type of an IPv6 address (a three-tuple `(ip, flowinfo, scope_id)`)
_IPv6Address = Tuple[str, int, int]
class IP(object):
"""
Represents an IP address of an adapter.
"""
def __init__(self, ip: Union[_IPv4Address, _IPv6Address], network_prefix: int, nice_name: str) -> None:
#: IP address. For IPv4 addresses this is a string in
#: "xxx.xxx.xxx.xxx" format. For IPv6 addresses this
#: is a three-tuple `(ip, flowinfo, scope_id)`, where
#: `ip` is a string in the usual collon separated
#: hex format.
self.ip = ip
#: Number of bits of the IP that represent the
#: network. For a `255.255.255.0` netmask, this
#: number would be `24`.
self.network_prefix = network_prefix
#: Human readable name for this IP.
#: On Linux is this currently the same as the adapter name.
#: On Windows this is the name of the network connection
#: as configured in the system control panel.
self.nice_name = nice_name
@property
def is_IPv4(self) -> bool:
"""
Returns `True` if this IP is an IPv4 address and `False`
if it is an IPv6 address.
"""
return not isinstance(self.ip, tuple)
@property
def is_IPv6(self) -> bool:
"""
Returns `True` if this IP is an IPv6 address and `False`
if it is an IPv4 address.
"""
return isinstance(self.ip, tuple)
def __repr__(self) -> str:
return "IP(ip={ip}, network_prefix={network_prefix}, nice_name={nice_name})".format(
ip=repr(self.ip), network_prefix=repr(self.network_prefix), nice_name=repr(self.nice_name)
)
if platform.system() == "Darwin" or "BSD" in platform.system():
# BSD derived systems use marginally different structures
# than either Linux or Windows.
# I still keep it in `shared` since we can use
# both structures equally.
class sockaddr(ctypes.Structure):
_fields_ = [
('sa_len', ctypes.c_uint8),
('sa_familiy', ctypes.c_uint8),
('sa_data', ctypes.c_uint8 * 14),
]
class sockaddr_in(ctypes.Structure):
_fields_ = [
('sa_len', ctypes.c_uint8),
('sa_familiy', ctypes.c_uint8),
('sin_port', ctypes.c_uint16),
('sin_addr', ctypes.c_uint8 * 4),
('sin_zero', ctypes.c_uint8 * 8),
]
class sockaddr_in6(ctypes.Structure):
_fields_ = [
('sa_len', ctypes.c_uint8),
('sa_familiy', ctypes.c_uint8),
('sin6_port', ctypes.c_uint16),
('sin6_flowinfo', ctypes.c_uint32),
('sin6_addr', ctypes.c_uint8 * 16),
('sin6_scope_id', ctypes.c_uint32),
]
else:
class sockaddr(ctypes.Structure): # type: ignore
_fields_ = [('sa_familiy', ctypes.c_uint16), ('sa_data', ctypes.c_uint8 * 14)]
class sockaddr_in(ctypes.Structure): # type: ignore
_fields_ = [
('sin_familiy', ctypes.c_uint16),
('sin_port', ctypes.c_uint16),
('sin_addr', ctypes.c_uint8 * 4),
('sin_zero', ctypes.c_uint8 * 8),
]
class sockaddr_in6(ctypes.Structure): # type: ignore
_fields_ = [
('sin6_familiy', ctypes.c_uint16),
('sin6_port', ctypes.c_uint16),
('sin6_flowinfo', ctypes.c_uint32),
('sin6_addr', ctypes.c_uint8 * 16),
('sin6_scope_id', ctypes.c_uint32),
]
def sockaddr_to_ip(sockaddr_ptr: 'ctypes.pointer[sockaddr]') -> Optional[Union[_IPv4Address, _IPv6Address]]:
if sockaddr_ptr:
if sockaddr_ptr[0].sa_familiy == socket.AF_INET:
ipv4 = ctypes.cast(sockaddr_ptr, ctypes.POINTER(sockaddr_in))
ippacked = bytes(bytearray(ipv4[0].sin_addr))
ip = str(ipaddress.ip_address(ippacked))
return ip
elif sockaddr_ptr[0].sa_familiy == socket.AF_INET6:
ipv6 = ctypes.cast(sockaddr_ptr, ctypes.POINTER(sockaddr_in6))
flowinfo = ipv6[0].sin6_flowinfo
ippacked = bytes(bytearray(ipv6[0].sin6_addr))
ip = str(ipaddress.ip_address(ippacked))
scope_id = ipv6[0].sin6_scope_id
return (ip, flowinfo, scope_id)
return None
def ipv6_prefixlength(address: ipaddress.IPv6Address) -> int:
prefix_length = 0
for i in range(address.max_prefixlen):
if int(address) >> i & 1:
prefix_length = prefix_length + 1
return prefix_length

145
RNS/vendor/ifaddr/_win32.py vendored Normal file
View File

@ -0,0 +1,145 @@
# Copyright (c) 2014 Stefan C. Mueller
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
import ctypes
from ctypes import wintypes
from typing import Iterable, List
import ifaddr._shared as shared
NO_ERROR = 0
ERROR_BUFFER_OVERFLOW = 111
MAX_ADAPTER_NAME_LENGTH = 256
MAX_ADAPTER_DESCRIPTION_LENGTH = 128
MAX_ADAPTER_ADDRESS_LENGTH = 8
AF_UNSPEC = 0
class SOCKET_ADDRESS(ctypes.Structure):
_fields_ = [('lpSockaddr', ctypes.POINTER(shared.sockaddr)), ('iSockaddrLength', wintypes.INT)]
class IP_ADAPTER_UNICAST_ADDRESS(ctypes.Structure):
pass
IP_ADAPTER_UNICAST_ADDRESS._fields_ = [
('Length', wintypes.ULONG),
('Flags', wintypes.DWORD),
('Next', ctypes.POINTER(IP_ADAPTER_UNICAST_ADDRESS)),
('Address', SOCKET_ADDRESS),
('PrefixOrigin', ctypes.c_uint),
('SuffixOrigin', ctypes.c_uint),
('DadState', ctypes.c_uint),
('ValidLifetime', wintypes.ULONG),
('PreferredLifetime', wintypes.ULONG),
('LeaseLifetime', wintypes.ULONG),
('OnLinkPrefixLength', ctypes.c_uint8),
]
class IP_ADAPTER_ADDRESSES(ctypes.Structure):
pass
IP_ADAPTER_ADDRESSES._fields_ = [
('Length', wintypes.ULONG),
('IfIndex', wintypes.DWORD),
('Next', ctypes.POINTER(IP_ADAPTER_ADDRESSES)),
('AdapterName', ctypes.c_char_p),
('FirstUnicastAddress', ctypes.POINTER(IP_ADAPTER_UNICAST_ADDRESS)),
('FirstAnycastAddress', ctypes.c_void_p),
('FirstMulticastAddress', ctypes.c_void_p),
('FirstDnsServerAddress', ctypes.c_void_p),
('DnsSuffix', ctypes.c_wchar_p),
('Description', ctypes.c_wchar_p),
('FriendlyName', ctypes.c_wchar_p),
]
iphlpapi = ctypes.windll.LoadLibrary("Iphlpapi") # type: ignore
def enumerate_interfaces_of_adapter(
nice_name: str, address: IP_ADAPTER_UNICAST_ADDRESS
) -> Iterable[shared.IP]:
# Iterate through linked list and fill list
addresses = [] # type: List[IP_ADAPTER_UNICAST_ADDRESS]
while True:
addresses.append(address)
if not address.Next:
break
address = address.Next[0]
for address in addresses:
ip = shared.sockaddr_to_ip(address.Address.lpSockaddr)
assert ip is not None, f'sockaddr_to_ip({address.Address.lpSockaddr}) returned None'
network_prefix = address.OnLinkPrefixLength
yield shared.IP(ip, network_prefix, nice_name)
def get_adapters(include_unconfigured: bool = False) -> Iterable[shared.Adapter]:
# Call GetAdaptersAddresses() with error and buffer size handling
addressbuffersize = wintypes.ULONG(15 * 1024)
retval = ERROR_BUFFER_OVERFLOW
while retval == ERROR_BUFFER_OVERFLOW:
addressbuffer = ctypes.create_string_buffer(addressbuffersize.value)
retval = iphlpapi.GetAdaptersAddresses(
wintypes.ULONG(AF_UNSPEC),
wintypes.ULONG(0),
None,
ctypes.byref(addressbuffer),
ctypes.byref(addressbuffersize),
)
if retval != NO_ERROR:
raise ctypes.WinError() # type: ignore
# Iterate through adapters fill array
address_infos = [] # type: List[IP_ADAPTER_ADDRESSES]
address_info = IP_ADAPTER_ADDRESSES.from_buffer(addressbuffer)
while True:
address_infos.append(address_info)
if not address_info.Next:
break
address_info = address_info.Next[0]
# Iterate through unicast addresses
result = [] # type: List[shared.Adapter]
for adapter_info in address_infos:
# We don't expect non-ascii characters here, so encoding shouldn't matter
name = adapter_info.AdapterName.decode()
nice_name = adapter_info.Description
index = adapter_info.IfIndex
if adapter_info.FirstUnicastAddress:
ips = enumerate_interfaces_of_adapter(
adapter_info.FriendlyName, adapter_info.FirstUnicastAddress[0]
)
ips = list(ips)
result.append(shared.Adapter(name, nice_name, ips, index=index))
elif include_unconfigured:
result.append(shared.Adapter(name, nice_name, [], index=index))
return result

36
RNS/vendor/ifaddr/netifaces.py vendored Normal file
View File

@ -0,0 +1,36 @@
# netifaces compatibility layer
import ifaddr
import socket
from typing import List
AF_INET6 = socket.AF_INET6.value
AF_INET = socket.AF_INET.value
def interfaces() -> List[str]:
adapters = ifaddr.get_adapters(include_unconfigured=True)
return [a.name for a in adapters]
def ifaddresses(ifname) -> dict:
adapters = ifaddr.get_adapters(include_unconfigured=True)
ifa = {}
for a in adapters:
if a.name == ifname:
ipv4s = []
ipv6s = []
for ip in a.ips:
t = {}
if ip.is_IPv4:
t["addr"] = ip.ip
ipv4s.append(t)
if ip.is_IPv6:
t["addr"] = ip.ip[0]
ipv6s.append(t)
if len(ipv4s) > 0:
ifa[AF_INET] = ipv4s
if len(ipv6s) > 0:
ifa[AF_INET6] = ipv6s
return ifa

0
RNS/vendor/ifaddr/py.typed vendored Normal file
View File

48
RNS/vendor/ifaddr/test_ifaddr.py vendored Normal file
View File

@ -0,0 +1,48 @@
# Copyright (C) 2015 Stefan C. Mueller
import unittest
import pytest
import ifaddr
import ifaddr.netifaces
try:
import netifaces
except ImportError:
skip_netifaces = True
else:
skip_netifaces = False
class TestIfaddr(unittest.TestCase):
"""
Unittests for :mod:`ifaddr`.
There isn't much unit-testing that can be done without making assumptions
on the system or mocking of operating system APIs. So this just contains
a sanity check for the moment.
"""
def test_get_adapters_contains_localhost(self) -> None:
found = False
adapters = ifaddr.get_adapters()
for adapter in adapters:
for ip in adapter.ips:
if ip.ip == "127.0.0.1":
found = True
self.assertTrue(found, "No adapter has IP 127.0.0.1: %s" % str(adapters))
@pytest.mark.skipif(skip_netifaces, reason='netifaces not installed')
def test_netifaces_compatibility() -> None:
interfaces = ifaddr.netifaces.interfaces()
assert interfaces == netifaces.interfaces()
# TODO: implement those as well
# for interface in interfaces:
# print(interface)
# assert ifaddr.netifaces.ifaddresses(interface) == netifaces.ifaddresses(interface)
# assert ifaddr.netifaces.gateways() == netifaces.gateways()