# ovpn-lb-socks5/o_manager.py
# 2023-12-09 23:18:16 -05:00
#
# 356 lines
# 12 KiB
# Python

import os
from openvpn import Openvpn, RUNNING
import asyncio
from tornado.template import Template
import signal
import json
import socket
import fcntl
import struct
import math
import numpy
from plot_gen import TimeSeriesAccumulator
from datetime import datetime, timedelta
from process_manager import ProcessManager
def get_ip_address(ifname):
    """Return the IPv4 address assigned to network interface *ifname*.

    The address is obtained with the ``SIOCGIFADDR`` ioctl issued on a
    throwaway UDP socket; no traffic is sent.

    Args:
        ifname: Interface name (e.g. ``"eth0"``). Only the first 15
            characters are passed to the ioctl.

    Returns:
        The address as a dotted-quad string.

    Raises:
        OSError: If the ioctl fails, e.g. the interface does not exist.
    """
    request = struct.pack(b'256s', ifname[:15].encode())
    # The context manager closes the socket even when the ioctl raises;
    # previously the descriptor was leaked on every call.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        reply = fcntl.ioctl(
            s.fileno(),
            0x8915,  # SIOCGIFADDR
            request,
        )
    # Bytes 20..23 of the reply carry the packed IPv4 address.
    return socket.inet_ntoa(reply[20:24])
class OManager:
    """Manage OpenVPN instances and the go-socks-lb balancers in front of them.

    Every managed instance is stored in ``self.instances`` as a dict
    ``{"op": Openvpn, "idx": int, "weight": int}``. Two ``go-socks-lb``
    processes are supervised through ``ProcessManager``:

    * ``lb1_pm`` -- "fallback" mode, bound to ``0.0.0.0:7000``
    * ``lb2_pm`` -- "cached-shuffle" mode, bound to ``0.0.0.0:7001``

    While the balancers run, a background task periodically recomputes the
    per-instance weights from recent ping results and pushes them to both
    balancers.
    """

    # Quoted so the annotations are not evaluated when the class is created.
    lb1_pm: "ProcessManager"
    lb2_pm: "ProcessManager"

    def __init__(self, base_folder, interface, base_port=8001, loop=None):
        """Set up an empty manager.

        Args:
            base_folder: Directory that receives the per-session folders,
                the generated balancer configs and ``weights.json``.
            interface: Name of the network interface whose IPv4 address is
                handed to the balancer config template.
            base_port: Stored on the instance; not used by the code in this
                class.
            loop: asyncio event loop to schedule tasks on. Falls back to
                ``asyncio.get_event_loop()`` when omitted.
        """
        self.monitor_interval = 5
        self.base_folder = base_folder
        self.base_port = base_port
        self.interface = interface
        self.ip = get_ip_address(self.interface)
        self.instances = []
        self.new_idx = 0
        self.weights = []
        self.run_task = []
        self.dynamic_weight_fp = None
        self.loop = loop if loop else asyncio.get_event_loop()
        # specify group as the non-group pid seems wrong, needs fix in
        # processmanager
        self.lb1_pm = ProcessManager(
            restart_interval=5, loop=self.loop, group="root")
        self.lb2_pm = ProcessManager(
            restart_interval=5, loop=self.loop, group="root")

    def load_instance_config(self, instance_config):
        """Create one OpenVPN instance per entry of *instance_config*.

        Args:
            instance_config: Iterable of ``ovpn_config`` dicts, i.e.
                ``[ovpn_config, ...]``; each one is passed to ``new_op``.
        """
        for ovpn_config in instance_config:
            self.new_op(ovpn_config)

    def serialize_instance_config(self):
        """Return the exported config of every instance, in list order."""
        return [i["op"].export_cfg() for i in self.instances]

    @property
    def pids(self):
        """PIDs reported by the two balancer process managers."""
        return [self.lb1_pm.pid, self.lb2_pm.pid]

    def new_op(self, ovpn_config):
        """Create and register a new OpenVPN instance.

        A ``session<idx>`` folder is created under ``base_folder`` if
        missing, and ``groupadd vpn<idx>`` is executed. The instance is
        appended to ``self.instances`` with weight 1. When the config has
        ``"enabled"`` set to a truthy value the balancers are reset.

        Args:
            ovpn_config: Config dict forwarded to ``Openvpn``.

        Returns:
            The newly created ``Openvpn`` object.
        """
        folder_path = os.path.join(self.base_folder, f"session{self.new_idx}")
        os.makedirs(folder_path, exist_ok=True)
        # NOTE(review): run unconditionally -- if the group already exists
        # groupadd merely reports an error, whereas skipping it could leave
        # a reused session folder without its group.
        os.system(f"groupadd vpn{self.new_idx}")
        env_config = {
            "monitor_interval": self.monitor_interval,
            "folder_path": folder_path,
            "script_template_fp": "script.sh.template",
            "proxycfg_template_fp": "3proxy.cfg.template",
        }
        op = Openvpn(self.new_idx, ovpn_config=ovpn_config,
                     env_config=env_config)
        self.instances.append({
            "op": op,
            "idx": self.new_idx,
            "weight": 1
        })
        self.new_idx += 1
        # reset lb if the new op is started
        if ovpn_config.get("enabled", False):
            self.reset_lb()
        return op

    def new_op_old(self, cfg_fp, name=None, additional_cfg=None):
        """Legacy entry point: build an ``ovpn_config`` and call ``new_op``.

        Args:
            cfg_fp: Path of the OpenVPN config file.
            name: Instance name; defaults to ``openvpn-<idx>``.
            additional_cfg: Extra config dict; a fresh empty dict is used
                when omitted so calls never share state.

        Returns:
            Whatever ``new_op`` returns.
        """
        if not name:
            name = f"openvpn-{self.new_idx}"
        if additional_cfg is None:
            additional_cfg = {}
        ovpn_config = {
            "cfg_fp": cfg_fp,
            "name": name,
            "additional_cfg": additional_cfg,
        }
        return self.new_op(ovpn_config)

    def serialize(self):
        """Return a snapshot of all instances plus the balancer PIDs.

        Returns:
            ``{"instances": [...], "lb_pids": [...]}`` where every instance
            entry carries idx, name, weight, pids, status and the most
            recent log / io-stat / ping-stat lines.
        """
        instances = [
            {
                "idx": i["idx"],
                "name": i["op"].name,
                "weight": i["weight"],
                "pids": i["op"].pids,
                "status": i["op"].status,
                "log": i["op"].get_log(lines=10),
                "io_stat": i["op"].get_io_stat(lines=5),
                "ping_stat": i["op"].get_ping_stat(lines=5)
            } for i in self.instances
        ]
        return {
            "instances": instances,
            "lb_pids": self.pids
        }

    @staticmethod
    def _io_mean(x):
        """Mean of a bin's throughput samples, NaN for an empty bin."""
        if len(x):
            return numpy.mean(x)
        return numpy.nan

    @staticmethod
    def _ping_mean(x):
        """Mean of the positive ping samples in a bin, NaN if there are none."""
        x = numpy.array(x)
        x = x[x > 0]
        if len(x):
            return numpy.mean(x)
        return numpy.nan

    @staticmethod
    def _ping_rate(x):
        """Fraction of positive ping samples in a bin, 0 for an empty bin."""
        x = numpy.array(x)
        if len(x) == 0:
            return 0
        return numpy.count_nonzero(x > 0) / len(x)

    def get_stat(self, time_range=5, n_bins=60):
        """Return binned throughput and ping statistics for every instance.

        Args:
            time_range: Length of the reporting window in minutes, ending
                now.
            n_bins: Number of bins requested from the accumulator.

        Returns:
            ``{"time": [iso timestamps], "instances": [...], "lb_pids":
            [...]}``, or ``{}`` when no instance is registered.
        """
        if not self.instances:
            return {}

        # align time range to grid defined by monitoring interval
        now = datetime.now()
        window_start = now - timedelta(minutes=time_range)
        # One sample per monitoring interval, plus one extra so the first
        # in-window delta has a predecessor.
        lines = math.ceil(time_range * 60 / self.monitor_interval) + 1

        stat = []
        ping_tas = None
        for i in self.instances:
            op = i["op"]

            io_stat = op.get_io_stat(lines=lines)
            io_tas = TimeSeriesAccumulator(
                start_time=window_start, end_time=now, n_bins=n_bins,
                alignment=self.monitor_interval)
            prev_value = None
            for line in io_stat:
                stamp, value_down, value_up = line.split(",")
                # Local naive time, to match the datetime.now() window
                # above (utcfromtimestamp only agreed on UTC hosts).
                when = datetime.fromtimestamp(float(stamp))
                value = float(value_down) + float(value_up)
                if prev_value is not None:
                    # Counter deltas are clamped at 0 to ignore resets and
                    # divided by the sampling interval.
                    delta = max(0, value - prev_value)
                    io_tas.add(when, delta / self.monitor_interval)
                prev_value = value

            ping_stat = op.get_ping_stat(lines=lines)
            ping_tas = TimeSeriesAccumulator(
                start_time=window_start, end_time=now, n_bins=n_bins,
                alignment=self.monitor_interval)
            for line in ping_stat:
                stamp, value = line.split(",")
                ping_tas.add(datetime.fromtimestamp(float(stamp)),
                             float(value))

            stat.append({
                "idx": i["idx"],
                "name": op.name,
                "weight": i["weight"],
                "pids": op.pids,
                "status": op.status,
                "log": op.get_log(lines=10),
                "io_mean": io_tas.get_f(self._io_mean),
                "ping_mean": ping_tas.get_f(self._ping_mean),
                "ping_rate": ping_tas.get_f(self._ping_rate)
            })

        # Every accumulator was built with the same window, so the time
        # axis of the last one is used for all instances.
        xs = [j.isoformat() for j in ping_tas.get_ts()]
        return {
            "time": xs,
            "instances": stat,
            "lb_pids": self.pids
        }

    def generate_lb_cfg(self, load_balance_mode):
        """Render the balancer config for *load_balance_mode*.

        Reads ``go-socks-lb.yml.template`` from the current working
        directory, renders it, and writes the result to
        ``<base_folder>/go-socks-lb-<mode>.yml``. Also sets
        ``self.dynamic_weight_fp`` to ``<base_folder>/weights.json``.

        Returns:
            Path of the written config file.
        """
        lb_cfg_fp = os.path.join(
            self.base_folder, f"go-socks-lb-{load_balance_mode}.yml")
        self.dynamic_weight_fp = os.path.join(self.base_folder, "weights.json")
        with open("go-socks-lb.yml.template", "r") as template_f:
            template = Template(template_f.read())
        lb_cfg = template.generate(
            ip=self.ip,
            instances=self.instances,
            dynamic_weight_fp=self.dynamic_weight_fp,
            load_balance_mode=load_balance_mode,
        )
        with open(lb_cfg_fp, "wb") as lb_cfg_f:
            lb_cfg_f.write(lb_cfg)
        return lb_cfg_fp

    def generate_dynamic_weight(self, weights):
        """Write ``{"weights": weights}`` as JSON to ``dynamic_weight_fp``."""
        with open(self.dynamic_weight_fp, "w") as dynamic_weight_f:
            dynamic_weight_f.write(json.dumps({
                "weights": weights
            }))

    def get_instance(self, idx):
        """Return the instance dict whose ``"idx"`` equals *idx*, else None."""
        for instance in self.instances:
            if instance["idx"] == idx:
                return instance
        return None

    def start_op(self, idx):
        """Start instance *idx* and reset the balancers; no-op if unknown."""
        instance = self.get_instance(idx)
        if not instance:
            return
        instance["op"].start()
        # NOTE(review): weight starts at 0 here while start_all() uses 1;
        # presumably calc_weights() raises it once pings succeed -- confirm.
        instance["weight"] = 0
        self.reset_lb()

    def stop_op(self, idx):
        """Stop instance *idx* and reset the balancers; no-op if unknown."""
        instance = self.get_instance(idx)
        if not instance:
            return
        instance["op"].stop()
        instance["weight"] = 0
        self.reset_lb()

    def remove_op(self, idx):
        """Stop and unregister instance *idx*, then reset the balancers.

        The index counter restarts at 0 once the last instance is removed.
        Unknown indices are ignored.
        """
        instance = self.get_instance(idx)
        if not instance:
            return
        instance["op"].stop()
        instance["weight"] = 0
        self.instances.remove(instance)
        if not self.instances:
            self.new_idx = 0
        self.reset_lb()

    def remove_all_op(self):
        """Stop and unregister every instance, then reset the balancers."""
        for instance in self.instances:
            instance["op"].stop()
            instance["weight"] = 0
        self.instances = []
        self.new_idx = 0
        self.reset_lb()

    def start_all(self):
        """Start every instance with weight 1 and reset the balancers."""
        for instance in self.instances:
            instance["op"].start()
            instance["weight"] = 1
        self.reset_lb()

    def stop_all(self):
        """Stop every instance, zero its weight and reset the balancers."""
        for instance in self.instances:
            instance["op"].stop()
            instance["weight"] = 0
        self.reset_lb()

    def reset_lb(self):
        """Restart both balancers against the current set of instances.

        Both balancers are stopped and any pending weight task is
        cancelled. If at least one instance reports ``RUNNING``, fresh
        configs are generated, both balancers are started, and the
        weight-updating task is scheduled on ``self.loop``.
        """
        self.lb1_pm.stop()
        self.lb2_pm.stop()
        for task in self.run_task:
            task.cancel()
        self.run_task = []

        # find running instances
        if not any(inst["op"].status == RUNNING for inst in self.instances):
            return

        # two modes on different ports
        balancers = (
            (self.lb1_pm, "fallback", "0.0.0.0:7000"),
            (self.lb2_pm, "cached-shuffle", "0.0.0.0:7001"),
        )
        for n, (pm, mode, bind) in enumerate(balancers):
            lb_cfg_fp = self.generate_lb_cfg(mode)
            lb_cmd = " ".join(["go-socks-lb/go-socks-lb", "-config",
                               lb_cfg_fp, "-bind", bind])
            print("lb_cmd", n, lb_cmd)
            pm.start(cmd=lb_cmd)
        self.run_task.append(self.loop.create_task(
            self.keep_applying_weights()))

    @staticmethod
    def _recent_all(ping_ok, n, status):
        """True if the last ``min(n, len(ping_ok))`` flags all equal *status*.

        Vacuously True when that count is zero.
        """
        count = min(n, len(ping_ok))
        recent = ping_ok[len(ping_ok) - count:]
        return all(flag == status for flag in recent)

    def calc_weights(self):
        """Recompute and store the weight of every instance.

        An instance that is not ``RUNNING`` or whose interface is down gets
        weight 0. Otherwise the weight becomes 1 when the last ``n_up``
        pings all succeeded, 0 when the last ``n_down`` pings all failed
        (the latter wins if both hold), and is left unchanged in between.
        A ping line counts as failed when its last comma-separated field
        is ``-1``.

        Returns:
            List of weights in ``self.instances`` order.
        """
        weights = []
        for instance in self.instances:
            op = instance["op"]
            if op.status != RUNNING or (not op.interface_up):
                instance["weight"] = 0
                weights.append(instance["weight"])
                continue
            n_up = op.n_up
            n_down = op.n_down
            ping_lines = op.get_ping_stat(lines=max(n_up, n_down))
            # strip() so a trailing newline/space cannot turn a failed
            # ping ("-1") into an apparent success.
            ping_ok = [line.split(",")[-1].strip() != "-1"
                       for line in ping_lines]
            if self._recent_all(ping_ok, n_up, True):
                instance["weight"] = 1
            if self._recent_all(ping_ok, n_down, False):
                instance["weight"] = 0
            weights.append(instance["weight"])
        return weights

    def _signal_lbs(self, signum, error_prefix):
        """Send *signum* to both balancers, reporting (not raising) failures."""
        for pm in (self.lb1_pm, self.lb2_pm):
            try:
                pm.signal(signum)
            except Exception as exc:
                # Best effort: one balancer failing must not block the other.
                print(error_prefix, pm.pid, exc)

    async def keep_applying_weights(self):
        """Periodically recompute weights and push changes to the balancers.

        Runs until cancelled. Whenever the computed weights differ from the
        previous round they are written to the dynamic weight file and both
        balancers receive ``SIGUSR1`` (presumably prompting them to re-read
        the file).
        """
        old_weights = []
        while True:
            await asyncio.sleep(self.monitor_interval)
            if self.lb1_pm.pid == 0 or self.lb2_pm.pid == 0:
                # pid not acquired yet, wait a bit more
                await asyncio.sleep(3)
            new_weights = self.calc_weights()
            if old_weights == new_weights:
                continue
            old_weights = new_weights
            print("weight changed, new weights:", new_weights)
            self.generate_dynamic_weight(new_weights)
            self._signal_lbs(signal.SIGUSR1, "error applying weights to")

    def clear_cache(self):
        """Send ``SIGUSR2`` to both balancers; failures are only reported."""
        self._signal_lbs(signal.SIGUSR2, "error clear cache of")