import asyncio
import json
import os
import signal
import subprocess

from tornado.template import Template

# NOTE(review): `openvpn` looks like a project-local module (it provides the
# Openvpn wrapper class and the RUNNING status constant) -- confirm.
from openvpn import Openvpn, RUNNING


class OManager:
    """Manage a set of Openvpn instances and the load balancers in front of them.

    Each instance is tracked as a dict ``{"op": Openvpn, "idx": int,
    "weight": int}`` in ``self.instances``.  Whenever instances are started,
    stopped or removed, ``reset_lb`` tears down the running ``go-socks-lb``
    processes and, if at least one instance reports ``RUNNING``, starts two
    new ones (mode "fallback" bound to 0.0.0.0:7000 and mode "cached-shuffle"
    bound to 0.0.0.0:7001) plus a background task that keeps the dynamic
    weights file up to date.

    Attributes:
        base_folder: Directory holding per-session folders, generated load
            balancer configs and the weights file.
        base_port: Port of the first instance; instance ``idx`` is given
            ``base_port + idx``.
        instances: List of instance dicts as described above.
        new_idx: Index that the next ``new_op`` call will use.
        run_task: asyncio tasks created by the last ``reset_lb`` call.
        dynamic_weight_fp: Path of the JSON weights file (``None`` until
            first needed).
        pids: PIDs of the currently running load balancer processes.
        proc: Most recently spawned load balancer process, or ``None``.
        loop: Event loop on which background tasks are scheduled.
    """

    def __init__(self, base_folder, base_port=8001, loop=None):
        """Create an empty manager.

        Args:
            base_folder: Working directory for generated files.
            base_port: Port assigned to instance 0; later instances count up.
            loop: Event loop to schedule tasks on; defaults to
                ``asyncio.get_event_loop()``.
        """
        self.base_folder = base_folder
        self.base_port = base_port
        self.instances = []
        self.new_idx = 0
        self.weights = []
        self.run_task = []
        self.dynamic_weight_fp = None
        self.pids = []
        # Set by run_cmd; initialised here so the attribute always exists.
        self.proc = None
        self.loop = loop if loop is not None else asyncio.get_event_loop()

    def new_op(self, cfg_fp, name=None, additional_cfg=None):
        """Create and register a new Openvpn instance (it is not started).

        Creates ``<base_folder>/session<idx>`` if missing, tries to add the
        system group ``vpn<idx>`` and constructs the ``Openvpn`` object.

        Args:
            cfg_fp: Path of the OpenVPN configuration file.
            name: Instance name; defaults to ``openvpn-<idx>``.
            additional_cfg: Extra configuration passed through to ``Openvpn``;
                defaults to a fresh empty dict for every call.

        Returns:
            The newly created ``Openvpn`` object.
        """
        idx = self.new_idx
        folder_path = os.path.join(self.base_folder, f"session{idx}")
        os.makedirs(folder_path, exist_ok=True)

        if not name:
            name = f"openvpn-{idx}"
        if additional_cfg is None:
            # A fresh dict per call: a `{}` default would be shared by all calls.
            additional_cfg = {}

        # Best effort, exactly like the previous os.system call: a non-zero
        # exit status (e.g. the group already exists) is ignored.  An argv
        # list avoids spawning a shell.
        try:
            subprocess.run(["groupadd", f"vpn{idx}"], check=False)
        except OSError as err:
            print("groupadd failed:", err)

        op = Openvpn(
            cfg_fp,
            idx,
            folder_path,
            f"{self.base_port + idx}",
            "script.sh.template",
            "3proxy.cfg.template",
            name=name,
            additional_cfg=additional_cfg,
        )
        self.instances.append({"op": op, "idx": idx, "weight": 1})
        self.new_idx += 1
        return op

    def generate_lb_cfg(self, load_balance_mode):
        """Render the load balancer config for ``load_balance_mode``.

        Reads ``go-socks-lb.yml.template`` from the current working directory,
        renders it with the instance list, the weights file path and the mode,
        and writes the result to
        ``<base_folder>/go-socks-lb-<load_balance_mode>.yml``.

        Returns:
            Path of the written config file.
        """
        lb_cfg_fp = os.path.join(
            self.base_folder, f"go-socks-lb-{load_balance_mode}.yml")
        self.dynamic_weight_fp = os.path.join(self.base_folder, "weights.json")

        with open("go-socks-lb.yml.template", "r") as template_f:
            template = Template(template_f.read())

        lb_cfg = template.generate(
            instances=self.instances,
            dynamic_weight_fp=self.dynamic_weight_fp,
            load_balance_mode=load_balance_mode,
        )
        # The rendered config is written in binary mode, as before.
        with open(lb_cfg_fp, "wb") as lb_cfg_f:
            lb_cfg_f.write(lb_cfg)
        return lb_cfg_fp

    def generate_dynamic_weight(self, weights):
        """Write ``{"weights": weights}`` as JSON to the weights file."""
        if self.dynamic_weight_fp is None:
            # generate_lb_cfg has not run yet; use the same location it would.
            self.dynamic_weight_fp = os.path.join(
                self.base_folder, "weights.json")
        with open(self.dynamic_weight_fp, "w") as dynamic_weight_f:
            json.dump({"weights": weights}, dynamic_weight_f)

    def get_instance(self, idx):
        """Return the instance dict whose ``"idx"`` equals ``idx``, else ``None``."""
        for instance in self.instances:
            if instance["idx"] == idx:
                return instance
        return None

    def start_op(self, idx):
        """Start instance ``idx`` (no-op if unknown) and restart the balancers."""
        instance = self.get_instance(idx)
        if not instance:
            return
        instance["op"].start()
        instance["weight"] = 1
        self.reset_lb()

    def stop_op(self, idx):
        """Stop instance ``idx`` (no-op if unknown) and restart the balancers."""
        instance = self.get_instance(idx)
        if not instance:
            return
        instance["op"].stop()
        instance["weight"] = 0
        self.reset_lb()

    def remove_op(self, idx):
        """Stop and unregister instance ``idx`` (no-op if unknown).

        When the last instance is removed the index counter restarts at 0.
        """
        instance = self.get_instance(idx)
        if not instance:
            return
        instance["op"].stop()
        instance["weight"] = 0
        self.instances.remove(instance)
        if not self.instances:
            self.new_idx = 0
        self.reset_lb()

    def remove_all_op(self):
        """Stop and unregister every instance, then restart the balancers."""
        for instance in self.instances:
            instance["op"].stop()
            instance["weight"] = 0
        self.instances = []
        self.new_idx = 0
        self.reset_lb()

    def start_all(self):
        """Start every registered instance, then restart the balancers."""
        for instance in self.instances:
            instance["op"].start()
            instance["weight"] = 1
        self.reset_lb()

    def stop_all(self):
        """Stop every registered instance, then restart the balancers."""
        for instance in self.instances:
            instance["op"].stop()
            instance["weight"] = 0
        self.reset_lb()

    def reset_lb(self):
        """Tear down the load balancers and restart them if anything is running.

        Sends SIGINT to every tracked load balancer PID, cancels the tasks
        created by the previous call, and -- if at least one instance has
        status ``RUNNING`` -- schedules two new load balancer processes and
        the weight-updating task on ``self.loop``.
        """
        for pid in self.pids:
            try:
                os.kill(pid, signal.SIGINT)
            except OSError as err:
                # Typically the process has already exited.
                print("kill failed:", err)
        for task in self.run_task:
            task.cancel()
        self.pids = []
        self.run_task = []

        if not any(inst["op"].status == RUNNING for inst in self.instances):
            return

        # Two modes, each served on its own port.
        lb_cfg_fp = self.generate_lb_cfg("fallback")
        lb_cmd = ["go-socks-lb/go-socks-lb", "-config", lb_cfg_fp,
                  "-bind", "0.0.0.0:7000"]
        print("lb_cmd 0", lb_cmd)
        self.run_task.append(self.loop.create_task(self.run_cmd(lb_cmd)))

        lb_cfg_fp = self.generate_lb_cfg("cached-shuffle")
        lb_cmd = ["go-socks-lb/go-socks-lb", "-config", lb_cfg_fp,
                  "-bind", "0.0.0.0:7001"]
        print("lb_cmd 1", lb_cmd)
        self.run_task.append(self.loop.create_task(self.run_cmd(lb_cmd)))

        self.run_task.append(
            self.loop.create_task(self.keep_applying_weights()))

    def calc_weights(self):
        """Recompute each instance's weight from its recent ping history.

        For every instance the last ``max(n_up, n_down)`` ping lines are
        requested from ``op.get_ping_stat``.  A line counts as a failed ping
        when its last comma-separated field is exactly ``"-1"``.

        * If the last ``n_up`` (5) samples all succeeded, weight becomes 1.
        * If the last ``n_down`` (2) samples all failed, weight becomes 0.
        * Otherwise the previous weight is kept.

        Fewer available samples are judged on what exists; with no samples at
        all both rules match and the weight ends up 0.

        Returns:
            List of weights in the order of ``self.instances``.
        """
        n_up = 5
        n_down = 2
        weights = []
        for instance in self.instances:
            lines = instance["op"].get_ping_stat(lines=max(n_up, n_down))
            ping_ok = [line.split(",")[-1] != "-1" for line in lines]
            if all(ping_ok[-n_up:]):
                instance["weight"] = 1
            # Checked second on purpose so "down" wins when both rules match.
            if not any(ping_ok[-n_down:]):
                instance["weight"] = 0
            weights.append(instance["weight"])
        return weights

    async def keep_applying_weights(self):
        """Every 5 seconds recompute weights and publish them when they change.

        On change the weights file is rewritten and SIGUSR1 is sent to every
        tracked load balancer PID (presumably prompting go-socks-lb to reload
        the file -- confirm against its documentation).  Runs until cancelled.
        """
        old_weights = []
        while True:
            await asyncio.sleep(5)
            new_weights = self.calc_weights()
            if old_weights == new_weights:
                continue
            old_weights = new_weights
            print("weight changed, new weights:", new_weights)
            self.generate_dynamic_weight(new_weights)
            for pid in self.pids:
                try:
                    os.kill(pid, signal.SIGUSR1)
                except OSError:
                    # Narrow catch: a bare except would also swallow task
                    # cancellation and KeyboardInterrupt.
                    print("error applying weights to", pid)

    async def run_cmd(self, cmd):
        """Run ``cmd`` as a subprocess and restart it whenever it exits.

        The PID is tracked in ``self.pids`` while the process is alive.  After
        each exit the captured stdout/stderr are printed and the command is
        relaunched after a 5 second pause.  Runs until the task is cancelled.

        Args:
            cmd: Argument vector; ``cmd[0]`` is the executable.
        """
        while True:
            print("Manager trying to start go")
            proc = await asyncio.create_subprocess_exec(
                cmd[0], *cmd[1:],
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE)
            self.proc = proc
            self.pids.append(proc.pid)
            print(f"Load balancer started, pids: {self.pids}")

            stdout, stderr = await proc.communicate()
            print(f'[{cmd!r} exited with {proc.returncode}]')
            # reset_lb may already have replaced self.pids with a fresh list.
            if proc.pid in self.pids:
                self.pids.remove(proc.pid)
            if stdout:
                print(f'[stdout]\n{stdout.decode()}')
            if stderr:
                print(f'[stderr]\n{stderr.decode()}')
            await asyncio.sleep(5)


if __name__ == "__main__":
    folder_fp = "/home/mantao/Desktop/t/"
    cfg_fp = "/home/mantao/Desktop/t/TCP_Files/UK2-TCP.ovpn"
    om = OManager(folder_fp)
    om.new_op(cfg_fp, "op1", {
        "auth-user-pass": "/home/mantao/Desktop/t/fast.txt"})