import os from openvpn import Openvpn, RUNNING import asyncio from tornado.template import Template import signal import json import socket import fcntl import struct import math import numpy from plot_gen import TimeSeriesAccumulator from datetime import datetime, timedelta from process_manager import ProcessManager def get_ip_address(ifname): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) return socket.inet_ntoa(fcntl.ioctl( s.fileno(), 0x8915, # SIOCGIFADDR struct.pack(b'256s', ifname[:15].encode()) )[20:24]) class OManager: lb1_pm: ProcessManager lb2_pm: ProcessManager def __init__(self, base_folder, interface, base_port=8001, loop=None): self.monitor_interval = 5 self.base_folder = base_folder self.base_port = base_port self.interface = interface self.ip = get_ip_address(self.interface) self.instances = [] self.new_idx = 0 self.weights = [] self.run_task = [] self.dynamic_weight_fp = None if loop: self.loop = loop else: self.loop = asyncio.get_event_loop() self.lb1_pm = ProcessManager(restart_interval=5, loop=self.loop, group="root") # specify group as the non-group pid seems wrong, needs fix in processmanager self.lb2_pm = ProcessManager(restart_interval=5, loop=self.loop, group="root") def load_instance_config(self, instance_config): """ [ ovpn_config ] """ for ovpn_config in instance_config: self.new_op(ovpn_config) def serialize_instance_config(self): return [i["op"].export_cfg() for i in self.instances] @property def pids(self): return [self.lb1_pm.pid, self.lb2_pm.pid] def new_op(self, ovpn_config): folder_path = os.path.join(self.base_folder, f"session{self.new_idx}") if not os.path.isdir(folder_path): os.makedirs(folder_path) os.system(f"groupadd vpn{self.new_idx}") env_config = { "monitor_interval": self.monitor_interval, "folder_path": folder_path, "script_template_fp": "script.sh.template", "proxycfg_template_fp": "3proxy.cfg.template", } op = Openvpn(self.new_idx, ovpn_config=ovpn_config, env_config=env_config) self.instances.append({ "op": op, "idx": self.new_idx, "weight": 1 }) self.new_idx += 1 # reset lb if the new op is started new_op_enabled = ovpn_config.get("enabled", False) if new_op_enabled: self.reset_lb() return op def new_op_old(self, cfg_fp, name=None, additional_cfg={}): if not name: name = f"openvpn-{self.new_idx}" ovpn_config = { "cfg_fp": cfg_fp, "name": name, "additional_cfg": additional_cfg, } return self.new_op(ovpn_config) def serialize(self): instances = [ { "idx": i["idx"], "name": i["op"].name, "weight": i["weight"], "pids": i["op"].pids, "status": i["op"].status, "log": i["op"].get_log(lines=10), "io_stat": i["op"].get_io_stat(lines=5), "ping_stat": i["op"].get_ping_stat(lines=5) } for i in self.instances ] state = { "instances": instances, "lb_pids": self.pids } return state def get_stat(self, time_range=5, n_bins=60): stat = [] # align time range to grid defined by monitoring interval now = datetime.now() for i in self.instances: lines = math.ceil(time_range*60/5)+1 io_stat = i["op"].get_io_stat(lines=lines) first_line = True prev_value = None io_tas = TimeSeriesAccumulator( start_time=now-timedelta(minutes=time_range), end_time=now, n_bins=n_bins, alignment=self.monitor_interval) for line in io_stat: time, value_down, value_up = line.split(",") time = datetime.utcfromtimestamp(float(time)) value = float(value_down)+float(value_up) if first_line: first_line = False else: delta = max(0, value - prev_value) io_tas.add(time, delta/5) prev_value = value ping_stat = i["op"].get_ping_stat(lines=lines) ping_tas = TimeSeriesAccumulator( start_time=now-timedelta(minutes=time_range), end_time=now, n_bins=n_bins, alignment=self.monitor_interval) for line in ping_stat: time, value = line.split(",") time = datetime.utcfromtimestamp(float(time)) value = float(value) ping_tas.add(time, value) def io_mean(x): if len(x): return numpy.mean(x) else: return numpy.nan def ping_mean(x): x = numpy.array(x) x = x[x > 0] if len(x): return numpy.mean(x) else: return numpy.nan def ping_rate(x): x = numpy.array(x) if len(x) == 0: return 0 return numpy.count_nonzero(x > 0)/len(x) stat.append({ "idx": i["idx"], "name": i["op"].name, "weight": i["weight"], "pids": i["op"].pids, "status": i["op"].status, "log": i["op"].get_log(lines=10), "io_mean": io_tas.get_f(io_mean), "ping_mean": ping_tas.get_f(ping_mean), "ping_rate": ping_tas.get_f(ping_rate) }) if self.instances: xs = [j.isoformat() for j in ping_tas.get_ts()] return { "time": xs, "instances": stat, "lb_pids": self.pids } return {} def generate_lb_cfg(self, load_balance_mode): lb_cfg_fp = os.path.join( self.base_folder, f"go-socks-lb-{load_balance_mode}.yml") self.dynamic_weight_fp = os.path.join(self.base_folder, "weights.json") with open(f"go-socks-lb.yml.template", "r") as template_f: buf = template_f.read() template = Template(buf) items = { "ip": self.ip, "instances": self.instances, "dynamic_weight_fp": self.dynamic_weight_fp, "load_balance_mode": load_balance_mode } lb_cfg = template.generate(**items) with open(lb_cfg_fp, "wb") as lb_cfg_f: lb_cfg_f.write(lb_cfg) return lb_cfg_fp def generate_dynamic_weight(self, weights): with open(self.dynamic_weight_fp, "w") as dynamic_weight_f: dynamic_weight_f.write(json.dumps({ "weights": weights })) def get_instance(self, idx): for instance in self.instances: if instance["idx"] == idx: return instance return None def start_op(self, idx): instance = self.get_instance(idx) if not instance: return instance["op"].start() instance["weight"] = 0 self.reset_lb() def stop_op(self, idx): instance = self.get_instance(idx) if not instance: return instance["op"].stop() instance["weight"] = 0 self.reset_lb() def remove_op(self, idx): instance = self.get_instance(idx) if not instance: return instance["op"].stop() instance["weight"] = 0 self.instances.remove(instance) if len(self.instances) == 0: self.new_idx = 0 self.reset_lb() def remove_all_op(self): for instance in self.instances: instance["op"].stop() instance["weight"] = 0 self.instances = [] self.new_idx = 0 self.reset_lb() def start_all(self): for instance in self.instances: instance["op"].start() instance["weight"] = 1 self.reset_lb() def stop_all(self): for instance in self.instances: instance["op"].stop() instance["weight"] = 0 self.reset_lb() def reset_lb(self): self.lb1_pm.stop() self.lb2_pm.stop() for task in self.run_task: task.cancel() self.run_task = [] # find running instances exist_running_instance = False for instance in self.instances: if instance["op"].status == RUNNING: exist_running_instance = True break if exist_running_instance: # two modes on different ports lb_cfg_fp = self.generate_lb_cfg("fallback") lb_cmd = " ".join(["go-socks-lb/go-socks-lb", "-config", lb_cfg_fp, "-bind", "0.0.0.0:7000"]) print("lb_cmd 0", lb_cmd) self.lb1_pm.start(cmd=lb_cmd) lb_cfg_fp = self.generate_lb_cfg("cached-shuffle") lb_cmd = " ".join(["go-socks-lb/go-socks-lb", "-config", lb_cfg_fp, "-bind", "0.0.0.0:7001"]) print("lb_cmd 1", lb_cmd) self.lb2_pm.start(cmd=lb_cmd) #print("pre calc weights:", self.calc_weights()) self.run_task.append(self.loop.create_task( self.keep_applying_weights())) def calc_weights(self): # n_up = 5 # n_down = 2 weights = [] for i in self.instances: if i["op"].status != RUNNING or (not i["op"].interface_up): i["weight"] = 0 weights.append(i["weight"]) continue n_up = i["op"].n_up n_down = i["op"].n_down ping_stat = i["op"].get_ping_stat(lines=max(n_up, n_down)) ping_stat = [i.split(",")[-1] != "-1" for i in ping_stat] #print("ping status:", ping_stat) def all_equal(n, status): for j in range(min(n, len(ping_stat))): if ping_stat[-(j+1)] != status: return False return True # all up if all_equal(n_up, True): i["weight"] = 1 # print("all_up") if all_equal(n_down, False): i["weight"] = 0 # print("all_down") weights.append(i["weight"]) return weights async def keep_applying_weights(self): old_weights = [] while True: await asyncio.sleep(5) if self.lb1_pm.pid==0 or self.lb2_pm.pid==0: # pid not acquired yet, wait a bit more await asyncio.sleep(3) #print("calculating weights") new_weights = self.calc_weights() #print("finished calc") if old_weights != new_weights: old_weights = new_weights print("weight changed, new weights:", new_weights) self.generate_dynamic_weight(new_weights) for pm in [self.lb1_pm, self.lb2_pm]: try: pm.signal(signal.SIGUSR1) except: print("error applying weights to", pm.pid) # else: #print("weight not changed, weights:", new_weights) def clear_cache(self): for pm in [self.lb1_pm, self.lb2_pm]: try: pm.signal(signal.SIGUSR2) except: print("error clear cache of", pm.pid)