two lb modes at the same time
This commit is contained in:
parent
e7f18216c8
commit
544aacf0fd
41
o_manager.py
41
o_manager.py
@ -15,7 +15,7 @@ class OManager:
|
|||||||
self.weights = []
|
self.weights = []
|
||||||
self.run_task = []
|
self.run_task = []
|
||||||
self.dynamic_weight_fp = None
|
self.dynamic_weight_fp = None
|
||||||
self.PID = None
|
self.pids = []
|
||||||
if loop:
|
if loop:
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
else:
|
else:
|
||||||
@ -39,17 +39,18 @@ class OManager:
|
|||||||
self.new_idx += 1
|
self.new_idx += 1
|
||||||
return op
|
return op
|
||||||
|
|
||||||
def generate_lb_cfg(self):
|
def generate_lb_cfg(self, load_balance_mode):
|
||||||
lb_cfg_fp = os.path.join(
|
lb_cfg_fp = os.path.join(
|
||||||
self.base_folder, "go-socks-lb.yml")
|
self.base_folder, "go-socks-lb.yml")
|
||||||
|
|
||||||
self.dynamic_weight_fp = os.path.join(self.base_folder, "weights.json")
|
self.dynamic_weight_fp = os.path.join(self.base_folder, "weights.json")
|
||||||
with open("go-socks-lb.yml.template", "r") as template_f:
|
with open(f"go-socks-lb-{load_balance_mode}.yml.template", "r") as template_f:
|
||||||
buf = template_f.read()
|
buf = template_f.read()
|
||||||
template = Template(buf)
|
template = Template(buf)
|
||||||
items = {
|
items = {
|
||||||
"instances": self.instances,
|
"instances": self.instances,
|
||||||
"dynamic_weight_fp": self.dynamic_weight_fp
|
"dynamic_weight_fp": self.dynamic_weight_fp,
|
||||||
|
"load_balance_mode": load_balance_mode
|
||||||
}
|
}
|
||||||
lb_cfg = template.generate(**items)
|
lb_cfg = template.generate(**items)
|
||||||
with open(lb_cfg_fp, "wb") as lb_cfg_f:
|
with open(lb_cfg_fp, "wb") as lb_cfg_f:
|
||||||
@ -91,7 +92,7 @@ class OManager:
|
|||||||
instance["op"].stop()
|
instance["op"].stop()
|
||||||
instance["weight"] = 0
|
instance["weight"] = 0
|
||||||
self.instances.remove(instance)
|
self.instances.remove(instance)
|
||||||
if len(self.instances)==0:
|
if len(self.instances) == 0:
|
||||||
self.new_idx = 0
|
self.new_idx = 0
|
||||||
self.reset_lb()
|
self.reset_lb()
|
||||||
|
|
||||||
@ -116,16 +117,15 @@ class OManager:
|
|||||||
self.reset_lb()
|
self.reset_lb()
|
||||||
|
|
||||||
def reset_lb(self):
|
def reset_lb(self):
|
||||||
|
for pid in self.pids:
|
||||||
try:
|
try:
|
||||||
if self.PID:
|
os.kill(pid, signal.SIGINT)
|
||||||
os.kill(self.PID, signal.SIGINT)
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print("kill failed:", err)
|
print("kill failed:", err)
|
||||||
for task in self.run_task:
|
for task in self.run_task:
|
||||||
task.cancel()
|
task.cancel()
|
||||||
self.pids = []
|
self.pids = []
|
||||||
self.run_task = []
|
self.run_task = []
|
||||||
lb_cfg_fp = self.generate_lb_cfg()
|
|
||||||
|
|
||||||
# find running instances
|
# find running instances
|
||||||
exist_running_instance = False
|
exist_running_instance = False
|
||||||
@ -134,9 +134,18 @@ class OManager:
|
|||||||
exist_running_instance = True
|
exist_running_instance = True
|
||||||
break
|
break
|
||||||
if exist_running_instance:
|
if exist_running_instance:
|
||||||
|
# two modes on different ports
|
||||||
|
lb_cfg_fp = self.generate_lb_cfg("fallback")
|
||||||
lb_cmd = ["go-socks-lb/go-socks-lb", "-config",
|
lb_cmd = ["go-socks-lb/go-socks-lb", "-config",
|
||||||
lb_cfg_fp, "-bind", "0.0.0.0:7000"]
|
lb_cfg_fp, "-bind", "0.0.0.0:7000"]
|
||||||
#print("lb_cmd", lb_cmd)
|
print("lb_cmd 0", lb_cmd)
|
||||||
|
self.run_task.append(self.loop.create_task(
|
||||||
|
self.run_cmd(lb_cmd)))
|
||||||
|
|
||||||
|
lb_cfg_fp = self.generate_lb_cfg("cached-shuffle")
|
||||||
|
lb_cmd = ["go-socks-lb/go-socks-lb", "-config",
|
||||||
|
lb_cfg_fp, "-bind", "0.0.0.0:7001"]
|
||||||
|
print("lb_cmd 1", lb_cmd)
|
||||||
self.run_task.append(self.loop.create_task(
|
self.run_task.append(self.loop.create_task(
|
||||||
self.run_cmd(lb_cmd)))
|
self.run_cmd(lb_cmd)))
|
||||||
#print("pre calc weights:", self.calc_weights())
|
#print("pre calc weights:", self.calc_weights())
|
||||||
@ -177,12 +186,12 @@ class OManager:
|
|||||||
if old_weights != new_weights:
|
if old_weights != new_weights:
|
||||||
old_weights = new_weights
|
old_weights = new_weights
|
||||||
print("weight changed, new weights:", new_weights)
|
print("weight changed, new weights:", new_weights)
|
||||||
|
self.generate_dynamic_weight(new_weights)
|
||||||
|
for pid in self.pids:
|
||||||
try:
|
try:
|
||||||
self.generate_dynamic_weight(
|
os.kill(pid, signal.SIGUSR1)
|
||||||
new_weights)
|
|
||||||
os.kill(self.PID, signal.SIGUSR1)
|
|
||||||
except:
|
except:
|
||||||
print("error applying weights")
|
print("error applying weights to", pid)
|
||||||
# else:
|
# else:
|
||||||
#print("weight not changed, weights:", new_weights)
|
#print("weight not changed, weights:", new_weights)
|
||||||
|
|
||||||
@ -194,10 +203,12 @@ class OManager:
|
|||||||
stdout=asyncio.subprocess.PIPE,
|
stdout=asyncio.subprocess.PIPE,
|
||||||
stderr=asyncio.subprocess.PIPE)
|
stderr=asyncio.subprocess.PIPE)
|
||||||
self.proc = proc
|
self.proc = proc
|
||||||
self.PID = proc.pid
|
self.pids.append(proc.pid)
|
||||||
print(f"Load balancer started, pid: {self.PID}")
|
print(f"Load balancer started, pids: {self.pids}")
|
||||||
stdout, stderr = await proc.communicate()
|
stdout, stderr = await proc.communicate()
|
||||||
print(f'[{cmd!r} exited with {proc.returncode}]')
|
print(f'[{cmd!r} exited with {proc.returncode}]')
|
||||||
|
if proc.pid in self.pids:
|
||||||
|
self.pids.remove(proc.pid)
|
||||||
if stdout:
|
if stdout:
|
||||||
print(f'[stdout]\n{stdout.decode()}')
|
print(f'[stdout]\n{stdout.decode()}')
|
||||||
if stderr:
|
if stderr:
|
||||||
|
|||||||
4
test.py
4
test.py
@ -17,7 +17,7 @@ class MainHandler(tornado.web.RequestHandler):
|
|||||||
def get(self):
|
def get(self):
|
||||||
buf = "<html>\n<head><meta http-equiv=\"refresh\" content=\"5\"></head>\n"
|
buf = "<html>\n<head><meta http-equiv=\"refresh\" content=\"5\"></head>\n"
|
||||||
buf += "<body>\n"
|
buf += "<body>\n"
|
||||||
buf += f"ops len: {len(om.instances)} <br/>\n"
|
buf += f"ops len: {len(om.instances)}, lb pids: {om.pids} <br/>\n"
|
||||||
if len(om.instances):
|
if len(om.instances):
|
||||||
for i in om.instances:
|
for i in om.instances:
|
||||||
op = i["op"]
|
op = i["op"]
|
||||||
@ -25,7 +25,7 @@ class MainHandler(tornado.web.RequestHandler):
|
|||||||
weight = i["weight"]
|
weight = i["weight"]
|
||||||
buf += f"op #{idx}, name: {op.name} <br/>\n"
|
buf += f"op #{idx}, name: {op.name} <br/>\n"
|
||||||
buf += f"status: {op.status}, weight: {weight} <br/>\n"
|
buf += f"status: {op.status}, weight: {weight} <br/>\n"
|
||||||
buf += f"pid: {op.pids} <br/>\n"
|
buf += f"op pids: {op.pids}<br/>\n"
|
||||||
buf += "log: <br/>\n"
|
buf += "log: <br/>\n"
|
||||||
buf += "<pre>\n"
|
buf += "<pre>\n"
|
||||||
buf += "\n".join(op.get_log(lines=10))
|
buf += "\n".join(op.get_log(lines=10))
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user