ovpn-lb-socks5/o_manager.py
2020-05-29 19:46:23 -04:00

247 lines
8.1 KiB
Python

import os
from openvpn import Openvpn, RUNNING
import asyncio
from tornado.template import Template
import signal
import json
import socket
import fcntl
import struct
def get_ip_address(ifname):
    """Return the IPv4 address currently assigned to a network interface.

    The address is obtained by issuing the ``SIOCGIFADDR`` ioctl on a
    throwaway UDP socket. The socket is only used as a handle for the
    ioctl; nothing is sent on it, and it is closed before returning.

    Args:
        ifname: Interface name, e.g. ``"eth0"``. Only the first 15
            characters are passed to the kernel.

    Returns:
        The address as a dotted-quad string.

    Raises:
        OSError: If the ioctl fails (for example, the interface does not
            exist or has no IPv4 address).
    """
    # The context manager guarantees the descriptor is released even when
    # the ioctl raises; previously the socket was never closed.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        request = struct.pack(b'256s', ifname[:15].encode())
        reply = fcntl.ioctl(
            sock.fileno(),
            0x8915,  # SIOCGIFADDR
            request,
        )
    # Bytes 20..23 of the returned buffer hold the packed IPv4 address.
    return socket.inet_ntoa(reply[20:24])
class OManager:
    """Manage a pool of ``Openvpn`` instances and their load balancers.

    Each instance is tracked as a dict ``{"op", "idx", "weight"}``. Whenever
    the set of started/stopped instances changes, the running
    ``go-socks-lb`` processes are torn down and, if at least one instance
    reports ``RUNNING``, two new ones are launched (one per balancing mode)
    together with a background task that keeps their weights up to date.

    Attributes:
        base_folder: Directory that receives per-session folders, the
            generated load-balancer configs and ``weights.json``.
        base_port: Port number of instance 0; instance ``n`` is given
            ``base_port + n``.
        interface: Name of the network interface whose address is looked up.
        ip: IPv4 address of ``interface`` (resolved once, at construction).
        instances: List of instance dicts, in creation order.
        new_idx: Index that the next ``new_op`` call will use.
        weights: Initialised to an empty list; not otherwise used here.
        run_task: asyncio tasks started by ``reset_lb``.
        dynamic_weight_fp: Path of the weights file, set by
            ``generate_lb_cfg`` (``None`` until then).
        pids: PIDs of the load-balancer processes currently running.
        loop: Event loop used to schedule background tasks.
    """

    def __init__(self, base_folder, interface, base_port=8001, loop=None):
        """Create a manager bound to ``interface``.

        Args:
            base_folder: Working directory for generated files.
            interface: Network interface name passed to ``get_ip_address``.
            base_port: First port handed to an ``Openvpn`` instance.
            loop: Event loop to use; falls back to
                ``asyncio.get_event_loop()`` when falsy.
        """
        self.base_folder = base_folder
        self.base_port = base_port
        self.interface = interface
        self.ip = get_ip_address(self.interface)
        self.instances = []
        self.new_idx = 0
        self.weights = []
        self.run_task = []
        self.dynamic_weight_fp = None
        self.pids = []
        self.loop = loop or asyncio.get_event_loop()

    def new_op(self, cfg_fp, name=None, additional_cfg=None):
        """Create and register a new ``Openvpn`` instance (not started).

        Args:
            cfg_fp: Path of the OpenVPN configuration file.
            name: Instance name; defaults to ``"openvpn-<idx>"``.
            additional_cfg: Extra configuration forwarded to ``Openvpn``.
                A fresh empty dict is used when omitted, so instances never
                share one mutable default.

        Returns:
            The newly created ``Openvpn`` object.
        """
        if additional_cfg is None:
            additional_cfg = {}
        idx = self.new_idx
        folder_path = os.path.join(self.base_folder, f"session{idx}")
        # exist_ok avoids the check-then-create race of isdir()+makedirs().
        os.makedirs(folder_path, exist_ok=True)
        if not name:
            name = f"openvpn-{idx}"
        # The exit status of groupadd is not checked.
        os.system(f"groupadd vpn{idx}")
        op = Openvpn(cfg_fp, idx, folder_path,
                     f"{self.base_port + idx}", "script.sh.template",
                     "3proxy.cfg.template", name=name,
                     additional_cfg=additional_cfg)
        self.instances.append({
            "op": op,
            "idx": idx,
            "weight": 1,
        })
        self.new_idx += 1
        return op

    def generate_lb_cfg(self, load_balance_mode):
        """Render the load-balancer config for one balancing mode.

        Reads ``go-socks-lb.yml.template`` from the current working
        directory, renders it with the manager's IP, instances, weights
        file path and the mode, and writes the result into ``base_folder``.
        Also sets ``self.dynamic_weight_fp``.

        Args:
            load_balance_mode: Mode name; also embedded in the output
                file name.

        Returns:
            Path of the written config file.
        """
        lb_cfg_fp = os.path.join(
            self.base_folder, f"go-socks-lb-{load_balance_mode}.yml")
        self.dynamic_weight_fp = os.path.join(self.base_folder, "weights.json")
        with open("go-socks-lb.yml.template", "r") as template_f:
            template = Template(template_f.read())
        lb_cfg = template.generate(
            ip=self.ip,
            instances=self.instances,
            dynamic_weight_fp=self.dynamic_weight_fp,
            load_balance_mode=load_balance_mode,
        )
        # The rendered config is written in binary mode, unchanged.
        with open(lb_cfg_fp, "wb") as lb_cfg_f:
            lb_cfg_f.write(lb_cfg)
        return lb_cfg_fp

    def generate_dynamic_weight(self, weights):
        """Write ``{"weights": weights}`` as JSON to ``dynamic_weight_fp``.

        Args:
            weights: List of per-instance weights.
        """
        with open(self.dynamic_weight_fp, "w") as dynamic_weight_f:
            json.dump({"weights": weights}, dynamic_weight_f)

    def get_instance(self, idx):
        """Return the instance dict whose ``"idx"`` equals ``idx``, or None."""
        return next(
            (inst for inst in self.instances if inst["idx"] == idx), None)

    def start_op(self, idx):
        """Start instance ``idx``, give it weight 1 and rebuild the balancers.

        Does nothing when no such instance exists.
        """
        instance = self.get_instance(idx)
        if not instance:
            return
        instance["op"].start()
        instance["weight"] = 1
        self.reset_lb()

    def stop_op(self, idx):
        """Stop instance ``idx``, give it weight 0 and rebuild the balancers.

        Does nothing when no such instance exists.
        """
        instance = self.get_instance(idx)
        if not instance:
            return
        instance["op"].stop()
        instance["weight"] = 0
        self.reset_lb()

    def remove_op(self, idx):
        """Stop and unregister instance ``idx``, then rebuild the balancers.

        When the last instance is removed the index counter restarts at 0.
        Does nothing when no such instance exists.
        """
        instance = self.get_instance(idx)
        if not instance:
            return
        instance["op"].stop()
        instance["weight"] = 0
        self.instances.remove(instance)
        if not self.instances:
            self.new_idx = 0
        self.reset_lb()

    def remove_all_op(self):
        """Stop and unregister every instance and reset the index counter."""
        for instance in self.instances:
            instance["op"].stop()
            instance["weight"] = 0
        self.instances = []
        self.new_idx = 0
        self.reset_lb()

    def start_all(self):
        """Start every instance with weight 1, then rebuild the balancers."""
        for instance in self.instances:
            instance["op"].start()
            instance["weight"] = 1
        self.reset_lb()

    def stop_all(self):
        """Stop every instance with weight 0, then rebuild the balancers."""
        for instance in self.instances:
            instance["op"].stop()
            instance["weight"] = 0
        self.reset_lb()

    def reset_lb(self):
        """Tear down the current load balancers and start fresh ones.

        Sends SIGINT to every tracked PID, cancels every tracked task and
        clears both lists. If at least one instance reports ``RUNNING``,
        launches one ``go-socks-lb`` per balancing mode (each on its own
        port) plus the weight-refresh task.
        """
        for pid in self.pids:
            try:
                os.kill(pid, signal.SIGINT)
            except Exception as err:
                print("kill failed:", err)
        for task in self.run_task:
            task.cancel()
        self.pids = []
        self.run_task = []

        if not any(inst["op"].status == RUNNING for inst in self.instances):
            return

        # Two balancing modes, each served on a different port.
        launches = (
            ("lb_cmd 0", "fallback", "0.0.0.0:7000"),
            ("lb_cmd 1", "cached-shuffle", "0.0.0.0:7001"),
        )
        for label, mode, bind_addr in launches:
            lb_cfg_fp = self.generate_lb_cfg(mode)
            lb_cmd = ["go-socks-lb/go-socks-lb", "-config",
                      lb_cfg_fp, "-bind", bind_addr]
            print(label, lb_cmd)
            self.run_task.append(self.loop.create_task(
                self.run_cmd(lb_cmd)))
        self.run_task.append(self.loop.create_task(
            self.keep_applying_weights()))

    def calc_weights(self):
        """Recompute each instance's weight from its recent ping results.

        A ping sample counts as failed when the last comma-separated field
        of its line is exactly ``"-1"``. An instance is set to weight 1
        when its last ``n_up`` samples all succeeded, and to weight 0 when
        its last ``n_down`` samples all failed; otherwise its weight is left
        as is. When fewer samples are available only those are considered,
        and with no samples at all the weight ends up 0 (the "down" rule is
        applied last).

        Returns:
            List of weights, in ``self.instances`` order. The instance
            dicts are updated in place as a side effect.
        """
        n_up = 5
        n_down = 2
        weights = []
        for instance in self.instances:
            samples = instance["op"].get_ping_stat(lines=max(n_up, n_down))
            reachable = [line.split(",")[-1] != "-1" for line in samples]
            if all(reachable[-n_up:]):
                instance["weight"] = 1
            # Evaluated independently (not elif) so that it wins when both
            # conditions hold, i.e. when there are no samples.
            if not any(reachable[-n_down:]):
                instance["weight"] = 0
            weights.append(instance["weight"])
        return weights

    async def keep_applying_weights(self):
        """Every 5 seconds, publish weights to the balancers if they changed.

        On a change, rewrites the weights file and sends SIGUSR1 to every
        tracked load-balancer PID. Runs until the task is cancelled.
        """
        old_weights = []
        while True:
            await asyncio.sleep(5)
            new_weights = self.calc_weights()
            if old_weights == new_weights:
                continue
            old_weights = new_weights
            print("weight changed, new weights:", new_weights)
            self.generate_dynamic_weight(new_weights)
            for pid in self.pids:
                try:
                    os.kill(pid, signal.SIGUSR1)
                except Exception as err:
                    # Best effort: a failed signal must not stop the loop.
                    print("error applying weights to", pid, err)

    def clear_cache(self):
        """Send SIGUSR2 to every tracked load-balancer PID (best effort)."""
        for pid in self.pids:
            try:
                os.kill(pid, signal.SIGUSR2)
            except Exception as err:
                print("error clear cache of", pid, err)

    async def run_cmd(self, cmd):
        """Run ``cmd`` as a subprocess forever, restarting it after it exits.

        The PID is tracked in ``self.pids`` while the process is alive.
        After each exit the captured output is printed and the command is
        relaunched following a 5 second pause. Runs until cancelled.

        Args:
            cmd: Argument list; ``cmd[0]`` is the program.
        """
        while True:
            print("Manager trying to start go")
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE)
            # Holds whichever process was started most recently.
            self.proc = proc
            self.pids.append(proc.pid)
            print(f"Load balancer started, pids: {self.pids}")
            # NOTE: communicate() buffers all output in memory until the
            # process exits.
            stdout, stderr = await proc.communicate()
            print(f'[{cmd!r} exited with {proc.returncode}]')
            if proc.pid in self.pids:
                self.pids.remove(proc.pid)
            if stdout:
                print(f'[stdout]\n{stdout.decode()}')
            if stderr:
                print(f'[stderr]\n{stderr.decode()}')
            await asyncio.sleep(5)
if __name__ == "__main__":
    # Manual smoke run: build a manager on eth0 and register a single
    # OpenVPN instance. Nothing is started and no event loop is run here.
    base_dir = "/home/mantao/Desktop/t/"
    manager = OManager(base_dir, "eth0")
    manager.new_op(
        "/home/mantao/Desktop/t/TCP_Files/UK2-TCP.ovpn",
        "op1",
        {"auth-user-pass": "/home/mantao/Desktop/t/fast.txt"},
    )