diff --git a/go-socks-lb.yml.template b/go-socks-lb.yml.template index 3ee11ca..96e73eb 100644 --- a/go-socks-lb.yml.template +++ b/go-socks-lb.yml.template @@ -1,6 +1,7 @@ -proxy:{% for idx in running_idx %} - - url: "socks5://192.168.122.128:108{{idx}}" - weight: 5 +proxy:{% for i in instances %} + - url: "socks5://192.168.122.128:{{1080+i["idx"]}}" + weight: {{i["weight"]}} {% end %} load-balance-mode: "cached-shuffle" cache-clean-interval: 0 +dynamic-weight-file: "{{dynamic_weight_fp}}" diff --git a/o_manager.py b/o_manager.py index 8e90618..6d496f7 100644 --- a/o_manager.py +++ b/o_manager.py @@ -1,18 +1,20 @@ import os -from openvpn import Openvpn +from openvpn import Openvpn, RUNNING import asyncio from tornado.template import Template import signal +import json class OManager: def __init__(self, base_folder, base_port=8001, loop=None): self.base_folder = base_folder self.base_port = base_port - self.ops = [] + self.instances = [] self.new_idx = 0 - self.running_idx = [] + self.weights = [] self.run_task = [] + self.dynamic_weight_fp = None self.PID = None if loop: self.loop = loop @@ -29,51 +31,69 @@ class OManager: op = Openvpn(cfg_fp, self.new_idx, folder_path, f"{self.base_port + self.new_idx}", "script.sh.template", "3proxy.cfg.template", name=name, additional_cfg=additional_cfg) - self.ops.append(op) + self.instances.append({ + "op": op, + "idx": self.new_idx, + "weight": 1 + }) self.new_idx += 1 return op def generate_lb_cfg(self): lb_cfg_fp = os.path.join( self.base_folder, "go-socks-lb.yml") + + self.dynamic_weight_fp = os.path.join(self.base_folder, "weights.json") with open("go-socks-lb.yml.template", "r") as template_f: buf = template_f.read() template = Template(buf) items = { - "running_idx": self.running_idx + "instances": self.instances, + "dynamic_weight_fp": self.dynamic_weight_fp } lb_cfg = template.generate(**items) with open(lb_cfg_fp, "wb") as lb_cfg_f: lb_cfg_f.write(lb_cfg) return lb_cfg_fp + def generate_dynamic_weight(self, weights): + with open(self.dynamic_weight_fp, "w") as dynamic_weight_f: + dynamic_weight_f.write(json.dumps({ + "weights": weights + })) + + def get_instance(self, idx): + for instance in self.instances: + if instance["idx"] == idx: + return instance + return None + def start_op(self, idx): - self.ops[idx].start() - if idx not in self.running_idx: - self.running_idx.append(idx) + instance = self.get_instance(idx) + if not instance: + return + instance["op"].start() + instance["weight"] = 1 self.reset_lb() def stop_op(self, idx): - self.loop.create_task(self.ops[idx].stop()) - if idx in self.running_idx: - self.running_idx.remove(idx) + instance = self.get_instance(idx) + if not instance: + return + instance["op"].stop() + instance["weight"] = 0 self.reset_lb() - def get_all_ops(self): - return self.ops - def start_all(self): - for op in self.ops: - op.start() - idx = op.idx - if idx not in self.running_idx: - self.running_idx.append(idx) + for instance in self.instances: + instance["op"].start() + instance["weight"] = 1 self.reset_lb() def stop_all(self): - for op in self.ops: - op.stop() - self.running_idx = [] + for instance in self.instances: + instance["op"].stop() + instance["weight"] = 0 self.reset_lb() def reset_lb(self): @@ -87,12 +107,59 @@ class OManager: self.pids = [] self.run_task = [] lb_cfg_fp = self.generate_lb_cfg() - if len(self.running_idx): + + # find running instances + exist_running_instance = False + for instance in self.instances: + if instance["op"].status == RUNNING: + exist_running_instance = True + break + if exist_running_instance: lb_cmd = ["go-socks-lb/go-socks-lb", "-config", lb_cfg_fp, "-bind", "0.0.0.0:7000"] print("lb_cmd", lb_cmd) self.run_task.append(self.loop.create_task( self.run_cmd(lb_cmd))) + self.run_task.append(self.loop.create_task( + self.keep_applying_weights())) + + def calc_weights(self): + n_up = 5 + n_down = 2 + weights = [] + for i in self.instances: + ping_stat = i["op"].get_ping_stat( + ).splitlines()[-max(n_up, n_down):] + ping_stat = [i.split(",")[-1] != "-1" for i in ping_stat] + + def all_equal(n, status): + for j in range(n): + if ping_stat[-j] != status: + return False + return True + # all up + if all_equal(n_up, True): + i["weight"] = 1 + if all_equal(n_down, False): + i["weight"] = 0 + weights.append(i["weight"]) + return weights + + async def keep_applying_weights(self): + old_weights = [] + while True: + new_weights = self.calc_weights() + if old_weights != new_weights: + print("weight changed, new weights:", new_weights) + try: + self.generate_dynamic_weight( + new_weights) + os.kill(self.PID, signal.SIGUSR1) + except: + print("error applying weights") + else: + print("weight not changed, weights:", new_weights) + await asyncio.sleep(5) async def run_cmd(self, cmd): while True: diff --git a/test.py b/test.py index 0be4cff..edef45f 100644 --- a/test.py +++ b/test.py @@ -17,11 +17,14 @@ class MainHandler(tornado.web.RequestHandler): def get(self): buf = "\n
\n" buf += "\n" - buf += f"ops len: {len(om.ops)}\n"