refactor process manager of o_manager
This commit is contained in:
parent
7a1fdfaeab
commit
ca92c831de
59
o_manager.py
59
o_manager.py
@ -12,6 +12,8 @@ import numpy
|
||||
from plot_gen import TimeSeriesAccumulator
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from process_manager import ProcessManager
|
||||
|
||||
|
||||
def get_ip_address(ifname):
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
@ -23,6 +25,9 @@ def get_ip_address(ifname):
|
||||
|
||||
|
||||
class OManager:
|
||||
lb1_pm: ProcessManager
|
||||
lb2_pm: ProcessManager
|
||||
|
||||
def __init__(self, base_folder, interface, base_port=8001, loop=None):
|
||||
self.monitor_interval = 5
|
||||
self.base_folder = base_folder
|
||||
@ -34,12 +39,18 @@ class OManager:
|
||||
self.weights = []
|
||||
self.run_task = []
|
||||
self.dynamic_weight_fp = None
|
||||
self.pids = []
|
||||
if loop:
|
||||
self.loop = loop
|
||||
else:
|
||||
self.loop = asyncio.get_event_loop()
|
||||
|
||||
self.lb1_pm = ProcessManager(restart_interval=5, loop=self.loop)
|
||||
self.lb2_pm = ProcessManager(restart_interval=5, loop=self.loop)
|
||||
|
||||
@property
|
||||
def pids(self):
|
||||
return [self.lb1_pm.pid, self.lb2_pm.pid]
|
||||
|
||||
def new_op(self, ovpn_config):
|
||||
folder_path = os.path.join(self.base_folder, f"session{self.new_idx}")
|
||||
if not os.path.isdir(folder_path):
|
||||
@ -239,14 +250,10 @@ class OManager:
|
||||
self.reset_lb()
|
||||
|
||||
def reset_lb(self):
|
||||
for pid in self.pids:
|
||||
try:
|
||||
os.kill(pid, signal.SIGINT)
|
||||
except Exception as err:
|
||||
print("kill failed:", err)
|
||||
self.lb1_pm.stop()
|
||||
self.lb2_pm.stop()
|
||||
for task in self.run_task:
|
||||
task.cancel()
|
||||
self.pids = []
|
||||
self.run_task = []
|
||||
|
||||
# find running instances
|
||||
@ -261,15 +268,13 @@ class OManager:
|
||||
lb_cmd = ["go-socks-lb/go-socks-lb", "-config",
|
||||
lb_cfg_fp, "-bind", "0.0.0.0:7000"]
|
||||
print("lb_cmd 0", lb_cmd)
|
||||
self.run_task.append(self.loop.create_task(
|
||||
self.run_cmd(lb_cmd)))
|
||||
self.lb1_pm.start(cmd=lb_cmd)
|
||||
|
||||
lb_cfg_fp = self.generate_lb_cfg("cached-shuffle")
|
||||
lb_cmd = ["go-socks-lb/go-socks-lb", "-config",
|
||||
lb_cfg_fp, "-bind", "0.0.0.0:7001"]
|
||||
print("lb_cmd 1", lb_cmd)
|
||||
self.run_task.append(self.loop.create_task(
|
||||
self.run_cmd(lb_cmd)))
|
||||
self.lb2_pm.start(cmd=lb_cmd)
|
||||
#print("pre calc weights:", self.calc_weights())
|
||||
self.run_task.append(self.loop.create_task(
|
||||
self.keep_applying_weights()))
|
||||
@ -315,37 +320,17 @@ class OManager:
|
||||
old_weights = new_weights
|
||||
print("weight changed, new weights:", new_weights)
|
||||
self.generate_dynamic_weight(new_weights)
|
||||
for pid in self.pids:
|
||||
for pm in [self.lb1_pm, self.lb2_pm]:
|
||||
try:
|
||||
os.kill(pid, signal.SIGUSR1)
|
||||
pm.signal(signal.SIGUSR1)
|
||||
except:
|
||||
print("error applying weights to", pid)
|
||||
print("error applying weights to", pm.pid)
|
||||
# else:
|
||||
#print("weight not changed, weights:", new_weights)
|
||||
|
||||
def clear_cache(self):
|
||||
for pid in self.pids:
|
||||
for pm in [self.lb1_pm, self.lb2_pm]:
|
||||
try:
|
||||
os.kill(pid, signal.SIGUSR2)
|
||||
pm.signal(signal.SIGUSR2)
|
||||
except:
|
||||
print("error clear cache of", pid)
|
||||
|
||||
async def run_cmd(self, cmd):
|
||||
while True:
|
||||
print("Manager trying to start go")
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
cmd[0], *cmd[1:],
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE)
|
||||
self.proc = proc
|
||||
self.pids.append(proc.pid)
|
||||
print(f"Load balancer started, pids: {self.pids}")
|
||||
stdout, stderr = await proc.communicate()
|
||||
print(f'[{cmd!r} exited with {proc.returncode}]')
|
||||
if proc.pid in self.pids:
|
||||
self.pids.remove(proc.pid)
|
||||
if stdout:
|
||||
print(f'[stdout]\n{stdout.decode()}')
|
||||
if stderr:
|
||||
print(f'[stderr]\n{stderr.decode()}')
|
||||
await asyncio.sleep(5)
|
||||
print("error clear cache of", pm.pid)
|
||||
|
||||
11
openvpn.py
11
openvpn.py
@ -15,7 +15,7 @@ from tornado.template import Template
|
||||
import stat
|
||||
import shutil
|
||||
import tailer
|
||||
|
||||
import logging
|
||||
from process_manager import ProcessManager
|
||||
|
||||
|
||||
@ -213,7 +213,7 @@ class Openvpn:
|
||||
filepath = os.path.join(dirpath, filename)
|
||||
os.remove(filepath)
|
||||
except Exception as err:
|
||||
print("cannot remove log file, error:", err)
|
||||
logging.error(f"cannot remove log file, error:{err}")
|
||||
|
||||
def stop(self, clear_folder=True):
|
||||
if self.status == RUNNING:
|
||||
@ -274,7 +274,9 @@ class Openvpn:
|
||||
TODO: seperate module for monitoring
|
||||
"""
|
||||
io_count = psutil.net_io_counters(pernic=True)
|
||||
assert self.interface in io_count, "cannot find interface, nic is not there"
|
||||
if self.interface not in io_count:
|
||||
logging.debug(f"interface {self.interface} is not ready.")
|
||||
return
|
||||
io_count = io_count[self.interface]
|
||||
open(self.io_stat_fp, "a").write(
|
||||
f"{datetime.utcnow().timestamp()},{io_count.bytes_recv},{io_count.bytes_sent}\n")
|
||||
@ -292,10 +294,9 @@ class Openvpn:
|
||||
f"{datetime.utcnow().timestamp()},{result}\n")
|
||||
|
||||
async def monitor_task(self):
|
||||
# print("running monitor task")
|
||||
while self.status == RUNNING:
|
||||
try:
|
||||
self.monitor_action()
|
||||
except Exception as e:
|
||||
print("monitoring error:", e)
|
||||
logging.error(f"monitoring error: {e}")
|
||||
await asyncio.sleep(self.monitor_interval)
|
||||
|
||||
@ -110,6 +110,10 @@ class ProcessManager:
|
||||
self.pid = 0
|
||||
self.task = None
|
||||
|
||||
def signal(self, signal):
|
||||
if self.state == ProcessManagerState.RUNNING:
|
||||
os.kill(self.pid, signal)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user