diff --git a/RNS/Interfaces/AX25KISSInterface.py b/RNS/Interfaces/AX25KISSInterface.py index 350a1a1..005f2d5 100644 --- a/RNS/Interfaces/AX25KISSInterface.py +++ b/RNS/Interfaces/AX25KISSInterface.py @@ -299,8 +299,6 @@ class AX25KISSInterface(Interface): escape = False data_buffer = data_buffer+bytes([byte]) elif (command == KISS.CMD_READY): - # TODO: add timeout and reset if ready - # command never arrives self.process_queue() else: time_since_last = int(time.time()*1000) - last_read_ms diff --git a/RNS/Interfaces/AutoInterface.py b/RNS/Interfaces/AutoInterface.py index 0f7832c..7c5b22c 100644 --- a/RNS/Interfaces/AutoInterface.py +++ b/RNS/Interfaces/AutoInterface.py @@ -245,8 +245,6 @@ class AutoInterface(Interface): def refresh_peer(self, addr): self.peers[addr][1] = time.time() - # TODO: Remove at some point - # RNS.log(str(self)+" refreshed peer "+str(addr)+" on "+str(self.peers[addr][0]), RNS.LOG_EXTREME) def processIncoming(self, data): self.rxb += len(data) diff --git a/RNS/Interfaces/LocalInterface.py b/RNS/Interfaces/LocalInterface.py index 3508fcc..78ccba7 100644 --- a/RNS/Interfaces/LocalInterface.py +++ b/RNS/Interfaces/LocalInterface.py @@ -22,6 +22,7 @@ class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): pass class LocalClientInterface(Interface): + RECONNECT_WAIT = 3 def __init__(self, owner, name, target_port = None, connected_socket=None): self.rxb = 0 @@ -32,6 +33,9 @@ class LocalClientInterface(Interface): self.OUT = False self.socket = None self.parent_interface = None + self.reconnecting = False + self.never_connected = True + self.detached = False self.name = name if connected_socket != None: @@ -46,11 +50,7 @@ class LocalClientInterface(Interface): self.receives = True self.target_ip = "127.0.0.1" self.target_port = target_port - - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.connect((self.target_ip, self.target_port)) - - self.is_connected_to_shared_instance = True + self.connect() self.owner = owner self.online = True @@ -61,6 +61,47 @@ class LocalClientInterface(Interface): thread.setDaemon(True) thread.start() + def connect(self): + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.connect((self.target_ip, self.target_port)) + + self.online = True + self.is_connected_to_shared_instance = True + self.never_connected = False + + return True + + + def reconnect(self): + if self.is_connected_to_shared_instance: + if not self.reconnecting: + self.reconnecting = True + attempts = 0 + + while not self.online: + time.sleep(LocalClientInterface.RECONNECT_WAIT) + attempts += 1 + + try: + self.connect() + + except Exception as e: + RNS.log("Connection attempt for "+str(self)+" failed: "+str(e), RNS.LOG_DEBUG) + + if not self.never_connected: + RNS.log("Reconnected TCP socket for "+str(self)+".", RNS.LOG_INFO) + + self.reconnecting = False + thread = threading.Thread(target=self.read_loop) + thread.setDaemon(True) + thread.start() + RNS.Transport.shared_connection_reappeared() + + else: + RNS.log("Attempt to reconnect on a non-initiator shared local interface. This should not happen.", RNS.LOG_ERROR) + raise IOError("Attempt to reconnect on a non-initiator local interface") + + def processIncoming(self, data): self.rxb += len(data) if hasattr(self, "parent_interface") and self.parent_interface != None: @@ -68,6 +109,7 @@ class LocalClientInterface(Interface): self.owner.inbound(data, self) + def processOutgoing(self, data): if self.online: while self.writing: @@ -119,8 +161,14 @@ class LocalClientInterface(Interface): escape = False data_buffer = data_buffer+bytes([byte]) else: - RNS.log("Socket for "+str(self)+" was closed, tearing down interface", RNS.LOG_VERBOSE) - self.teardown(nowarning=True) + self.online = False + if self.is_connected_to_shared_instance and not self.detached: + RNS.log("Socket for "+str(self)+" was closed, attempting to reconnect...", RNS.LOG_WARNING) + RNS.Transport.shared_connection_disappeared() + self.reconnect() + else: + self.teardown(nowarning=True) + break @@ -168,14 +216,9 @@ class LocalClientInterface(Interface): RNS.panic() if self.is_connected_to_shared_instance: - # TODO: Maybe add automatic recovery here. - # Needs thinking through, since user needs - # to now that all connectivity has been cut - # while service is recovering. Better for - # now to take down entire stack. if nowarning == False: - RNS.log("Lost connection to local shared RNS instance. Exiting now.", RNS.LOG_CRITICAL) - + RNS.log("Permanently lost connection to local shared RNS instance. Exiting now.", RNS.LOG_CRITICAL) + RNS.exit() diff --git a/RNS/Resource.py b/RNS/Resource.py index 2b3cb64..7399f34 100644 --- a/RNS/Resource.py +++ b/RNS/Resource.py @@ -324,10 +324,6 @@ class Resource: self.request_next() def get_map_hash(self, data): - # TODO: This will break if running unencrypted, - # uncompressed transfers on streams with long blocks - # of identical bytes. Doing so would be very silly - # anyways but maybe it should be handled gracefully. return RNS.Identity.full_hash(data+self.random_hash)[:Resource.MAPHASH_LEN] def advertise(self): diff --git a/RNS/Transport.py b/RNS/Transport.py index fd366d9..0e27eb8 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -1490,6 +1490,31 @@ class Transport: interface.detach() + @staticmethod + def shared_connection_disappeared(): + for link in Transport.active_links: + link.teardown() + + for link in Transport.pending_links: + link.teardown() + + Transport.announce_table = {} + Transport.destination_table = {} + Transport.reverse_table = {} + Transport.link_table = {} + Transport.held_announces = {} + Transport.announce_handlers = [] + Transport.tunnels = {} + + + @staticmethod + def shared_connection_reappeared(): + if Transport.owner.is_connected_to_shared_instance: + for registered_destination in Transport.destinations: + if registered_destination.type == RNS.Destination.SINGLE: + registered_destination.announce(path_response=True) + + @staticmethod def exit_handler(): try: