""" openvpn supervisor """ import asyncio import tempfile import os import time import openvpn_api import signal import psutil import subprocess from datetime import datetime import humanize from tornado.template import Template import stat def generate_config(in_fp, cfg): def change_item(config, item_key, item_content): output_lines = [] changed = False for i in config.splitlines(): if i.startswith(item_key+" ") or i == item_key: output_lines.append(f"{item_key} {item_content}") changed = True else: output_lines.append(i) if not changed: output_lines.append(f"{item_key} {item_content}") return "\n".join(output_lines) # _, path = tempfile.mkstemp() with open(in_fp, "r") as in_f: config = in_f.read() for key, value in cfg.items(): config = change_item(config, key, value) return config IDLE = "idle" RUNNING = "running" class Openvpn: def __init__(self, cfg_fp, idx, folder_path, management_port, template_fp, name=None, additional_cfg={}, loop=None): self.cfg_fp = cfg_fp self.idx = idx self.interface = f"tun{idx}" self.folder_path = folder_path self.management_port = management_port self.PID = 0 self.status = IDLE self.proc = None self.exit_future = None self.additional_cfg = additional_cfg self.run_task = [] self.openvpn_api = None self.name = name self.pid_fp = None # TODO: update paths function self.io_stat_fp = os.path.join(self.folder_path, "io_stat.txt") self.ping_stat_fp = os.path.join(self.folder_path, "ping_stat.txt") self.template_fp = template_fp if loop: self.loop = loop else: self.loop = asyncio.get_event_loop() def generate_script(self): self.script_fp = os.path.join(self.folder_path, "cfg.txt") with open(self.template_fp, "r") as template_f: buf = template_f.read() template = Template(buf) script_log_fp = os.path.join(self.folder_path, "script_log.txt") items = { "script_log_fp": script_log_fp, "route_table_name": f"vpn{self.idx}", "route_table_id": f"{300+self.idx}", "rule_pref": f"{100+self.idx}" } script = template.generate(**items) with open(self.script_fp, "wb") as script_f: script_f.write(script) # mark executable st = os.stat(self.script_fp) os.chmod(self.script_fp, st.st_mode | stat.S_IEXEC) return self.script_fp def get_cfg(self): self.log_fp = os.path.join(self.folder_path, "log.txt") cfg = { "dev": self.interface, "dev-type": "tun", # TODO: add code to read dev-type "management": f"localhost {self.management_port}", "log-append": self.log_fp } cfg.update(self.additional_cfg) return cfg def generate_config_file(self): cfg = self.get_cfg() self.config_fp = os.path.join(self.folder_path, "cfg.txt") config = generate_config(self.cfg_fp, cfg) with open(self.config_fp, "w") as config_f: config_f.write(config) return self.config_fp def start(self): if self.status == IDLE: self.status = RUNNING config_fp = self.generate_config_file() script_fp = self.generate_script() cmd = " ".join(["openvpn", "--config", config_fp, "--route-noexec", "--route-up", script_fp, "--script-security", "2", "--mute-replay-warnings"]) # TODO: remove --mute-replay-warnings self.run_task.append(self.loop.create_task(self.run(cmd))) self.run_task.append(self.loop.create_task(self.monitor_task())) def get_log(self): # regenerate log_fp _ = self.get_cfg() try: with open(self.log_fp, "r") as log_f: return log_f.read() except: return "" def clear_log(self): os.remove(self.log_fp) async def stop(self): if self.status == RUNNING: try: os.kill(self.PID, signal.SIGINT) except Exception as err: print("kill failed:", err) self.status = IDLE for task in self.run_task: task.cancel() self.run_task = [] try: os.remove(self.log_fp) except Exception as err: print("cannot remove log file, error:", err) async def restart(self): await self.stop() self.start() def get_io_stat(self): try: return open(self.io_stat_fp, "r").read() except: return "" def get_ping_stat(self): try: return open(self.ping_stat_fp, "r").read() except: return "" def get_latest_speed(self): try: buf = open(self.io_stat_fp, "r").readlines() buf = [[float(i) for i in l.split(",")] for l in buf[-2:]] assert len(buf) >= 2, "not enough points, need >=2" delta_t = buf[1][0] - buf[0][0] down_speed = (buf[1][1] - buf[0][1])/delta_t up_speed = (buf[1][2] - buf[0][2])/delta_t down_speed = humanize.naturalsize(down_speed) up_speed = humanize.naturalsize(up_speed) return f"Down: {down_speed}/s, Up: {up_speed}/s" except: return "no data" def monitor_action(self): """ monitor the I/O statistics and connectivity TODO: seperate module for monitoring """ io_count = psutil.net_io_counters(pernic=True) assert self.interface in io_count, "cannot find interface, nic is not there" io_count = io_count[self.interface] open(self.io_stat_fp, "a").write( f"{datetime.utcnow().timestamp()},{io_count.bytes_recv},{io_count.bytes_sent}\n") #command = ["ping", "-w", "1", "-c", "1", "8.8.8.8"] command = ["fping", "-C", "1", "-t200", "-I", self.interface, "-q", "8.8.8.8"] p = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) result = p.communicate() if p.returncode == 0: result = result[1].decode().strip().split(':')[-1] else: result = "-1" open(self.ping_stat_fp, "a").write( f"{datetime.utcnow().timestamp()},{result}\n") async def monitor_task(self): print("running monitor task") while self.status == RUNNING: try: self.monitor_action() except Exception as e: print("monitoring error:", e) await asyncio.sleep(5) async def run(self, cmd, group="openvpn"): print(f"run: {cmd}") self.exit_future = asyncio.Future() while self.status == RUNNING: print("create proc") print(self.status) self.pid_fp = os.path.join(self.folder_path, "pid.txt") try: os.remove(self.pid_fp) except: pass shell = f"sg {group} -c \"echo \\$\\$ > {self.pid_fp}; exec {cmd}\"" print(shell) proc = await asyncio.create_subprocess_shell( shell, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) print("started") self.proc = proc for i in range(3): await asyncio.sleep(1) got_pid = False try: self.PID = int(open(self.pid_fp, "r").read()) got_pid = True except: got_pid = False if got_pid: break if not got_pid: print("error, cannot get pid") break self.openvpn_api = openvpn_api.VPN( 'localhost', int(self.management_port)) print(f"pid: {self.PID}") stdout, stderr = await proc.communicate() print(f'[{cmd!r} exited with {proc.returncode}]') if stdout: print(f'[stdout]\n{stdout.decode()}') if stderr: print(f'[stderr]\n{stderr.decode()}') await asyncio.sleep(5) self.exit_future.set_result(True) self.status = IDLE if __name__ == "__main__": folder_fp = "/home/mantao/Desktop/t/" cfg_fp = "/home/mantao/Desktop/t/TCP_Files/UK2-TCP.ovpn" o1 = Openvpn(cfg_fp, 0, folder_fp, 8001, "script.sh.template", additional_cfg={ "auth-user-pass": "/home/mantao/Desktop/t/fast.txt"}) # o1.start()