Added recovery to local shared interfaces if master RNS instance is restarted

This commit is contained in:
Mark Qvist 2021-12-10 18:32:24 +01:00
parent e1e31692d7
commit df39cff520
5 changed files with 82 additions and 22 deletions

View File

@ -299,8 +299,6 @@ class AX25KISSInterface(Interface):
escape = False
data_buffer = data_buffer+bytes([byte])
elif (command == KISS.CMD_READY):
# TODO: add timeout and reset if ready
# command never arrives
self.process_queue()
else:
time_since_last = int(time.time()*1000) - last_read_ms

View File

@ -245,8 +245,6 @@ class AutoInterface(Interface):
def refresh_peer(self, addr):
self.peers[addr][1] = time.time()
# TODO: Remove at some point
# RNS.log(str(self)+" refreshed peer "+str(addr)+" on "+str(self.peers[addr][0]), RNS.LOG_EXTREME)
def processIncoming(self, data):
self.rxb += len(data)

View File

@ -22,6 +22,7 @@ class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
class LocalClientInterface(Interface):
RECONNECT_WAIT = 3
def __init__(self, owner, name, target_port = None, connected_socket=None):
self.rxb = 0
@ -32,6 +33,9 @@ class LocalClientInterface(Interface):
self.OUT = False
self.socket = None
self.parent_interface = None
self.reconnecting = False
self.never_connected = True
self.detached = False
self.name = name
if connected_socket != None:
@ -46,11 +50,7 @@ class LocalClientInterface(Interface):
self.receives = True
self.target_ip = "127.0.0.1"
self.target_port = target_port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.target_ip, self.target_port))
self.is_connected_to_shared_instance = True
self.connect()
self.owner = owner
self.online = True
@ -61,6 +61,47 @@ class LocalClientInterface(Interface):
thread.setDaemon(True)
thread.start()
def connect(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.target_ip, self.target_port))
self.online = True
self.is_connected_to_shared_instance = True
self.never_connected = False
return True
def reconnect(self):
if self.is_connected_to_shared_instance:
if not self.reconnecting:
self.reconnecting = True
attempts = 0
while not self.online:
time.sleep(LocalClientInterface.RECONNECT_WAIT)
attempts += 1
try:
self.connect()
except Exception as e:
RNS.log("Connection attempt for "+str(self)+" failed: "+str(e), RNS.LOG_DEBUG)
if not self.never_connected:
RNS.log("Reconnected TCP socket for "+str(self)+".", RNS.LOG_INFO)
self.reconnecting = False
thread = threading.Thread(target=self.read_loop)
thread.setDaemon(True)
thread.start()
RNS.Transport.shared_connection_reappeared()
else:
RNS.log("Attempt to reconnect on a non-initiator shared local interface. This should not happen.", RNS.LOG_ERROR)
raise IOError("Attempt to reconnect on a non-initiator local interface")
def processIncoming(self, data):
self.rxb += len(data)
if hasattr(self, "parent_interface") and self.parent_interface != None:
@ -68,6 +109,7 @@ class LocalClientInterface(Interface):
self.owner.inbound(data, self)
def processOutgoing(self, data):
if self.online:
while self.writing:
@ -119,8 +161,14 @@ class LocalClientInterface(Interface):
escape = False
data_buffer = data_buffer+bytes([byte])
else:
RNS.log("Socket for "+str(self)+" was closed, tearing down interface", RNS.LOG_VERBOSE)
self.online = False
if self.is_connected_to_shared_instance and not self.detached:
RNS.log("Socket for "+str(self)+" was closed, attempting to reconnect...", RNS.LOG_WARNING)
RNS.Transport.shared_connection_disappeared()
self.reconnect()
else:
self.teardown(nowarning=True)
break
@ -168,13 +216,8 @@ class LocalClientInterface(Interface):
RNS.panic()
if self.is_connected_to_shared_instance:
# TODO: Maybe add automatic recovery here.
# Needs thinking through, since user needs
# to now that all connectivity has been cut
# while service is recovering. Better for
# now to take down entire stack.
if nowarning == False:
RNS.log("Lost connection to local shared RNS instance. Exiting now.", RNS.LOG_CRITICAL)
RNS.log("Permanently lost connection to local shared RNS instance. Exiting now.", RNS.LOG_CRITICAL)
RNS.exit()

View File

@ -324,10 +324,6 @@ class Resource:
self.request_next()
def get_map_hash(self, data):
# TODO: This will break if running unencrypted,
# uncompressed transfers on streams with long blocks
# of identical bytes. Doing so would be very silly
# anyways but maybe it should be handled gracefully.
return RNS.Identity.full_hash(data+self.random_hash)[:Resource.MAPHASH_LEN]
def advertise(self):

View File

@ -1490,6 +1490,31 @@ class Transport:
interface.detach()
@staticmethod
def shared_connection_disappeared():
for link in Transport.active_links:
link.teardown()
for link in Transport.pending_links:
link.teardown()
Transport.announce_table = {}
Transport.destination_table = {}
Transport.reverse_table = {}
Transport.link_table = {}
Transport.held_announces = {}
Transport.announce_handlers = []
Transport.tunnels = {}
@staticmethod
def shared_connection_reappeared():
if Transport.owner.is_connected_to_shared_instance:
for registered_destination in Transport.destinations:
if registered_destination.type == RNS.Destination.SINGLE:
registered_destination.announce(path_response=True)
@staticmethod
def exit_handler():
try: