import configparser config = configparser.ConfigParser() config.read('config.ini') config = { 'endpoint' : config.get('General', 'endpoint') } import logging import subprocess import threading import time import requests # index being the scan_id ping_is_in_progress = { 0: False } # use persistent connection, this is MUCH faster session = requests.Session() # holds the result of all pings that are waiting to be sent to the server completed_pings = [] # used to prevent race conditions lock = threading.Lock() def main(): global completed_pings global session global lock logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S') logging.info("Started") last_loop_time = 0 time_since_last_status_update = 0.0 scans_in_progress = [] while True: last_loop_time = time.time() try: response = session.get(config['endpoint'] + '?p=daemon_poll') if scans_in_progress != response.json(): prev_scan_ids = [] for scan in scans_in_progress: prev_scan_ids.append(scan['id']) scans_in_progress = response.json() for scan in scans_in_progress: if scan['id'] not in prev_scan_ids: logging.info(f"Found job for host: {scan['host']}, ref_uuid: {scan['ref_uuid']}") except: logging.error("Failed to retrieve jobs!") time.sleep(5) continue for scan in scans_in_progress: ping_thread = threading.Thread( args=[scan], target=ping ) ping_thread.start() if completed_pings: lock.acquire() session.post(config['endpoint'] + '?p=daemon_bulk_update', json=completed_pings) completed_pings = [] lock.release() if time.time() > time_since_last_status_update + 10: time_since_last_status_update = time.time() logging.info(f"Summary: [Active scans: {len(scans_in_progress)}] [Active threads: {threading.active_count()}]") # make sure it stays around 5 seconds each loop time_to_sleep = last_loop_time - time.time() + 5 if time_to_sleep < 0: continue time.sleep(time_to_sleep) def report_ping_error(scan_id, error_message, pkt_transmitted = '0', pkt_received = '0'): global completed_pings global lock logging.debug('Ping error; scan_id: ' + str(scan_id) + ' Error message: ' + error_message) lock.acquire() completed_pings.append({ 'scan_id' : str(scan_id), 'pkt_transmitted' : pkt_transmitted, 'pkt_received' : pkt_received, }) lock.release() def ping(scan): global ping_is_in_progress global completed_pings global lock if scan['id'] in ping_is_in_progress and ping_is_in_progress[scan['id']] == True: return logging.debug('Ping command is already in progress, exiting thread for host: ' + scan['host']) lock.acquire() ping_is_in_progress[scan['id']] = True lock.release() pkts_to_send = '10' ping_cmd_output = subprocess.run( ['ping', '-c', pkts_to_send, '-i', '0.5', scan['host'], '-w', '5'], capture_output=True ) lock.acquire() ping_is_in_progress[scan['id']] = False lock.release() if ping_cmd_output.returncode == 2: return report_ping_error(scan['id'], 'Ping command exited with status code 2; most likely failed to resolve host', pkts_to_send ) cmd_lines = ( ping_cmd_output.stdout.decode() ).split('\n') ping_avg = '0' pkt_transmitted = '0' pkt_received = '0' try: ping_stats = cmd_lines[-3].split() ping_avgs = cmd_lines[-2].split()[3].split('/') ping_avg = ping_avgs[1] pkt_transmitted = ping_stats[0] pkt_received = ping_stats[3] except: return report_ping_error(scan['id'], 'Failed to parse ping output: \n' + ping_cmd_output.stdout.decode(), pkts_to_send ) lock.acquire() completed_pings.append({ 'scan_id' : str(scan['id']), 'pkt_transmitted' : pkt_transmitted, 'pkt_received' : pkt_received, 'ping_avg' : ping_avg, }) lock.release() if __name__ == "__main__": main()