ovpn-lb-socks5/o_manager.py
mantaohuang 02bfc93351 fix bug
2020-07-28 15:04:27 -04:00

339 lines
11 KiB
Python

import os
from openvpn import Openvpn, RUNNING
import asyncio
from tornado.template import Template
import signal
import json
import socket
import fcntl
import struct
import math
import numpy
from plot_gen import TimeSeriesAccumulator
from datetime import datetime, timedelta
def get_ip_address(ifname):
    """Return the IPv4 address bound to the network interface *ifname*.

    The address is obtained by issuing the ``SIOCGIFADDR`` ioctl (request
    code ``0x8915``) on a throwaway UDP socket. ``fcntl`` and this ioctl
    are platform specific, so this only works where both are available.

    Args:
        ifname: Interface name, e.g. ``"eth0"``. Only its first 15
            characters are sent to the kernel.

    Returns:
        The address as a dotted-quad string.

    Raises:
        OSError: If the socket cannot be created or the ioctl fails (for
            example because the interface does not exist).
    """
    request = struct.pack(b'256s', ifname[:15].encode())
    # The socket only serves as a handle for the ioctl; close it as soon as
    # the call returns so repeated lookups do not leak file descriptors.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        reply = fcntl.ioctl(
            sock.fileno(),
            0x8915,  # SIOCGIFADDR
            request,
        )
    # The four address bytes sit at offsets 20..23 of the reply buffer.
    return socket.inet_ntoa(reply[20:24])
class OManager:
    """Coordinate Openvpn sessions and the go-socks-lb processes fronting them.

    Every session is tracked in ``self.instances`` as a dict of the form
    ``{"op": <Openvpn>, "idx": <int>, "weight": <0 or 1>}``. Whenever the set
    of sessions (or their started/stopped state) changes, the load balancer
    processes are torn down and, if at least one session reports ``RUNNING``,
    started again with freshly generated configuration files.
    """

    # (load-balance mode, bind address) for each balancer process we launch.
    _LB_MODES = (
        ("fallback", "0.0.0.0:7000"),
        ("cached-shuffle", "0.0.0.0:7001"),
    )

    # Reference point for converting POSIX timestamps to naive UTC datetimes.
    _EPOCH = datetime(1970, 1, 1)

    def __init__(self, base_folder, interface, base_port=8001, loop=None):
        """Set up an empty manager.

        Args:
            base_folder: Directory that receives the per-session folders and
                the generated load balancer files.
            interface: Name of the network interface whose IPv4 address is
                handed to the load balancer template.
            base_port: Stored on the instance; not used elsewhere in this
                class.
            loop: Event loop used to schedule background tasks. Falls back
                to ``asyncio.get_event_loop()`` when omitted.
        """
        self.monitor_interval = 5
        self.base_folder = base_folder
        self.base_port = base_port
        self.interface = interface
        self.ip = get_ip_address(self.interface)
        self.instances = []
        self.new_idx = 0
        self.weights = []
        self.run_task = []
        self.dynamic_weight_fp = None
        self.pids = []
        # Most recently spawned balancer process (set by run_cmd).
        self.proc = None
        self.loop = loop if loop else asyncio.get_event_loop()

    def new_op(self, ovpn_config):
        """Create, register and return a new Openvpn session.

        A ``session<idx>`` folder is created under ``base_folder`` and the
        ``groupadd vpn<idx>`` command is run before the ``Openvpn`` object is
        constructed. The new session starts with weight 1.

        Args:
            ovpn_config: Configuration passed through to ``Openvpn``.

        Returns:
            The newly created ``Openvpn`` object.
        """
        folder_path = os.path.join(self.base_folder, f"session{self.new_idx}")
        os.makedirs(folder_path, exist_ok=True)
        # new_idx is an integer maintained by this class, so interpolating it
        # into the shell command is safe.
        os.system(f"groupadd vpn{self.new_idx}")
        env_config = {
            "monitor_interval": self.monitor_interval,
            "folder_path": folder_path,
            "script_template_fp": "script.sh.template",
            "proxycfg_template_fp": "3proxy.cfg.template",
        }
        op = Openvpn(self.new_idx, ovpn_config=ovpn_config,
                     env_config=env_config)
        self.instances.append({
            "op": op,
            "idx": self.new_idx,
            "weight": 1,
        })
        self.new_idx += 1
        return op

    def new_op_old(self, cfg_fp, name=None, additional_cfg=None):
        """Legacy entry point: build the config dict and call :meth:`new_op`.

        Args:
            cfg_fp: Path of the OpenVPN configuration file.
            name: Session name; defaults to ``openvpn-<idx>``.
            additional_cfg: Extra settings; defaults to a fresh empty dict
                for every call.

        Returns:
            Whatever :meth:`new_op` returns.
        """
        if not name:
            name = f"openvpn-{self.new_idx}"
        if additional_cfg is None:
            # A new dict per call: a shared default would leak mutations
            # from one session's configuration into the next.
            additional_cfg = {}
        ovpn_config = {
            "cfg_fp": cfg_fp,
            "name": name,
            "additional_cfg": additional_cfg,
        }
        return self.new_op(ovpn_config)

    def serialize(self):
        """Return a snapshot of all sessions plus the balancer pids.

        Returns:
            ``{"instances": [...], "lb_pids": [...]}`` where every instance
            entry carries idx, name, weight, pids, status and the most recent
            log, I/O and ping lines reported by the session object.
        """
        instances = []
        for instance in self.instances:
            op = instance["op"]
            instances.append({
                "idx": instance["idx"],
                "name": op.name,
                "weight": instance["weight"],
                "pids": op.pids,
                "status": op.status,
                "log": op.get_log(lines=10),
                "io_stat": op.get_io_stat(lines=5),
                "ping_stat": op.get_ping_stat(lines=5),
            })
        return {
            "instances": instances,
            "lb_pids": self.pids,
        }

    def _new_accumulator(self, start_time, end_time, n_bins):
        """Build a TimeSeriesAccumulator aligned to the monitoring interval."""
        return TimeSeriesAccumulator(
            start_time=start_time, end_time=end_time, n_bins=n_bins,
            alignment=self.monitor_interval)

    @classmethod
    def _parse_timestamp(cls, text):
        """Convert a POSIX timestamp string into a naive UTC datetime."""
        # Same result as the deprecated datetime.utcfromtimestamp().
        return cls._EPOCH + timedelta(seconds=float(text))

    @staticmethod
    def _ping_mean(samples):
        """Mean of the positive samples; NaN when there are none."""
        arr = numpy.array(samples)
        positive = arr[arr > 0]
        if positive.size == 0:
            # numpy.mean of an empty slice is NaN as well, but also emits a
            # RuntimeWarning for every empty bin.
            return float("nan")
        return numpy.mean(positive)

    @staticmethod
    def _ping_rate(samples):
        """Fraction of samples that are positive; 0 for an empty bin."""
        arr = numpy.array(samples)
        if len(arr) == 0:
            return 0
        return numpy.count_nonzero(arr > 0) / len(arr)

    def get_stat(self, time_range=5, n_bins=60):
        """Return binned throughput and ping statistics for every session.

        Args:
            time_range: Length of the reporting window in minutes, ending
                now.
            n_bins: Number of bins requested from the accumulators.

        Returns:
            ``{}`` when there are no sessions. Otherwise a dict with
            ``"time"`` (ISO formatted bin timestamps), ``"instances"`` (one
            entry per session with ``io_mean``, ``ping_mean`` and
            ``ping_rate`` series) and ``"lb_pids"``.
        """
        if not self.instances:
            return {}

        # NOTE(review): the window is built from local time while samples are
        # converted as UTC; these only line up when the host runs on UTC or
        # the accumulator compensates -- confirm.
        now = datetime.now()
        window_start = now - timedelta(minutes=time_range)
        # One sample is expected per monitoring interval.
        lines = math.ceil(time_range * 60 / self.monitor_interval)

        stat = []
        for instance in self.instances:
            op = instance["op"]

            io_tas = self._new_accumulator(window_start, now, n_bins)
            prev_total = None
            for line in op.get_io_stat(lines=lines):
                if not line.strip():
                    continue
                stamp, value_down, value_up = line.split(",")
                when = self._parse_timestamp(stamp)
                total = float(value_down) + float(value_up)
                if prev_total is not None:
                    # Counters are cumulative: turn consecutive samples into
                    # a per-second rate, clamping resets to zero.
                    delta = max(0, total - prev_total)
                    io_tas.add(when, delta / self.monitor_interval)
                prev_total = total

            ping_tas = self._new_accumulator(window_start, now, n_bins)
            for line in op.get_ping_stat(lines=lines):
                if not line.strip():
                    continue
                stamp, value = line.split(",")
                ping_tas.add(self._parse_timestamp(stamp), float(value))

            stat.append({
                "idx": instance["idx"],
                "name": op.name,
                "weight": instance["weight"],
                "pids": op.pids,
                "status": op.status,
                "log": op.get_log(lines=10),
                "io_mean": io_tas.get_f(numpy.mean),
                "ping_mean": ping_tas.get_f(self._ping_mean),
                "ping_rate": ping_tas.get_f(self._ping_rate),
            })

        # Every accumulator was built with the same window and bin count, so
        # the bin timestamps are taken from the last one.
        return {
            "time": [stamp.isoformat() for stamp in ping_tas.get_ts()],
            "instances": stat,
            "lb_pids": self.pids,
        }

    def generate_lb_cfg(self, load_balance_mode):
        """Render the load balancer config for *load_balance_mode*.

        The template ``go-socks-lb.yml.template`` is read relative to the
        current working directory. As a side effect ``dynamic_weight_fp`` is
        set to ``<base_folder>/weights.json``.

        Returns:
            Path of the written config file.
        """
        lb_cfg_fp = os.path.join(
            self.base_folder, f"go-socks-lb-{load_balance_mode}.yml")
        self.dynamic_weight_fp = os.path.join(self.base_folder, "weights.json")
        with open("go-socks-lb.yml.template", "r") as template_f:
            template = Template(template_f.read())
        lb_cfg = template.generate(
            ip=self.ip,
            instances=self.instances,
            dynamic_weight_fp=self.dynamic_weight_fp,
            load_balance_mode=load_balance_mode,
        )
        # The rendered config is written in binary mode, as before.
        with open(lb_cfg_fp, "wb") as lb_cfg_f:
            lb_cfg_f.write(lb_cfg)
        return lb_cfg_fp

    def generate_dynamic_weight(self, weights):
        """Write *weights* as ``{"weights": [...]}`` to ``dynamic_weight_fp``.

        ``dynamic_weight_fp`` is only set by :meth:`generate_lb_cfg`, so that
        method must have run before this one.
        """
        with open(self.dynamic_weight_fp, "w") as dynamic_weight_f:
            json.dump({"weights": weights}, dynamic_weight_f)

    def get_instance(self, idx):
        """Return the instance dict whose ``"idx"`` equals *idx*, or None."""
        return next(
            (entry for entry in self.instances if entry["idx"] == idx),
            None,
        )

    def start_op(self, idx):
        """Start session *idx*, give it weight 1 and restart the balancer.

        Unknown indices are ignored.
        """
        instance = self.get_instance(idx)
        if not instance:
            return
        instance["op"].start()
        instance["weight"] = 1
        self.reset_lb()

    def stop_op(self, idx):
        """Stop session *idx*, give it weight 0 and restart the balancer.

        Unknown indices are ignored.
        """
        instance = self.get_instance(idx)
        if not instance:
            return
        instance["op"].stop()
        instance["weight"] = 0
        self.reset_lb()

    def remove_op(self, idx):
        """Stop and forget session *idx*, then restart the balancer.

        The index counter restarts from 0 once no sessions remain. Unknown
        indices are ignored.
        """
        instance = self.get_instance(idx)
        if not instance:
            return
        instance["op"].stop()
        instance["weight"] = 0
        self.instances.remove(instance)
        if not self.instances:
            self.new_idx = 0
        self.reset_lb()

    def remove_all_op(self):
        """Stop and forget every session, then reset the balancer."""
        for instance in self.instances:
            instance["op"].stop()
            instance["weight"] = 0
        self.instances = []
        self.new_idx = 0
        self.reset_lb()

    def start_all(self):
        """Start every session with weight 1, then restart the balancer."""
        for instance in self.instances:
            instance["op"].start()
            instance["weight"] = 1
        self.reset_lb()

    def stop_all(self):
        """Stop every session with weight 0, then reset the balancer."""
        for instance in self.instances:
            instance["op"].stop()
            instance["weight"] = 0
        self.reset_lb()

    def reset_lb(self):
        """Tear down the balancer processes and relaunch them if needed.

        Every tracked pid receives SIGINT and every background task is
        cancelled. If at least one session reports ``RUNNING``, one balancer
        per entry in ``_LB_MODES`` is started together with the task that
        keeps the dynamic weights up to date.
        """
        for pid in self.pids:
            try:
                os.kill(pid, signal.SIGINT)
            except Exception as err:
                print("kill failed:", err)
        for task in self.run_task:
            task.cancel()
        self.pids = []
        self.run_task = []

        if not any(entry["op"].status == RUNNING for entry in self.instances):
            return

        # One balancer per mode, each bound to its own port.
        for n, (mode, bind) in enumerate(self._LB_MODES):
            lb_cfg_fp = self.generate_lb_cfg(mode)
            lb_cmd = ["go-socks-lb/go-socks-lb", "-config",
                      lb_cfg_fp, "-bind", bind]
            print(f"lb_cmd {n}", lb_cmd)
            self.run_task.append(self.loop.create_task(
                self.run_cmd(lb_cmd)))
        self.run_task.append(self.loop.create_task(
            self.keep_applying_weights()))

    def calc_weights(self):
        """Recompute every instance weight from its latest ping results.

        A ping line counts as "down" when its last comma-separated field is
        ``-1``. An instance gets weight 1 when its last ``n_up`` results are
        all up and weight 0 when its last ``n_down`` results are all down;
        otherwise the weight is left as it was. With no ping results at all
        both checks pass vacuously and the weight ends up 0.

        Returns:
            The weights in the order of ``self.instances``.
        """
        weights = []
        for instance in self.instances:
            op = instance["op"]
            n_up = op.n_up
            n_down = op.n_down
            # Strip so a trailing newline cannot hide the -1 marker.
            is_up = [
                line.strip().split(",")[-1] != "-1"
                for line in op.get_ping_stat(lines=max(n_up, n_down))
            ]

            def last_all(n, status, flags=is_up):
                """True when the newest min(n, len(flags)) flags equal status."""
                return all(
                    flags[-(j + 1)] == status
                    for j in range(min(n, len(flags)))
                )

            if last_all(n_up, True):
                instance["weight"] = 1
            if last_all(n_down, False):
                instance["weight"] = 0
            weights.append(instance["weight"])
        return weights

    async def keep_applying_weights(self):
        """Every 5 seconds recompute weights and push changes to the balancer.

        When the weights differ from the previous round they are written to
        the dynamic weight file and every tracked pid receives SIGUSR1. Runs
        until the task is cancelled.
        """
        old_weights = []
        while True:
            await asyncio.sleep(5)
            new_weights = self.calc_weights()
            if old_weights == new_weights:
                continue
            old_weights = new_weights
            print("weight changed, new weights:", new_weights)
            self.generate_dynamic_weight(new_weights)
            for pid in self.pids:
                try:
                    os.kill(pid, signal.SIGUSR1)
                except OSError:
                    print("error applying weights to", pid)

    def clear_cache(self):
        """Send SIGUSR2 to every tracked balancer pid (best effort)."""
        for pid in self.pids:
            try:
                os.kill(pid, signal.SIGUSR2)
            except OSError:
                print("error clear cache of", pid)

    async def run_cmd(self, cmd):
        """Run *cmd* forever, restarting it 5 seconds after each exit.

        The pid of the running process is kept in ``self.pids`` while it is
        alive, and its captured stdout/stderr are printed once it exits.
        A failure to spawn the process is reported and retried as well.
        Runs until the task is cancelled.

        Args:
            cmd: Argument list; the first element is the executable.
        """
        while True:
            print("Manager trying to start go")
            try:
                proc = await asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE)
            except OSError as err:
                # E.g. missing binary: keep supervising instead of letting
                # the task die silently.
                print(f"failed to start {cmd!r}: {err}")
                await asyncio.sleep(5)
                continue
            self.proc = proc
            self.pids.append(proc.pid)
            print(f"Load balancer started, pids: {self.pids}")
            stdout, stderr = await proc.communicate()
            print(f'[{cmd!r} exited with {proc.returncode}]')
            if proc.pid in self.pids:
                self.pids.remove(proc.pid)
            if stdout:
                print(f'[stdout]\n{stdout.decode()}')
            if stderr:
                print(f'[stderr]\n{stderr.decode()}')
            await asyncio.sleep(5)