""" openvpn supervisor """ import asyncio import tempfile import os import time import openvpn_api import signal import psutil import subprocess from datetime import datetime import humanize from tornado.template import Template import stat import shutil import tailer def generate_config(in_fp, cfg): def change_item(config, item_key, item_content): output_lines = [] changed = False for i in config.splitlines(): if i.startswith(item_key+" ") or i == item_key: output_lines.append(f"{item_key} {item_content}") changed = True else: output_lines.append(i) if not changed: output_lines.append(f"{item_key} {item_content}") return "\n".join(output_lines) # _, path = tempfile.mkstemp() with open(in_fp, "r") as in_f: config = in_f.read() for key, value in cfg.items(): config = change_item(config, key, value) return config IDLE = "idle" RUNNING = "running" class Openvpn: def __init__(self, cfg_fp, idx, folder_path, management_port, template_fp, proxycfg_template_fp, name=None, additional_cfg={}, loop=None): self.cfg_fp = cfg_fp self.idx = idx self.interface = f"tun{idx}" self.folder_path = folder_path self.management_port = management_port self.pids = [] self.status = IDLE self.proc = None self.exit_future = None self.additional_cfg = additional_cfg self.run_task = [] self.openvpn_api = None self.name = name self.pid_fp = None # TODO: update paths function self.io_stat_fp = os.path.join(self.folder_path, "io_stat.txt") self.ping_stat_fp = os.path.join(self.folder_path, "ping_stat.txt") self.template_fp = template_fp self.proxycfg_template_fp = proxycfg_template_fp if loop: self.loop = loop else: self.loop = asyncio.get_event_loop() def generate_script(self): self.script_fp = os.path.join(self.folder_path, "script.sh") with open(self.template_fp, "r") as template_f: buf = template_f.read() template = Template(buf) script_log_fp = os.path.join(self.folder_path, "script_log.txt") items = { "script_log_fp": script_log_fp, "route_table_name": f"vpn{self.idx}", "route_table_id": f"{300+self.idx}", "rule_pref": f"{100+self.idx}" } script = template.generate(**items) with open(self.script_fp, "wb") as script_f: script_f.write(script) # mark executable st = os.stat(self.script_fp) os.chmod(self.script_fp, st.st_mode | stat.S_IEXEC) return self.script_fp def generate_proxycfg(self): # TODO: refactor code: proess manager proxycfg_fp = os.path.join( self.folder_path, "3proxy.cfg") with open(self.proxycfg_template_fp, "r") as template_f: buf = template_f.read() template = Template(buf) proxy_log_fp = os.path.join(self.folder_path, "proxy_log.txt") items = { "proxy_log_fp": proxy_log_fp, "port": f"{1080 + self.idx}" } proxycfg = template.generate(**items) with open(proxycfg_fp, "wb") as proxycfg_f: proxycfg_f.write(proxycfg) return proxycfg_fp def get_cfg(self): self.log_fp = os.path.join(self.folder_path, "log.txt") cfg = { "dev": self.interface, "dev-type": "tun", # TODO: add code to read dev-type "management": f"localhost {self.management_port}", "log-append": self.log_fp } cfg.update(self.additional_cfg) return cfg def generate_config_file(self): cfg = self.get_cfg() self.config_fp = os.path.join(self.folder_path, "cfg.txt") config = generate_config(self.cfg_fp, cfg) with open(self.config_fp, "w") as config_f: config_f.write(config) return self.config_fp def start(self): if self.status == IDLE: self.status = RUNNING config_fp = self.generate_config_file() script_fp = self.generate_script() proxycfg_fp = self.generate_proxycfg() openvpn_cmd = " ".join(["openvpn", "--config", config_fp, "--route-noexec", "--route-up", script_fp, "--script-security", "2", "--mute-replay-warnings"]) # TODO: remove --mute-replay-warnings self.run_task.append(self.loop.create_task( self.run_cmd(openvpn_cmd, group="openvpn"))) proxy_cmd = " ".join(["3proxy", proxycfg_fp]) self.run_task.append(self.loop.create_task( self.run_cmd(proxy_cmd, group=f"vpn{self.idx}"))) self.run_task.append(self.loop.create_task(self.monitor_task())) def get_log(self, lines=10): # regenerate log_fp _ = self.get_cfg() try: if not lines: with open(self.log_fp, "r") as log_f: return log_f.readlines() else: return tailer.tail(open(self.log_fp, "r"), lines) except: return "" def clear_folder(self): try: # removing all files in folder path, not removing folders dirpath = self.folder_path for filename in os.listdir(dirpath): filepath = os.path.join(dirpath, filename) os.remove(filepath) except Exception as err: print("cannot remove log file, error:", err) def stop(self, clear_folder=True): if self.status == RUNNING: for pid in self.pids: try: os.kill(pid, signal.SIGINT) except Exception as err: print("kill failed:", err) self.status = IDLE for task in self.run_task: task.cancel() self.pids = [] self.run_task = [] if clear_folder: self.clear_folder() def restart(self): self.stop() self.start() def get_io_stat(self, lines=5): try: if not lines: return open(self.io_stat_fp, "r").readlines() else: return tailer.tail(open(self.io_stat_fp, "r"), lines) except: return "" def get_ping_stat(self, lines=5): try: if not lines: return open(self.ping_stat_fp, "r").readlines() else: return tailer.tail(open(self.ping_stat_fp, "r"), lines) except: return "" def get_latest_speed(self): try: buf = self.get_io_stat(2) buf = [[float(i) for i in l.split(",")] for l in buf] assert len(buf) >= 2, "not enough points, need >=2" delta_t = buf[1][0] - buf[0][0] down_speed = (buf[1][1] - buf[0][1])/delta_t up_speed = (buf[1][2] - buf[0][2])/delta_t down_speed = humanize.naturalsize(down_speed) up_speed = humanize.naturalsize(up_speed) return f"Down: {down_speed}/s, Up: {up_speed}/s" except: return "no data" def monitor_action(self): """ monitor the I/O statistics and connectivity TODO: seperate module for monitoring """ io_count = psutil.net_io_counters(pernic=True) assert self.interface in io_count, "cannot find interface, nic is not there" io_count = io_count[self.interface] open(self.io_stat_fp, "a").write( f"{datetime.utcnow().timestamp()},{io_count.bytes_recv},{io_count.bytes_sent}\n") #command = ["ping", "-w", "1", "-c", "1", "8.8.8.8"] command = ["fping", "-C", "1", "-t200", "-I", self.interface, "-q", "8.8.8.8"] p = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) result = p.communicate() if p.returncode == 0: result = result[1].decode().strip().split(':')[-1] else: result = "-1" open(self.ping_stat_fp, "a").write( f"{datetime.utcnow().timestamp()},{result}\n") async def monitor_task(self): #print("running monitor task") while self.status == RUNNING: try: self.monitor_action() except Exception as e: print("monitoring error:", e) await asyncio.sleep(5) async def run_cmd(self, cmd, group): #print(f"run: {cmd}") while self.status == RUNNING: #print("create proc") print(self.status) pid_fp = os.path.join(self.folder_path, f"{cmd[0]}_pid.txt") try: os.remove(pid_fp) except: pass shell = f"sg {group} -c \"echo \\$\\$ > {pid_fp}; exec {cmd}\"" # print(shell) proc = await asyncio.create_subprocess_shell( shell, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) print("openpvn started") self.proc = proc for i in range(3): await asyncio.sleep(1) got_pid = False try: PID = int(open(pid_fp, "r").read()) got_pid = True except: got_pid = False if got_pid: break if not got_pid: print("error, cannot get pid") break print(f"openvpn pid: {PID}") self.pids.append(PID) stdout, stderr = await proc.communicate() print(f'[{cmd!r} exited with {proc.returncode}]') if stdout: print(f'[stdout]\n{stdout.decode()}') if stderr: print(f'[stderr]\n{stderr.decode()}') if PID in self.pids: self.pids.remove(PID) await asyncio.sleep(5*60) if __name__ == "__main__": folder_fp = "/home/mantao/Desktop/t/" cfg_fp = "/home/mantao/Desktop/t/TCP_Files/UK2-TCP.ovpn" o1 = Openvpn(cfg_fp, 0, folder_fp, 8001, "script.sh.template", "3proxy.cfg", additional_cfg={ "auth-user-pass": "/home/mantao/Desktop/t/fast.txt"}) # o1.start()