From ca92c831de3853487fd22257b91cb6677945cebd Mon Sep 17 00:00:00 2001 From: mantaohuang Date: Sun, 2 Aug 2020 00:03:25 -0400 Subject: [PATCH] refactor process manager of o_manager --- o_manager.py | 59 +++++++++++++++++----------------------------- openvpn.py | 11 +++++---- process_manager.py | 4 ++++ 3 files changed, 32 insertions(+), 42 deletions(-) diff --git a/o_manager.py b/o_manager.py index b7c6253..5be0eaa 100644 --- a/o_manager.py +++ b/o_manager.py @@ -12,6 +12,8 @@ import numpy from plot_gen import TimeSeriesAccumulator from datetime import datetime, timedelta +from process_manager import ProcessManager + def get_ip_address(ifname): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -23,6 +25,9 @@ def get_ip_address(ifname): class OManager: + lb1_pm: ProcessManager + lb2_pm: ProcessManager + def __init__(self, base_folder, interface, base_port=8001, loop=None): self.monitor_interval = 5 self.base_folder = base_folder @@ -34,12 +39,18 @@ class OManager: self.weights = [] self.run_task = [] self.dynamic_weight_fp = None - self.pids = [] if loop: self.loop = loop else: self.loop = asyncio.get_event_loop() + self.lb1_pm = ProcessManager(restart_interval=5, loop=self.loop) + self.lb2_pm = ProcessManager(restart_interval=5, loop=self.loop) + + @property + def pids(self): + return [self.lb1_pm.pid, self.lb2_pm.pid] + def new_op(self, ovpn_config): folder_path = os.path.join(self.base_folder, f"session{self.new_idx}") if not os.path.isdir(folder_path): @@ -239,14 +250,10 @@ class OManager: self.reset_lb() def reset_lb(self): - for pid in self.pids: - try: - os.kill(pid, signal.SIGINT) - except Exception as err: - print("kill failed:", err) + self.lb1_pm.stop() + self.lb2_pm.stop() for task in self.run_task: task.cancel() - self.pids = [] self.run_task = [] # find running instances @@ -261,15 +268,13 @@ class OManager: lb_cmd = ["go-socks-lb/go-socks-lb", "-config", lb_cfg_fp, "-bind", "0.0.0.0:7000"] print("lb_cmd 0", lb_cmd) - self.run_task.append(self.loop.create_task( - self.run_cmd(lb_cmd))) + self.lb1_pm.start(cmd=lb_cmd) lb_cfg_fp = self.generate_lb_cfg("cached-shuffle") lb_cmd = ["go-socks-lb/go-socks-lb", "-config", lb_cfg_fp, "-bind", "0.0.0.0:7001"] print("lb_cmd 1", lb_cmd) - self.run_task.append(self.loop.create_task( - self.run_cmd(lb_cmd))) + self.lb2_pm.start(cmd=lb_cmd) #print("pre calc weights:", self.calc_weights()) self.run_task.append(self.loop.create_task( self.keep_applying_weights())) @@ -315,37 +320,17 @@ class OManager: old_weights = new_weights print("weight changed, new weights:", new_weights) self.generate_dynamic_weight(new_weights) - for pid in self.pids: + for pm in [self.lb1_pm, self.lb2_pm]: try: - os.kill(pid, signal.SIGUSR1) + pm.signal(signal.SIGUSR1) except: - print("error applying weights to", pid) + print("error applying weights to", pm.pid) # else: #print("weight not changed, weights:", new_weights) def clear_cache(self): - for pid in self.pids: + for pm in [self.lb1_pm, self.lb2_pm]: try: - os.kill(pid, signal.SIGUSR2) + pm.signal(signal.SIGUSR2) except: - print("error clear cache of", pid) - - async def run_cmd(self, cmd): - while True: - print("Manager trying to start go") - proc = await asyncio.create_subprocess_exec( - cmd[0], *cmd[1:], - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE) - self.proc = proc - self.pids.append(proc.pid) - print(f"Load balancer started, pids: {self.pids}") - stdout, stderr = await proc.communicate() - print(f'[{cmd!r} exited with {proc.returncode}]') - if proc.pid in self.pids: - self.pids.remove(proc.pid) - if stdout: - print(f'[stdout]\n{stdout.decode()}') - if stderr: - print(f'[stderr]\n{stderr.decode()}') - await asyncio.sleep(5) + print("error clear cache of", pm.pid) diff --git a/openvpn.py b/openvpn.py index 1d0562d..bde971a 100644 --- a/openvpn.py +++ b/openvpn.py @@ -15,7 +15,7 @@ from tornado.template import Template import stat import shutil import tailer - +import logging from process_manager import ProcessManager @@ -213,7 +213,7 @@ class Openvpn: filepath = os.path.join(dirpath, filename) os.remove(filepath) except Exception as err: - print("cannot remove log file, error:", err) + logging.error(f"cannot remove log file, error:{err}") def stop(self, clear_folder=True): if self.status == RUNNING: @@ -274,7 +274,9 @@ class Openvpn: TODO: seperate module for monitoring """ io_count = psutil.net_io_counters(pernic=True) - assert self.interface in io_count, "cannot find interface, nic is not there" + if self.interface not in io_count: + logging.debug(f"interface {self.interface} is not ready.") + return io_count = io_count[self.interface] open(self.io_stat_fp, "a").write( f"{datetime.utcnow().timestamp()},{io_count.bytes_recv},{io_count.bytes_sent}\n") @@ -292,10 +294,9 @@ class Openvpn: f"{datetime.utcnow().timestamp()},{result}\n") async def monitor_task(self): - # print("running monitor task") while self.status == RUNNING: try: self.monitor_action() except Exception as e: - print("monitoring error:", e) + logging.error(f"monitoring error: {e}") await asyncio.sleep(self.monitor_interval) diff --git a/process_manager.py b/process_manager.py index 07888c4..d8650ab 100644 --- a/process_manager.py +++ b/process_manager.py @@ -110,6 +110,10 @@ class ProcessManager: self.pid = 0 self.task = None + def signal(self, signal): + if self.state == ProcessManagerState.RUNNING: + os.kill(self.pid, signal) + if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG)