add dynamic weight update fuction to the superviser

This commit is contained in:
mantaohuang 2020-04-10 00:29:08 -04:00
parent 916646b1e2
commit 25c4218ff6
3 changed files with 101 additions and 30 deletions

View File

@ -1,6 +1,7 @@
proxy:{% for idx in running_idx %} proxy:{% for i in instances %}
- url: "socks5://192.168.122.128:108{{idx}}" - url: "socks5://192.168.122.128:{{1080+i["idx"]}}"
weight: 5 weight: {{i["weight"]}}
{% end %} {% end %}
load-balance-mode: "cached-shuffle" load-balance-mode: "cached-shuffle"
cache-clean-interval: 0 cache-clean-interval: 0
dynamic-weight-file: "{{dynamic_weight_fp}}"

View File

@ -1,18 +1,20 @@
import os import os
from openvpn import Openvpn from openvpn import Openvpn, RUNNING
import asyncio import asyncio
from tornado.template import Template from tornado.template import Template
import signal import signal
import json
class OManager: class OManager:
def __init__(self, base_folder, base_port=8001, loop=None): def __init__(self, base_folder, base_port=8001, loop=None):
self.base_folder = base_folder self.base_folder = base_folder
self.base_port = base_port self.base_port = base_port
self.ops = [] self.instances = []
self.new_idx = 0 self.new_idx = 0
self.running_idx = [] self.weights = []
self.run_task = [] self.run_task = []
self.dynamic_weight_fp = None
self.PID = None self.PID = None
if loop: if loop:
self.loop = loop self.loop = loop
@ -29,51 +31,69 @@ class OManager:
op = Openvpn(cfg_fp, self.new_idx, folder_path, op = Openvpn(cfg_fp, self.new_idx, folder_path,
f"{self.base_port + self.new_idx}", "script.sh.template", "3proxy.cfg.template", name=name, f"{self.base_port + self.new_idx}", "script.sh.template", "3proxy.cfg.template", name=name,
additional_cfg=additional_cfg) additional_cfg=additional_cfg)
self.ops.append(op) self.instances.append({
"op": op,
"idx": self.new_idx,
"weight": 1
})
self.new_idx += 1 self.new_idx += 1
return op return op
def generate_lb_cfg(self): def generate_lb_cfg(self):
lb_cfg_fp = os.path.join( lb_cfg_fp = os.path.join(
self.base_folder, "go-socks-lb.yml") self.base_folder, "go-socks-lb.yml")
self.dynamic_weight_fp = os.path.join(self.base_folder, "weights.json")
with open("go-socks-lb.yml.template", "r") as template_f: with open("go-socks-lb.yml.template", "r") as template_f:
buf = template_f.read() buf = template_f.read()
template = Template(buf) template = Template(buf)
items = { items = {
"running_idx": self.running_idx "instances": self.instances,
"dynamic_weight_fp": self.dynamic_weight_fp
} }
lb_cfg = template.generate(**items) lb_cfg = template.generate(**items)
with open(lb_cfg_fp, "wb") as lb_cfg_f: with open(lb_cfg_fp, "wb") as lb_cfg_f:
lb_cfg_f.write(lb_cfg) lb_cfg_f.write(lb_cfg)
return lb_cfg_fp return lb_cfg_fp
def generate_dynamic_weight(self, weights):
with open(self.dynamic_weight_fp, "w") as dynamic_weight_f:
dynamic_weight_f.write(json.dumps({
"weights": weights
}))
def get_instance(self, idx):
for instance in self.instances:
if instance["idx"] == idx:
return instance
return None
def start_op(self, idx): def start_op(self, idx):
self.ops[idx].start() instance = self.get_instance(idx)
if idx not in self.running_idx: if not instance:
self.running_idx.append(idx) return
instance["op"].start()
instance["weight"] = 1
self.reset_lb() self.reset_lb()
def stop_op(self, idx): def stop_op(self, idx):
self.loop.create_task(self.ops[idx].stop()) instance = self.get_instance(idx)
if idx in self.running_idx: if not instance:
self.running_idx.remove(idx) return
instance["op"].stop()
instance["weight"] = 0
self.reset_lb() self.reset_lb()
def get_all_ops(self):
return self.ops
def start_all(self): def start_all(self):
for op in self.ops: for instance in self.instances:
op.start() instance["op"].start()
idx = op.idx instance["weight"] = 1
if idx not in self.running_idx:
self.running_idx.append(idx)
self.reset_lb() self.reset_lb()
def stop_all(self): def stop_all(self):
for op in self.ops: for instance in self.instances:
op.stop() instance["op"].stop()
self.running_idx = [] instance["weight"] = 0
self.reset_lb() self.reset_lb()
def reset_lb(self): def reset_lb(self):
@ -87,12 +107,59 @@ class OManager:
self.pids = [] self.pids = []
self.run_task = [] self.run_task = []
lb_cfg_fp = self.generate_lb_cfg() lb_cfg_fp = self.generate_lb_cfg()
if len(self.running_idx):
# find running instances
exist_running_instance = False
for instance in self.instances:
if instance["op"].status == RUNNING:
exist_running_instance = True
break
if exist_running_instance:
lb_cmd = ["go-socks-lb/go-socks-lb", "-config", lb_cmd = ["go-socks-lb/go-socks-lb", "-config",
lb_cfg_fp, "-bind", "0.0.0.0:7000"] lb_cfg_fp, "-bind", "0.0.0.0:7000"]
print("lb_cmd", lb_cmd) print("lb_cmd", lb_cmd)
self.run_task.append(self.loop.create_task( self.run_task.append(self.loop.create_task(
self.run_cmd(lb_cmd))) self.run_cmd(lb_cmd)))
self.run_task.append(self.loop.create_task(
self.keep_applying_weights()))
def calc_weights(self):
n_up = 5
n_down = 2
weights = []
for i in self.instances:
ping_stat = i["op"].get_ping_stat(
).splitlines()[-max(n_up, n_down):]
ping_stat = [i.split(",")[-1] != "-1" for i in ping_stat]
def all_equal(n, status):
for j in range(n):
if ping_stat[-j] != status:
return False
return True
# all up
if all_equal(n_up, True):
i["weight"] = 1
if all_equal(n_down, False):
i["weight"] = 0
weights.append(i["weight"])
return weights
async def keep_applying_weights(self):
old_weights = []
while True:
new_weights = self.calc_weights()
if old_weights != new_weights:
print("weight changed, new weights:", new_weights)
try:
self.generate_dynamic_weight(
new_weights)
os.kill(self.PID, signal.SIGUSR1)
except:
print("error applying weights")
else:
print("weight not changed, weights:", new_weights)
await asyncio.sleep(5)
async def run_cmd(self, cmd): async def run_cmd(self, cmd):
while True: while True:

11
test.py
View File

@ -17,11 +17,14 @@ class MainHandler(tornado.web.RequestHandler):
def get(self): def get(self):
buf = "<html>\n<head><meta http-equiv=\"refresh\" content=\"5\"></head>\n" buf = "<html>\n<head><meta http-equiv=\"refresh\" content=\"5\"></head>\n"
buf += "<body>\n" buf += "<body>\n"
buf += f"ops len: {len(om.ops)} <br/>\n" buf += f"ops len: {len(om.instances)} <br/>\n"
if len(om.ops): if len(om.instances):
for idx, op in enumerate(om.ops): for i in om.instances:
op = i["op"]
idx = i["idx"]
weight = i["weight"]
buf += f"op #{idx}, name: {op.name} <br/>\n" buf += f"op #{idx}, name: {op.name} <br/>\n"
buf += f"status: {op.status} <br/>\n" buf += f"status: {op.status}, weight: {weight} <br/>\n"
buf += f"pid: {op.pids} <br/>\n" buf += f"pid: {op.pids} <br/>\n"
buf += "log: <br/>\n" buf += "log: <br/>\n"
buf += "<pre>\n" buf += "<pre>\n"