356 lines
12 KiB
Python
356 lines
12 KiB
Python
import os
|
|
from openvpn import Openvpn, RUNNING
|
|
import asyncio
|
|
from tornado.template import Template
|
|
import signal
|
|
import json
|
|
import socket
|
|
import fcntl
|
|
import struct
|
|
import math
|
|
import numpy
|
|
from plot_gen import TimeSeriesAccumulator
|
|
from datetime import datetime, timedelta
|
|
|
|
from process_manager import ProcessManager
|
|
|
|
|
|
def get_ip_address(ifname):
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
return socket.inet_ntoa(fcntl.ioctl(
|
|
s.fileno(),
|
|
0x8915, # SIOCGIFADDR
|
|
struct.pack(b'256s', ifname[:15].encode())
|
|
)[20:24])
|
|
|
|
|
|
class OManager:
|
|
lb1_pm: ProcessManager
|
|
lb2_pm: ProcessManager
|
|
|
|
def __init__(self, base_folder, interface, base_port=8001, loop=None):
|
|
self.monitor_interval = 5
|
|
self.base_folder = base_folder
|
|
self.base_port = base_port
|
|
self.interface = interface
|
|
self.ip = get_ip_address(self.interface)
|
|
self.instances = []
|
|
self.new_idx = 0
|
|
self.weights = []
|
|
self.run_task = []
|
|
self.dynamic_weight_fp = None
|
|
if loop:
|
|
self.loop = loop
|
|
else:
|
|
self.loop = asyncio.get_event_loop()
|
|
|
|
self.lb1_pm = ProcessManager(restart_interval=5, loop=self.loop, group="root") # specify group as the non-group pid seems wrong, needs fix in processmanager
|
|
self.lb2_pm = ProcessManager(restart_interval=5, loop=self.loop, group="root")
|
|
|
|
def load_instance_config(self, instance_config):
|
|
"""
|
|
[
|
|
ovpn_config
|
|
]
|
|
"""
|
|
for ovpn_config in instance_config:
|
|
self.new_op(ovpn_config)
|
|
|
|
def serialize_instance_config(self):
|
|
return [i["op"].export_cfg() for i in self.instances]
|
|
|
|
@property
|
|
def pids(self):
|
|
return [self.lb1_pm.pid, self.lb2_pm.pid]
|
|
|
|
def new_op(self, ovpn_config):
|
|
folder_path = os.path.join(self.base_folder, f"session{self.new_idx}")
|
|
if not os.path.isdir(folder_path):
|
|
os.makedirs(folder_path)
|
|
os.system(f"groupadd vpn{self.new_idx}")
|
|
env_config = {
|
|
"monitor_interval": self.monitor_interval,
|
|
"folder_path": folder_path,
|
|
"script_template_fp": "script.sh.template",
|
|
"proxycfg_template_fp": "3proxy.cfg.template",
|
|
}
|
|
op = Openvpn(self.new_idx, ovpn_config=ovpn_config,
|
|
env_config=env_config)
|
|
self.instances.append({
|
|
"op": op,
|
|
"idx": self.new_idx,
|
|
"weight": 1
|
|
})
|
|
self.new_idx += 1
|
|
# reset lb if the new op is started
|
|
new_op_enabled = ovpn_config.get("enabled", False)
|
|
if new_op_enabled:
|
|
self.reset_lb()
|
|
return op
|
|
|
|
def new_op_old(self, cfg_fp, name=None, additional_cfg={}):
|
|
if not name:
|
|
name = f"openvpn-{self.new_idx}"
|
|
ovpn_config = {
|
|
"cfg_fp": cfg_fp,
|
|
"name": name,
|
|
"additional_cfg": additional_cfg,
|
|
}
|
|
return self.new_op(ovpn_config)
|
|
|
|
def serialize(self):
|
|
instances = [
|
|
{
|
|
"idx": i["idx"],
|
|
"name": i["op"].name,
|
|
"weight": i["weight"],
|
|
"pids": i["op"].pids,
|
|
"status": i["op"].status,
|
|
"log": i["op"].get_log(lines=10),
|
|
"io_stat": i["op"].get_io_stat(lines=5),
|
|
"ping_stat": i["op"].get_ping_stat(lines=5)
|
|
} for i in self.instances
|
|
]
|
|
state = {
|
|
"instances": instances,
|
|
"lb_pids": self.pids
|
|
}
|
|
return state
|
|
|
|
def get_stat(self, time_range=5, n_bins=60):
|
|
stat = []
|
|
# align time range to grid defined by monitoring interval
|
|
now = datetime.now()
|
|
for i in self.instances:
|
|
lines = math.ceil(time_range*60/5)+1
|
|
io_stat = i["op"].get_io_stat(lines=lines)
|
|
first_line = True
|
|
prev_value = None
|
|
io_tas = TimeSeriesAccumulator(
|
|
start_time=now-timedelta(minutes=time_range), end_time=now, n_bins=n_bins, alignment=self.monitor_interval)
|
|
for line in io_stat:
|
|
time, value_down, value_up = line.split(",")
|
|
time = datetime.utcfromtimestamp(float(time))
|
|
value = float(value_down)+float(value_up)
|
|
if first_line:
|
|
first_line = False
|
|
else:
|
|
delta = max(0, value - prev_value)
|
|
io_tas.add(time, delta/5)
|
|
prev_value = value
|
|
ping_stat = i["op"].get_ping_stat(lines=lines)
|
|
ping_tas = TimeSeriesAccumulator(
|
|
start_time=now-timedelta(minutes=time_range), end_time=now, n_bins=n_bins, alignment=self.monitor_interval)
|
|
for line in ping_stat:
|
|
time, value = line.split(",")
|
|
time = datetime.utcfromtimestamp(float(time))
|
|
value = float(value)
|
|
ping_tas.add(time, value)
|
|
|
|
def io_mean(x):
|
|
if len(x):
|
|
return numpy.mean(x)
|
|
else:
|
|
return numpy.nan
|
|
|
|
def ping_mean(x):
|
|
x = numpy.array(x)
|
|
x = x[x > 0]
|
|
if len(x):
|
|
return numpy.mean(x)
|
|
else:
|
|
return numpy.nan
|
|
|
|
def ping_rate(x):
|
|
x = numpy.array(x)
|
|
if len(x) == 0:
|
|
return 0
|
|
return numpy.count_nonzero(x > 0)/len(x)
|
|
|
|
stat.append({
|
|
"idx": i["idx"],
|
|
"name": i["op"].name,
|
|
"weight": i["weight"],
|
|
"pids": i["op"].pids,
|
|
"status": i["op"].status,
|
|
"log": i["op"].get_log(lines=10),
|
|
"io_mean": io_tas.get_f(io_mean),
|
|
"ping_mean": ping_tas.get_f(ping_mean),
|
|
"ping_rate": ping_tas.get_f(ping_rate)
|
|
})
|
|
if self.instances:
|
|
xs = [j.isoformat() for j in ping_tas.get_ts()]
|
|
return {
|
|
"time": xs,
|
|
"instances": stat,
|
|
"lb_pids": self.pids
|
|
}
|
|
return {}
|
|
|
|
def generate_lb_cfg(self, load_balance_mode):
|
|
lb_cfg_fp = os.path.join(
|
|
self.base_folder, f"go-socks-lb-{load_balance_mode}.yml")
|
|
|
|
self.dynamic_weight_fp = os.path.join(self.base_folder, "weights.json")
|
|
with open(f"go-socks-lb.yml.template", "r") as template_f:
|
|
buf = template_f.read()
|
|
template = Template(buf)
|
|
items = {
|
|
"ip": self.ip,
|
|
"instances": self.instances,
|
|
"dynamic_weight_fp": self.dynamic_weight_fp,
|
|
"load_balance_mode": load_balance_mode
|
|
}
|
|
lb_cfg = template.generate(**items)
|
|
with open(lb_cfg_fp, "wb") as lb_cfg_f:
|
|
lb_cfg_f.write(lb_cfg)
|
|
return lb_cfg_fp
|
|
|
|
def generate_dynamic_weight(self, weights):
|
|
with open(self.dynamic_weight_fp, "w") as dynamic_weight_f:
|
|
dynamic_weight_f.write(json.dumps({
|
|
"weights": weights
|
|
}))
|
|
|
|
def get_instance(self, idx):
|
|
for instance in self.instances:
|
|
if instance["idx"] == idx:
|
|
return instance
|
|
return None
|
|
|
|
def start_op(self, idx):
|
|
instance = self.get_instance(idx)
|
|
if not instance:
|
|
return
|
|
instance["op"].start()
|
|
instance["weight"] = 0
|
|
self.reset_lb()
|
|
|
|
def stop_op(self, idx):
|
|
instance = self.get_instance(idx)
|
|
if not instance:
|
|
return
|
|
instance["op"].stop()
|
|
instance["weight"] = 0
|
|
self.reset_lb()
|
|
|
|
def remove_op(self, idx):
|
|
instance = self.get_instance(idx)
|
|
if not instance:
|
|
return
|
|
instance["op"].stop()
|
|
instance["weight"] = 0
|
|
self.instances.remove(instance)
|
|
if len(self.instances) == 0:
|
|
self.new_idx = 0
|
|
self.reset_lb()
|
|
|
|
def remove_all_op(self):
|
|
for instance in self.instances:
|
|
instance["op"].stop()
|
|
instance["weight"] = 0
|
|
self.instances = []
|
|
self.new_idx = 0
|
|
self.reset_lb()
|
|
|
|
def start_all(self):
|
|
for instance in self.instances:
|
|
instance["op"].start()
|
|
instance["weight"] = 1
|
|
self.reset_lb()
|
|
|
|
def stop_all(self):
|
|
for instance in self.instances:
|
|
instance["op"].stop()
|
|
instance["weight"] = 0
|
|
self.reset_lb()
|
|
|
|
def reset_lb(self):
|
|
self.lb1_pm.stop()
|
|
self.lb2_pm.stop()
|
|
for task in self.run_task:
|
|
task.cancel()
|
|
self.run_task = []
|
|
|
|
# find running instances
|
|
exist_running_instance = False
|
|
for instance in self.instances:
|
|
if instance["op"].status == RUNNING:
|
|
exist_running_instance = True
|
|
break
|
|
if exist_running_instance:
|
|
# two modes on different ports
|
|
lb_cfg_fp = self.generate_lb_cfg("fallback")
|
|
lb_cmd = " ".join(["go-socks-lb/go-socks-lb", "-config",
|
|
lb_cfg_fp, "-bind", "0.0.0.0:7000"])
|
|
print("lb_cmd 0", lb_cmd)
|
|
self.lb1_pm.start(cmd=lb_cmd)
|
|
|
|
lb_cfg_fp = self.generate_lb_cfg("cached-shuffle")
|
|
lb_cmd = " ".join(["go-socks-lb/go-socks-lb", "-config",
|
|
lb_cfg_fp, "-bind", "0.0.0.0:7001"])
|
|
print("lb_cmd 1", lb_cmd)
|
|
self.lb2_pm.start(cmd=lb_cmd)
|
|
#print("pre calc weights:", self.calc_weights())
|
|
self.run_task.append(self.loop.create_task(
|
|
self.keep_applying_weights()))
|
|
|
|
def calc_weights(self):
|
|
# n_up = 5
|
|
# n_down = 2
|
|
weights = []
|
|
for i in self.instances:
|
|
if i["op"].status != RUNNING or (not i["op"].interface_up):
|
|
i["weight"] = 0
|
|
weights.append(i["weight"])
|
|
continue
|
|
n_up = i["op"].n_up
|
|
n_down = i["op"].n_down
|
|
ping_stat = i["op"].get_ping_stat(lines=max(n_up, n_down))
|
|
ping_stat = [i.split(",")[-1] != "-1" for i in ping_stat]
|
|
#print("ping status:", ping_stat)
|
|
|
|
def all_equal(n, status):
|
|
for j in range(min(n, len(ping_stat))):
|
|
if ping_stat[-(j+1)] != status:
|
|
return False
|
|
return True
|
|
# all up
|
|
if all_equal(n_up, True):
|
|
i["weight"] = 1
|
|
# print("all_up")
|
|
if all_equal(n_down, False):
|
|
i["weight"] = 0
|
|
# print("all_down")
|
|
weights.append(i["weight"])
|
|
return weights
|
|
|
|
async def keep_applying_weights(self):
|
|
old_weights = []
|
|
while True:
|
|
await asyncio.sleep(5)
|
|
if self.lb1_pm.pid==0 or self.lb2_pm.pid==0:
|
|
# pid not acquired yet, wait a bit more
|
|
await asyncio.sleep(3)
|
|
#print("calculating weights")
|
|
new_weights = self.calc_weights()
|
|
#print("finished calc")
|
|
if old_weights != new_weights:
|
|
old_weights = new_weights
|
|
print("weight changed, new weights:", new_weights)
|
|
self.generate_dynamic_weight(new_weights)
|
|
for pm in [self.lb1_pm, self.lb2_pm]:
|
|
try:
|
|
pm.signal(signal.SIGUSR1)
|
|
except:
|
|
print("error applying weights to", pm.pid)
|
|
# else:
|
|
#print("weight not changed, weights:", new_weights)
|
|
|
|
def clear_cache(self):
|
|
for pm in [self.lb1_pm, self.lb2_pm]:
|
|
try:
|
|
pm.signal(signal.SIGUSR2)
|
|
except:
|
|
print("error clear cache of", pm.pid)
|