refactoring process manager

This commit is contained in:
mantaohuang 2020-08-01 23:28:39 -04:00
parent 783b8bef8c
commit 7a1fdfaeab
3 changed files with 140 additions and 57 deletions

View File

@ -147,8 +147,7 @@ class OManager:
"pids": i["op"].pids, "pids": i["op"].pids,
"status": i["op"].status, "status": i["op"].status,
"log": i["op"].get_log(lines=10), "log": i["op"].get_log(lines=10),
"name": i["op"].name, "io_mean": io_tas.get_f(io_mean),
"io_mean": io_tas.get_f(numpy.mean),
"ping_mean": ping_tas.get_f(ping_mean), "ping_mean": ping_tas.get_f(ping_mean),
"ping_rate": ping_tas.get_f(ping_rate) "ping_rate": ping_tas.get_f(ping_rate)
}) })

View File

@ -16,6 +16,8 @@ import stat
import shutil import shutil
import tailer import tailer
from process_manager import ProcessManager
def generate_config(in_fp, cfg): def generate_config(in_fp, cfg):
def change_item(config, item_key, item_content): def change_item(config, item_key, item_content):
@ -49,6 +51,7 @@ class Openvpn:
name: name of this openvpn instance name: name of this openvpn instance
additional_cfg: additional changes to be made to the existing ovpn file, usually as a way to feed in auth details additional_cfg: additional changes to be made to the existing ovpn file, usually as a way to feed in auth details
enabled: whether this instance should be started right away, default: False enabled: whether this instance should be started right away, default: False
restart_interval: time to wait before restarting openvpn after it exits, unit is second, default: 300
ping_timeout: ping timeout for quality detection, unit is ms, default: 100 ping_timeout: ping timeout for quality detection, unit is ms, default: 100
n_up: number of consecutive successful pings to bring connection status up, default: 5 n_up: number of consecutive successful pings to bring connection status up, default: 5
n_down: number of consecutive failed pings to bring connection status down, default: 2 n_down: number of consecutive failed pings to bring connection status down, default: 2
@ -60,11 +63,12 @@ class Openvpn:
proxycfg_template_fp: filepath of 3proxy config proxycfg_template_fp: filepath of 3proxy config
} }
""" """
openvpn_pm: ProcessManager
proxy_pm: ProcessLookupError
def __init__(self, idx, ovpn_config, env_config, loop=None): def __init__(self, idx, ovpn_config, env_config, loop=None):
self.idx = idx self.idx = idx
self.interface = f"tun{idx}" self.interface = f"tun{idx}"
self.pids = []
self.status = IDLE self.status = IDLE
self.proc = None self.proc = None
self.exit_future = None self.exit_future = None
@ -85,7 +89,8 @@ class Openvpn:
self.n_down = int(ovpn_config.get("n_down", 2)) self.n_down = int(ovpn_config.get("n_down", 2))
assert self.ping_timeout > 0, "invalid ping_timeout value" assert self.ping_timeout > 0, "invalid ping_timeout value"
assert self.n_up >= 0 and self.n_down >= 0, "invalid n_up or n_down value" assert self.n_up >= 0 and self.n_down >= 0, "invalid n_up or n_down value"
self.restart_interval = int(ovpn_config.get("restart_interval", 300))
assert self.restart_interval > 0, "invalid restart_interval value"
self.monitor_interval = env_config.get("monitor_interval", 5) self.monitor_interval = env_config.get("monitor_interval", 5)
self.folder_path = env_config["folder_path"] self.folder_path = env_config["folder_path"]
self.script_template_fp = env_config["script_template_fp"] self.script_template_fp = env_config["script_template_fp"]
@ -95,6 +100,10 @@ class Openvpn:
self.ping_stat_fp = os.path.join(self.folder_path, "ping_stat.txt") self.ping_stat_fp = os.path.join(self.folder_path, "ping_stat.txt")
enabled = ovpn_config.get("enabled", False) enabled = ovpn_config.get("enabled", False)
self.openvpn_pm = ProcessManager(
self.restart_interval, group="openvpn", loop=self.loop)
self.proxy_pm = ProcessManager(
5, group=f"vpn{self.idx}", loop=self.loop)
if enabled: if enabled:
self.start() self.start()
@ -177,11 +186,11 @@ class Openvpn:
openvpn_cmd = " ".join(["openvpn", "--config", config_fp, "--route-noexec", openvpn_cmd = " ".join(["openvpn", "--config", config_fp, "--route-noexec",
"--route-up", script_fp, "--script-security", "--route-up", script_fp, "--script-security",
"2", "--mute-replay-warnings"]) # TODO: remove --mute-replay-warnings "2", "--mute-replay-warnings"]) # TODO: remove --mute-replay-warnings
self.run_task.append(self.loop.create_task(
self.run_cmd(openvpn_cmd, group="openvpn")))
proxy_cmd = " ".join(["3proxy", proxycfg_fp]) proxy_cmd = " ".join(["3proxy", proxycfg_fp])
self.run_task.append(self.loop.create_task(
self.run_cmd(proxy_cmd, group=f"vpn{self.idx}"))) self.openvpn_pm.start(openvpn_cmd)
self.proxy_pm.start(proxy_cmd)
self.run_task.append(self.loop.create_task(self.monitor_task())) self.run_task.append(self.loop.create_task(self.monitor_task()))
def get_log(self, lines=10): def get_log(self, lines=10):
@ -208,19 +217,19 @@ class Openvpn:
def stop(self, clear_folder=True): def stop(self, clear_folder=True):
if self.status == RUNNING: if self.status == RUNNING:
for pid in self.pids: self.proxy_pm.stop()
try: self.openvpn_pm.stop()
os.kill(pid, signal.SIGINT)
except Exception as err:
print("kill failed:", err)
self.status = IDLE self.status = IDLE
for task in self.run_task: for task in self.run_task:
task.cancel() task.cancel()
self.pids = []
self.run_task = [] self.run_task = []
if clear_folder: if clear_folder:
self.clear_folder() self.clear_folder()
@property
def pids(self):
return [self.openvpn_pm.pid, self.proxy_pm.pid]
def restart(self): def restart(self):
self.stop() self.stop()
self.start() self.start()
@ -290,46 +299,3 @@ class Openvpn:
except Exception as e: except Exception as e:
print("monitoring error:", e) print("monitoring error:", e)
await asyncio.sleep(self.monitor_interval) await asyncio.sleep(self.monitor_interval)
async def run_cmd(self, cmd, group):
# print(f"run: {cmd}")
while self.status == RUNNING:
# print("create proc")
print(self.status)
pid_fp = os.path.join(self.folder_path, f"{cmd[0]}_pid.txt")
try:
os.remove(pid_fp)
except:
pass
shell = f"sg {group} -c \"echo \\$\\$ > {pid_fp}; exec {cmd}\""
# print(shell)
proc = await asyncio.create_subprocess_shell(
shell,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
print("openpvn started")
self.proc = proc
for _ in range(3):
await asyncio.sleep(1)
got_pid = False
try:
PID = int(open(pid_fp, "r").read())
got_pid = True
except:
got_pid = False
if got_pid:
break
if not got_pid:
print("error, cannot get pid")
break
print(f"openvpn pid: {PID}")
self.pids.append(PID)
stdout, stderr = await proc.communicate()
print(f'[{cmd!r} exited with {proc.returncode}]')
if stdout:
print(f'[stdout]\n{stdout.decode()}')
if stderr:
print(f'[stderr]\n{stderr.decode()}')
if PID in self.pids:
self.pids.remove(PID)
await asyncio.sleep(5*60)

118
process_manager.py Normal file
View File

@ -0,0 +1,118 @@
import asyncio
import os
import signal
import tempfile
import time
import logging
from enum import Enum
class ProcessManagerState(Enum):
IDLE = 0
RUNNING = 1
class ProcessManager:
"""
Manages a process and auto restart if process exits. Running as group is supported.
cmd: command of the process
restart_interval: seconds to wait after process exits before restarting the process
group: run command as group. group will not be changed if None is used.
loop: asyncio event loop. default event loop will be used if None is provided.
"""
cmd: str
task: asyncio.Task
state: ProcessManagerState
pid: int
restart_interval: int
def __init__(self, restart_interval, group=None, loop=None):
self.cmd = None
self.group = group
self.restart_interval = restart_interval
self.state = ProcessManagerState.IDLE
self.pid = 0
if loop:
self.loop = loop
else:
self.loop = asyncio.get_event_loop()
def set_cmd(self, cmd):
self.cmd = cmd
async def run_cmd(self):
logging.debug(f"Process Runner starting, cmd: {self.cmd}")
while self.state == ProcessManagerState.RUNNING:
if self.group:
with tempfile.TemporaryDirectory() as tmpdirname:
pid_fp = os.path.join(tmpdirname, "pid.txt")
try:
os.remove(pid_fp)
except:
pass
shell = f"sg {self.group} -c \"echo \\$\\$ > {pid_fp}; exec {self.cmd}\""
proc = await asyncio.create_subprocess_shell(
shell,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
logging.info(f"Process started, cmd: {self.cmd}")
self.proc = proc
for _ in range(3):
await asyncio.sleep(1)
got_pid = False
try:
PID = int(open(pid_fp, "r").read())
got_pid = True
except:
got_pid = False
if got_pid:
break
if not got_pid:
logging.error("cannot get pid")
break
else:
proc = await asyncio.create_subprocess_shell(
self.cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
PID = proc.pid
logging.info(f"pid: {PID}")
self.pid = PID
stdout, stderr = await proc.communicate()
logging.info(f'[{self.cmd!r} exited with {proc.returncode}]')
if stdout:
logging.info(f'[stdout]\n{stdout.decode()}')
if stderr:
logging.info(f'[stderr]\n{stderr.decode()}')
self.pid = 0
await asyncio.sleep(self.restart_interval)
def start(self, cmd=None):
assert self.state == ProcessManagerState.IDLE, "process manager should be idle before start"
if cmd:
self.cmd = cmd
assert self.cmd, "cmd has not been assigned"
self.state = ProcessManagerState.RUNNING
self.task = self.loop.create_task(self.run_cmd())
def stop(self):
assert self.state == ProcessManagerState.RUNNING, "process manager is not running"
self.state = ProcessManagerState.IDLE
self.task.cancel()
try:
os.kill(self.pid, signal.SIGINT)
except Exception as err:
logging.warning(f"kill failed: {err}")
self.task.cancel()
self.pid = 0
self.task = None
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
pm = ProcessManager(5, group="mantao")
pm.start("echo haha")
asyncio.get_event_loop().run_until_complete(asyncio.sleep(5))