From 7a1fdfaeab18dad57e522110066a4a250fc270e4 Mon Sep 17 00:00:00 2001 From: mantaohuang Date: Sat, 1 Aug 2020 23:28:39 -0400 Subject: [PATCH] refactoring process manager --- o_manager.py | 3 +- openvpn.py | 76 ++++++++--------------------- process_manager.py | 118 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 140 insertions(+), 57 deletions(-) create mode 100644 process_manager.py diff --git a/o_manager.py b/o_manager.py index 9fb6b97..b7c6253 100644 --- a/o_manager.py +++ b/o_manager.py @@ -147,8 +147,7 @@ class OManager: "pids": i["op"].pids, "status": i["op"].status, "log": i["op"].get_log(lines=10), - "name": i["op"].name, - "io_mean": io_tas.get_f(numpy.mean), + "io_mean": io_tas.get_f(io_mean), "ping_mean": ping_tas.get_f(ping_mean), "ping_rate": ping_tas.get_f(ping_rate) }) diff --git a/openvpn.py b/openvpn.py index 64e7380..1d0562d 100644 --- a/openvpn.py +++ b/openvpn.py @@ -16,6 +16,8 @@ import stat import shutil import tailer +from process_manager import ProcessManager + def generate_config(in_fp, cfg): def change_item(config, item_key, item_content): @@ -49,6 +51,7 @@ class Openvpn: name: name of this openvpn instance additional_cfg: additional changes to be made to the existing ovpn file, usually as a way to feed in auth details enabled: whether this instance should be started right away, default: False + restart_interval: time to wait before restarting openvpn after it exits, unit is second, default: 300 ping_timeout: ping timeout for quality detection, unit is ms, default: 100 n_up: number of consecutive successful pings to bring connection status up, default: 5 n_down: number of consecutive failed pings to bring connection status down, default: 2 @@ -60,11 +63,12 @@ class Openvpn: proxycfg_template_fp: filepath of 3proxy config } """ + openvpn_pm: ProcessManager + proxy_pm: ProcessLookupError def __init__(self, idx, ovpn_config, env_config, loop=None): self.idx = idx self.interface = f"tun{idx}" - self.pids = [] self.status = IDLE self.proc = None self.exit_future = None @@ -85,7 +89,8 @@ class Openvpn: self.n_down = int(ovpn_config.get("n_down", 2)) assert self.ping_timeout > 0, "invalid ping_timeout value" assert self.n_up >= 0 and self.n_down >= 0, "invalid n_up or n_down value" - + self.restart_interval = int(ovpn_config.get("restart_interval", 300)) + assert self.restart_interval > 0, "invalid restart_interval value" self.monitor_interval = env_config.get("monitor_interval", 5) self.folder_path = env_config["folder_path"] self.script_template_fp = env_config["script_template_fp"] @@ -95,6 +100,10 @@ class Openvpn: self.ping_stat_fp = os.path.join(self.folder_path, "ping_stat.txt") enabled = ovpn_config.get("enabled", False) + self.openvpn_pm = ProcessManager( + self.restart_interval, group="openvpn", loop=self.loop) + self.proxy_pm = ProcessManager( + 5, group=f"vpn{self.idx}", loop=self.loop) if enabled: self.start() @@ -177,11 +186,11 @@ class Openvpn: openvpn_cmd = " ".join(["openvpn", "--config", config_fp, "--route-noexec", "--route-up", script_fp, "--script-security", "2", "--mute-replay-warnings"]) # TODO: remove --mute-replay-warnings - self.run_task.append(self.loop.create_task( - self.run_cmd(openvpn_cmd, group="openvpn"))) proxy_cmd = " ".join(["3proxy", proxycfg_fp]) - self.run_task.append(self.loop.create_task( - self.run_cmd(proxy_cmd, group=f"vpn{self.idx}"))) + + self.openvpn_pm.start(openvpn_cmd) + self.proxy_pm.start(proxy_cmd) + self.run_task.append(self.loop.create_task(self.monitor_task())) def get_log(self, lines=10): @@ -208,19 +217,19 @@ class Openvpn: def stop(self, clear_folder=True): if self.status == RUNNING: - for pid in self.pids: - try: - os.kill(pid, signal.SIGINT) - except Exception as err: - print("kill failed:", err) + self.proxy_pm.stop() + self.openvpn_pm.stop() self.status = IDLE for task in self.run_task: task.cancel() - self.pids = [] self.run_task = [] if clear_folder: self.clear_folder() + @property + def pids(self): + return [self.openvpn_pm.pid, self.proxy_pm.pid] + def restart(self): self.stop() self.start() @@ -290,46 +299,3 @@ class Openvpn: except Exception as e: print("monitoring error:", e) await asyncio.sleep(self.monitor_interval) - - async def run_cmd(self, cmd, group): - # print(f"run: {cmd}") - while self.status == RUNNING: - # print("create proc") - print(self.status) - pid_fp = os.path.join(self.folder_path, f"{cmd[0]}_pid.txt") - try: - os.remove(pid_fp) - except: - pass - shell = f"sg {group} -c \"echo \\$\\$ > {pid_fp}; exec {cmd}\"" - # print(shell) - proc = await asyncio.create_subprocess_shell( - shell, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE) - print("openpvn started") - self.proc = proc - for _ in range(3): - await asyncio.sleep(1) - got_pid = False - try: - PID = int(open(pid_fp, "r").read()) - got_pid = True - except: - got_pid = False - if got_pid: - break - if not got_pid: - print("error, cannot get pid") - break - print(f"openvpn pid: {PID}") - self.pids.append(PID) - stdout, stderr = await proc.communicate() - print(f'[{cmd!r} exited with {proc.returncode}]') - if stdout: - print(f'[stdout]\n{stdout.decode()}') - if stderr: - print(f'[stderr]\n{stderr.decode()}') - if PID in self.pids: - self.pids.remove(PID) - await asyncio.sleep(5*60) diff --git a/process_manager.py b/process_manager.py new file mode 100644 index 0000000..07888c4 --- /dev/null +++ b/process_manager.py @@ -0,0 +1,118 @@ +import asyncio +import os +import signal +import tempfile +import time +import logging +from enum import Enum + + +class ProcessManagerState(Enum): + IDLE = 0 + RUNNING = 1 + + +class ProcessManager: + """ + Manages a process and auto restart if process exits. Running as group is supported. + + cmd: command of the process + restart_interval: seconds to wait after process exits before restarting the process + group: run command as group. group will not be changed if None is used. + loop: asyncio event loop. default event loop will be used if None is provided. + """ + cmd: str + task: asyncio.Task + state: ProcessManagerState + pid: int + restart_interval: int + + def __init__(self, restart_interval, group=None, loop=None): + self.cmd = None + self.group = group + self.restart_interval = restart_interval + + self.state = ProcessManagerState.IDLE + self.pid = 0 + if loop: + self.loop = loop + else: + self.loop = asyncio.get_event_loop() + + def set_cmd(self, cmd): + self.cmd = cmd + + async def run_cmd(self): + logging.debug(f"Process Runner starting, cmd: {self.cmd}") + while self.state == ProcessManagerState.RUNNING: + if self.group: + with tempfile.TemporaryDirectory() as tmpdirname: + pid_fp = os.path.join(tmpdirname, "pid.txt") + try: + os.remove(pid_fp) + except: + pass + shell = f"sg {self.group} -c \"echo \\$\\$ > {pid_fp}; exec {self.cmd}\"" + proc = await asyncio.create_subprocess_shell( + shell, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE) + logging.info(f"Process started, cmd: {self.cmd}") + self.proc = proc + for _ in range(3): + await asyncio.sleep(1) + got_pid = False + try: + PID = int(open(pid_fp, "r").read()) + got_pid = True + except: + got_pid = False + if got_pid: + break + if not got_pid: + logging.error("cannot get pid") + break + else: + proc = await asyncio.create_subprocess_shell( + self.cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE) + PID = proc.pid + logging.info(f"pid: {PID}") + self.pid = PID + stdout, stderr = await proc.communicate() + logging.info(f'[{self.cmd!r} exited with {proc.returncode}]') + if stdout: + logging.info(f'[stdout]\n{stdout.decode()}') + if stderr: + logging.info(f'[stderr]\n{stderr.decode()}') + self.pid = 0 + await asyncio.sleep(self.restart_interval) + + def start(self, cmd=None): + assert self.state == ProcessManagerState.IDLE, "process manager should be idle before start" + if cmd: + self.cmd = cmd + assert self.cmd, "cmd has not been assigned" + self.state = ProcessManagerState.RUNNING + self.task = self.loop.create_task(self.run_cmd()) + + def stop(self): + assert self.state == ProcessManagerState.RUNNING, "process manager is not running" + self.state = ProcessManagerState.IDLE + self.task.cancel() + try: + os.kill(self.pid, signal.SIGINT) + except Exception as err: + logging.warning(f"kill failed: {err}") + + self.task.cancel() + self.pid = 0 + self.task = None + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + pm = ProcessManager(5, group="mantao") + pm.start("echo haha") + asyncio.get_event_loop().run_until_complete(asyncio.sleep(5))