diff --git a/o_manager.py b/o_manager.py index 5f866dc..5d4bba0 100644 --- a/o_manager.py +++ b/o_manager.py @@ -15,7 +15,7 @@ class OManager: self.weights = [] self.run_task = [] self.dynamic_weight_fp = None - self.PID = None + self.pids = [] if loop: self.loop = loop else: @@ -39,17 +39,18 @@ class OManager: self.new_idx += 1 return op - def generate_lb_cfg(self): + def generate_lb_cfg(self, load_balance_mode): lb_cfg_fp = os.path.join( self.base_folder, "go-socks-lb.yml") self.dynamic_weight_fp = os.path.join(self.base_folder, "weights.json") - with open("go-socks-lb.yml.template", "r") as template_f: + with open(f"go-socks-lb-{load_balance_mode}.yml.template", "r") as template_f: buf = template_f.read() template = Template(buf) items = { "instances": self.instances, - "dynamic_weight_fp": self.dynamic_weight_fp + "dynamic_weight_fp": self.dynamic_weight_fp, + "load_balance_mode": load_balance_mode } lb_cfg = template.generate(**items) with open(lb_cfg_fp, "wb") as lb_cfg_f: @@ -91,7 +92,7 @@ class OManager: instance["op"].stop() instance["weight"] = 0 self.instances.remove(instance) - if len(self.instances)==0: + if len(self.instances) == 0: self.new_idx = 0 self.reset_lb() @@ -116,16 +117,15 @@ class OManager: self.reset_lb() def reset_lb(self): - try: - if self.PID: - os.kill(self.PID, signal.SIGINT) - except Exception as err: - print("kill failed:", err) + for pid in self.pids: + try: + os.kill(pid, signal.SIGINT) + except Exception as err: + print("kill failed:", err) for task in self.run_task: task.cancel() self.pids = [] self.run_task = [] - lb_cfg_fp = self.generate_lb_cfg() # find running instances exist_running_instance = False @@ -134,9 +134,18 @@ class OManager: exist_running_instance = True break if exist_running_instance: + # two modes on different ports + lb_cfg_fp = self.generate_lb_cfg("fallback") lb_cmd = ["go-socks-lb/go-socks-lb", "-config", lb_cfg_fp, "-bind", "0.0.0.0:7000"] - #print("lb_cmd", lb_cmd) + print("lb_cmd 0", lb_cmd) + self.run_task.append(self.loop.create_task( + self.run_cmd(lb_cmd))) + + lb_cfg_fp = self.generate_lb_cfg("cached-shuffle") + lb_cmd = ["go-socks-lb/go-socks-lb", "-config", + lb_cfg_fp, "-bind", "0.0.0.0:7001"] + print("lb_cmd 1", lb_cmd) self.run_task.append(self.loop.create_task( self.run_cmd(lb_cmd))) #print("pre calc weights:", self.calc_weights()) @@ -177,12 +186,12 @@ class OManager: if old_weights != new_weights: old_weights = new_weights print("weight changed, new weights:", new_weights) - try: - self.generate_dynamic_weight( - new_weights) - os.kill(self.PID, signal.SIGUSR1) - except: - print("error applying weights") + self.generate_dynamic_weight(new_weights) + for pid in self.pids: + try: + os.kill(pid, signal.SIGUSR1) + except: + print("error applying weights to", pid) # else: #print("weight not changed, weights:", new_weights) @@ -194,10 +203,12 @@ class OManager: stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) self.proc = proc - self.PID = proc.pid - print(f"Load balancer started, pid: {self.PID}") + self.pids.append(proc.pid) + print(f"Load balancer started, pids: {self.pids}") stdout, stderr = await proc.communicate() print(f'[{cmd!r} exited with {proc.returncode}]') + if proc.pid in self.pids: + self.pids.remove(proc.pid) if stdout: print(f'[stdout]\n{stdout.decode()}') if stderr: diff --git a/test.py b/test.py index 000dae6..5200d25 100644 --- a/test.py +++ b/test.py @@ -17,7 +17,7 @@ class MainHandler(tornado.web.RequestHandler): def get(self): buf = "\n
\n" buf += "\n" - buf += f"ops len: {len(om.instances)}\n"
buf += "\n".join(op.get_log(lines=10))