""" openvpn supervisor """ import asyncio import tempfile import os import time # import openvpn_api import signal import psutil import subprocess from datetime import datetime import humanize from tornado.template import Template import stat import shutil import tailer import logging from process_manager import ProcessManager def generate_config(in_fp, cfg): def change_item(config, item_key, item_content): output_lines = [] changed = False for i in config.splitlines(): if i.startswith(item_key+" ") or i == item_key: output_lines.append(f"{item_key} {item_content}") changed = True else: output_lines.append(i) if not changed: output_lines.append(f"{item_key} {item_content}") return "\n".join(output_lines) # _, path = tempfile.mkstemp() with open(in_fp, "r") as in_f: config = in_f.read() for key, value in cfg.items(): config = change_item(config, key, value) return config IDLE = "idle" RUNNING = "running" class Openvpn: """ ovpn_config:{ cfg_fp: file path of ovpn configuration name: name of this openvpn instance additional_cfg: additional changes to be made to the existing ovpn file, usually as a way to feed in auth details enabled: whether this instance should be started right away, default: False restart_interval: time to wait before restarting openvpn after it exits, unit is second, default: 300 ping_timeout: ping timeout for quality detection, unit is ms, default: 200 n_up: number of consecutive successful pings to bring connection status up, default: 5 n_down: number of consecutive failed pings to bring connection status down, default: 2 } env_config:{ monitor_interval: time interval between connection status monitoring in second, default: 5 folder_path: path of the session folder script_template_fp: file path of script template proxycfg_template_fp: filepath of 3proxy config } """ openvpn_pm: ProcessManager proxy_pm: ProcessLookupError def __init__(self, idx, ovpn_config, env_config, loop=None): self.idx = idx self.interface = f"tun{idx}" self.status = IDLE self.proc = None self.exit_future = None self.run_task = [] self.pid_fp = None if loop: self.loop = loop else: self.loop = asyncio.get_event_loop() self.interface_up = False self.load_cfg(ovpn_config, env_config) def load_cfg(self, ovpn_config, env_config): self.ovpn_config = ovpn_config self.env_config = env_config self.cfg_fp = ovpn_config["cfg_fp"] self.name = ovpn_config["name"] self.additional_cfg = ovpn_config.get("additional_cfg", {}) self.ping_timeout = int(ovpn_config.get("ping_timeout", 200)) self.n_up = int(ovpn_config.get("n_up", 5)) self.n_down = int(ovpn_config.get("n_down", 2)) assert self.ping_timeout > 0, "invalid ping_timeout value" assert self.n_up >= 0 and self.n_down >= 0, "invalid n_up or n_down value" self.restart_interval = int(ovpn_config.get("restart_interval", 300)) assert self.restart_interval > 0, "invalid restart_interval value" self.monitor_interval = env_config.get("monitor_interval", 5) self.folder_path = env_config["folder_path"] self.script_template_fp = env_config["script_template_fp"] self.proxycfg_template_fp = env_config["proxycfg_template_fp"] self.io_stat_fp = os.path.join(self.folder_path, "io_stat.txt") self.ping_stat_fp = os.path.join(self.folder_path, "ping_stat.txt") enabled = ovpn_config.get("enabled", False) self.openvpn_pm = ProcessManager( self.restart_interval, group="openvpn", loop=self.loop) self.proxy_pm = ProcessManager( 5, group=f"vpn{self.idx}", loop=self.loop) if enabled: self.start() def export_cfg(self): ovpn_config = { "cfg_fp": self.cfg_fp, "name": self.name, "additional_cfg": self.additional_cfg, "enabled": self.status is RUNNING, "ping_timeout": self.ping_timeout, "n_up": self.n_up, "n_down": self.n_down } return ovpn_config def generate_script(self): self.script_fp = os.path.join(self.folder_path, "script.sh") with open(self.script_template_fp, "r") as template_f: buf = template_f.read() template = Template(buf) script_log_fp = os.path.join(self.folder_path, "script_log.txt") items = { "script_log_fp": script_log_fp, "route_table_name": f"vpn{self.idx}", "route_table_id": f"{300+self.idx}", "rule_pref": f"{100+self.idx}" } script = template.generate(**items) with open(self.script_fp, "wb") as script_f: script_f.write(script) # mark executable st = os.stat(self.script_fp) os.chmod(self.script_fp, st.st_mode | stat.S_IEXEC) return self.script_fp def generate_proxycfg(self): # TODO: refactor code: proess manager proxycfg_fp = os.path.join( self.folder_path, "3proxy.cfg") with open(self.proxycfg_template_fp, "r") as template_f: buf = template_f.read() template = Template(buf) proxy_log_fp = os.path.join(self.folder_path, "proxy_log.txt") items = { "proxy_log_fp": proxy_log_fp, "port": f"{1080 + self.idx}" } proxycfg = template.generate(**items) with open(proxycfg_fp, "wb") as proxycfg_f: proxycfg_f.write(proxycfg) return proxycfg_fp def get_cfg(self): self.log_fp = os.path.join(self.folder_path, "log.txt") cfg = { "dev": self.interface, "dev-type": "tun", # TODO: add code to read dev-type # "management": f"localhost {self.management_port}", # disable management since we are not using it now "log-append": self.log_fp } cfg.update(self.additional_cfg) return cfg def generate_config_file(self): cfg = self.get_cfg() self.config_fp = os.path.join(self.folder_path, "cfg.txt") config = generate_config(self.cfg_fp, cfg) with open(self.config_fp, "w") as config_f: config_f.write(config) return self.config_fp def start(self): if self.status == IDLE: self.status = RUNNING config_fp = self.generate_config_file() script_fp = self.generate_script() proxycfg_fp = self.generate_proxycfg() openvpn_cmd = " ".join(["openvpn", "--config", config_fp, "--route-noexec", "--route-up", script_fp, "--script-security", "2", "--mute-replay-warnings"]) # TODO: remove --mute-replay-warnings proxy_cmd = " ".join(["3proxy", proxycfg_fp]) self.openvpn_pm.start(openvpn_cmd) self.proxy_pm.start(proxy_cmd) self.run_task.append(self.loop.create_task(self.monitor_task())) def get_log(self, lines=10): # regenerate log_fp _ = self.get_cfg() try: if not lines: with open(self.log_fp, "r") as log_f: return log_f.readlines() else: return tailer.tail(open(self.log_fp, "r"), lines) except: return "" def clear_folder(self): try: # removing all files in folder path, not removing folders dirpath = self.folder_path for filename in os.listdir(dirpath): filepath = os.path.join(dirpath, filename) os.remove(filepath) except Exception as err: logging.error(f"cannot remove log file, error:{err}") def stop(self, clear_folder=True): if self.status == RUNNING: self.proxy_pm.stop() self.openvpn_pm.stop() self.status = IDLE for task in self.run_task: task.cancel() self.run_task = [] self.interface_up = False if clear_folder: self.clear_folder() @property def pids(self): return [self.openvpn_pm.pid, self.proxy_pm.pid] def restart(self): self.stop() self.start() def get_io_stat(self, lines=5): try: if not lines: return open(self.io_stat_fp, "r").readlines() else: return tailer.tail(open(self.io_stat_fp, "r"), lines) except: return "" def get_ping_stat(self, lines=5): try: if not lines: return open(self.ping_stat_fp, "r").readlines() else: return tailer.tail(open(self.ping_stat_fp, "r"), lines) except: return "" def get_latest_speed(self): try: buf = self.get_io_stat(2) buf = [[float(i) for i in l.split(",")] for l in buf] assert len(buf) >= 2, "not enough points, need >=2" delta_t = buf[1][0] - buf[0][0] down_speed = (buf[1][1] - buf[0][1])/delta_t up_speed = (buf[1][2] - buf[0][2])/delta_t down_speed = humanize.naturalsize(down_speed) up_speed = humanize.naturalsize(up_speed) return f"Down: {down_speed}/s, Up: {up_speed}/s" except: return "no data" def monitor_action(self): """ monitor the I/O statistics and connectivity TODO: seperate module for monitoring """ io_count = psutil.net_io_counters(pernic=True) if self.interface not in io_count: self.interface_up = False logging.debug(f"interface {self.interface} is not ready.") return self.interface_up = True io_count = io_count[self.interface] open(self.io_stat_fp, "a").write( f"{datetime.utcnow().timestamp()},{io_count.bytes_recv},{io_count.bytes_sent}\n") # command = ["ping", "-w", "1", "-c", "1", "8.8.8.8"] command = ["fping", "-C", "1", f"-t{self.ping_timeout}", "-I", self.interface, "-q", "8.8.8.8"] # TODO: configurable ping host p = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) result = p.communicate() if p.returncode == 0: result = result[1].decode().strip().split(':')[-1] else: result = "-1" open(self.ping_stat_fp, "a").write( f"{datetime.utcnow().timestamp()},{result}\n") async def monitor_task(self): while self.status == RUNNING: try: self.monitor_action() except Exception as e: logging.error(f"monitoring error: {e}") await asyncio.sleep(self.monitor_interval)