ovpn-lb-socks5/openvpn.py
2020-08-03 12:55:26 -04:00

311 lines
11 KiB
Python

"""
openvpn supervisor
"""
import asyncio
import tempfile
import os
import time
# import openvpn_api
import signal
import psutil
import subprocess
from datetime import datetime
import humanize
from tornado.template import Template
import stat
import shutil
import tailer
import logging
from process_manager import ProcessManager
def generate_config(in_fp, cfg):
def change_item(config, item_key, item_content):
output_lines = []
changed = False
for i in config.splitlines():
if i.startswith(item_key+" ") or i == item_key:
output_lines.append(f"{item_key} {item_content}")
changed = True
else:
output_lines.append(i)
if not changed:
output_lines.append(f"{item_key} {item_content}")
return "\n".join(output_lines)
# _, path = tempfile.mkstemp()
with open(in_fp, "r") as in_f:
config = in_f.read()
for key, value in cfg.items():
config = change_item(config, key, value)
return config
# Values stored in Openvpn.status.
# IDLE: initial state, and the state after stop().
IDLE = "idle"
# RUNNING: set by start(); the monitor loop keeps going while status has this value.
RUNNING = "running"
class Openvpn:
    """
    Supervisor for one openvpn process plus the 3proxy process paired with it.

    The instance with index idx uses interface tun<idx>, proxy port
    1080+idx, route table vpn<idx> (id 300+idx) and rule preference 100+idx.

    ovpn_config:{
        cfg_fp: file path of ovpn configuration
        name: name of this openvpn instance
        additional_cfg: additional changes to be made to the existing ovpn file, usually as a way to feed in auth details
        enabled: whether this instance should be started right away, default: False
        restart_interval: time to wait before restarting openvpn after it exits, unit is second, default: 300
        ping_timeout: ping timeout for quality detection, unit is ms, default: 200
        n_up: number of consecutive successful pings to bring connection status up, default: 5
        n_down: number of consecutive failed pings to bring connection status down, default: 2
    }
    env_config:{
        monitor_interval: time interval between connection status monitoring in second, default: 5
        folder_path: path of the session folder
        script_template_fp: file path of script template
        proxycfg_template_fp: filepath of 3proxy config
    }
    """
    # Both managers are ProcessManager instances (created in load_cfg). The
    # annotations are quoted so they are not evaluated while the class body
    # executes.
    openvpn_pm: "ProcessManager"
    proxy_pm: "ProcessManager"

    def __init__(self, idx, ovpn_config, env_config, loop=None):
        """
        idx: integer index of this instance; determines interface name,
             proxy port and routing identifiers
        ovpn_config / env_config: see the class docstring
        loop: asyncio event loop to schedule on; defaults to
              asyncio.get_event_loop()

        Note: load_cfg() is called at the end and starts the instance right
        away when ovpn_config["enabled"] is truthy.
        """
        self.idx = idx
        self.interface = f"tun{idx}"
        self.status = IDLE
        self.proc = None
        self.exit_future = None
        self.run_task = []
        self.pid_fp = None
        self.loop = loop if loop else asyncio.get_event_loop()
        self.interface_up = False
        self.load_cfg(ovpn_config, env_config)

    def load_cfg(self, ovpn_config, env_config):
        """
        Store the configuration, create both process managers and start the
        instance when ovpn_config["enabled"] is truthy.

        Raises KeyError when a mandatory key is missing and AssertionError
        when a numeric setting is out of range.
        """
        self.ovpn_config = ovpn_config
        self.env_config = env_config
        self.cfg_fp = ovpn_config["cfg_fp"]
        self.name = ovpn_config["name"]
        self.additional_cfg = ovpn_config.get("additional_cfg", {})
        self.ping_timeout = int(ovpn_config.get("ping_timeout", 200))
        self.n_up = int(ovpn_config.get("n_up", 5))
        self.n_down = int(ovpn_config.get("n_down", 2))
        # NOTE(review): asserts are stripped under `python -O`; kept as-is so
        # the exception type seen by callers does not change.
        assert self.ping_timeout > 0, "invalid ping_timeout value"
        assert self.n_up >= 0 and self.n_down >= 0, "invalid n_up or n_down value"
        self.restart_interval = int(ovpn_config.get("restart_interval", 300))
        assert self.restart_interval > 0, "invalid restart_interval value"
        self.monitor_interval = env_config.get("monitor_interval", 5)
        self.folder_path = env_config["folder_path"]
        self.script_template_fp = env_config["script_template_fp"]
        self.proxycfg_template_fp = env_config["proxycfg_template_fp"]
        self.io_stat_fp = os.path.join(self.folder_path, "io_stat.txt")
        self.ping_stat_fp = os.path.join(self.folder_path, "ping_stat.txt")
        enabled = ovpn_config.get("enabled", False)
        # presumably the first ProcessManager argument is its restart delay
        # in seconds -- confirm against process_manager
        self.openvpn_pm = ProcessManager(
            self.restart_interval, group="openvpn", loop=self.loop)
        self.proxy_pm = ProcessManager(
            5, group=f"vpn{self.idx}", loop=self.loop)
        if enabled:
            self.start()

    def export_cfg(self):
        """
        Return the current settings as an ovpn_config dictionary that can be
        fed back into load_cfg(); "enabled" reflects whether the instance is
        running right now.
        """
        ovpn_config = {
            "cfg_fp": self.cfg_fp,
            "name": self.name,
            "additional_cfg": self.additional_cfg,
            # compare by value: identity of equal strings is not guaranteed
            "enabled": self.status == RUNNING,
            # exported so a save/load round trip does not fall back to the default
            "restart_interval": self.restart_interval,
            "ping_timeout": self.ping_timeout,
            "n_up": self.n_up,
            "n_down": self.n_down
        }
        return ovpn_config

    def generate_script(self):
        """
        Render the script template into <folder_path>/script.sh, mark it
        executable and return its path (also stored in self.script_fp).
        start() hands this script to openvpn via --route-up.
        """
        self.script_fp = os.path.join(self.folder_path, "script.sh")
        with open(self.script_template_fp, "r") as template_f:
            buf = template_f.read()
        template = Template(buf)
        script_log_fp = os.path.join(self.folder_path, "script_log.txt")
        items = {
            "script_log_fp": script_log_fp,
            "route_table_name": f"vpn{self.idx}",
            "route_table_id": f"{300+self.idx}",
            "rule_pref": f"{100+self.idx}"
        }
        # written in binary mode because the rendered template is bytes
        script = template.generate(**items)
        with open(self.script_fp, "wb") as script_f:
            script_f.write(script)
        # mark executable
        st = os.stat(self.script_fp)
        os.chmod(self.script_fp, st.st_mode | stat.S_IEXEC)
        return self.script_fp

    def generate_proxycfg(self):
        """
        Render the 3proxy template into <folder_path>/3proxy.cfg and return
        its path. The template receives the proxy log path and the port
        1080+idx.
        """
        # TODO: refactor code: process manager
        proxycfg_fp = os.path.join(
            self.folder_path, "3proxy.cfg")
        with open(self.proxycfg_template_fp, "r") as template_f:
            buf = template_f.read()
        template = Template(buf)
        proxy_log_fp = os.path.join(self.folder_path, "proxy_log.txt")
        items = {
            "proxy_log_fp": proxy_log_fp,
            "port": f"{1080 + self.idx}"
        }
        proxycfg = template.generate(**items)
        with open(proxycfg_fp, "wb") as proxycfg_f:
            proxycfg_f.write(proxycfg)
        return proxycfg_fp

    def get_cfg(self):
        """
        Build the directive overrides applied to the ovpn file: interface
        name, device type and log file, updated with additional_cfg (which
        therefore wins on conflicting keys).

        Side effect: sets self.log_fp.
        """
        self.log_fp = os.path.join(self.folder_path, "log.txt")
        cfg = {
            "dev": self.interface,
            "dev-type": "tun",  # TODO: add code to read dev-type
            # "management": f"localhost {self.management_port}",  # disable management since we are not using it now
            "log-append": self.log_fp
        }
        cfg.update(self.additional_cfg)
        return cfg

    def generate_config_file(self):
        """
        Write the ovpn configuration with overrides applied to
        <folder_path>/cfg.txt and return that path (also stored in
        self.config_fp).
        """
        cfg = self.get_cfg()
        self.config_fp = os.path.join(self.folder_path, "cfg.txt")
        config = generate_config(self.cfg_fp, cfg)
        with open(self.config_fp, "w") as config_f:
            config_f.write(config)
        return self.config_fp

    def start(self):
        """
        Generate all session files, launch openvpn and 3proxy through their
        process managers and schedule the monitoring task. Does nothing
        unless the instance is idle.
        """
        if self.status != IDLE:
            return
        # Generate every file before switching state, so a failure here
        # leaves the instance idle instead of "running" with nothing started.
        config_fp = self.generate_config_file()
        script_fp = self.generate_script()
        proxycfg_fp = self.generate_proxycfg()
        self.status = RUNNING
        # NOTE(review): the commands are plain space-joined strings, so a
        # folder_path containing whitespace would presumably break them --
        # confirm how ProcessManager.start parses its argument.
        openvpn_cmd = " ".join(["openvpn", "--config", config_fp, "--route-noexec",
                                "--route-up", script_fp, "--script-security",
                                "2", "--mute-replay-warnings"])  # TODO: remove --mute-replay-warnings
        proxy_cmd = " ".join(["3proxy", proxycfg_fp])
        self.openvpn_pm.start(openvpn_cmd)
        self.proxy_pm.start(proxy_cmd)
        self.run_task.append(self.loop.create_task(self.monitor_task()))

    @staticmethod
    def _read_lines(fp, lines):
        """
        Return the lines of file fp: the whole file (readlines) when lines
        is falsy, otherwise the last `lines` lines via tailer. Returns ""
        when the file cannot be read (e.g. it does not exist yet).
        """
        try:
            with open(fp, "r") as in_f:
                if not lines:
                    return in_f.readlines()
                return tailer.tail(in_f, lines)
        except Exception as err:
            # best effort: a missing file is expected before the first write
            logging.debug("cannot read %s, error:%s", fp, err)
            return ""

    def get_log(self, lines=10):
        """
        Return the last `lines` lines of the openvpn log (the whole log when
        lines is falsy), or "" when the log cannot be read.
        """
        # regenerate log_fp
        _ = self.get_cfg()
        return self._read_lines(self.log_fp, lines)

    def clear_folder(self):
        """
        Remove every file directly inside folder_path. Sub-folders are left
        alone. Failures are logged and never raised.
        """
        dirpath = self.folder_path
        try:
            filenames = os.listdir(dirpath)
        except OSError as err:
            logging.error(f"cannot remove log file, error:{err}")
            return
        for filename in filenames:
            filepath = os.path.join(dirpath, filename)
            # removing all files in folder path, not removing folders
            if os.path.isdir(filepath) and not os.path.islink(filepath):
                continue
            try:
                os.remove(filepath)
            except OSError as err:
                # keep going so one undeletable entry does not block the rest
                logging.error(f"cannot remove log file, error:{err}")

    def stop(self, clear_folder=True):
        """
        Stop both processes, cancel the monitoring tasks and, when
        clear_folder is true, delete the session files. Does nothing unless
        the instance is running.
        """
        if self.status != RUNNING:
            return
        self.proxy_pm.stop()
        self.openvpn_pm.stop()
        self.status = IDLE
        for task in self.run_task:
            task.cancel()
        self.run_task = []
        self.interface_up = False
        if clear_folder:
            self.clear_folder()

    @property
    def pids(self):
        """[openvpn pid, proxy pid] as reported by the two process managers."""
        return [self.openvpn_pm.pid, self.proxy_pm.pid]

    def restart(self):
        """Stop (clearing the session folder) and start again."""
        self.stop()
        self.start()

    def get_io_stat(self, lines=5):
        """
        Return the last `lines` records of the I/O statistics file (all of
        them when lines is falsy), or "" when it cannot be read. Each record
        is "timestamp,bytes_recv,bytes_sent".
        """
        return self._read_lines(self.io_stat_fp, lines)

    def get_ping_stat(self, lines=5):
        """
        Return the last `lines` records of the ping statistics file (all of
        them when lines is falsy), or "" when it cannot be read. Each record
        is "timestamp,result" where result is "-1" for a failed ping.
        """
        return self._read_lines(self.ping_stat_fp, lines)

    @staticmethod
    def _transfer_rates(rows):
        """
        Compute (download, upload) rates in bytes per second from the first
        two "timestamp,bytes_recv,bytes_sent" records in rows.

        Raises ValueError when fewer than two usable records are given and
        ZeroDivisionError when both records carry the same timestamp.
        """
        samples = [[float(field) for field in row.split(",")] for row in rows]
        if len(samples) < 2:
            raise ValueError("not enough points, need >=2")
        t0, recv0, sent0 = samples[0][:3]
        t1, recv1, sent1 = samples[1][:3]
        delta_t = t1 - t0
        return (recv1 - recv0) / delta_t, (sent1 - sent0) / delta_t

    def get_latest_speed(self):
        """
        Return a human readable "Down: .../s, Up: .../s" string computed from
        the two most recent I/O records, or "no data" when they are missing
        or unusable.
        """
        try:
            down_speed, up_speed = self._transfer_rates(self.get_io_stat(2))
        except (ValueError, IndexError, ZeroDivisionError):
            return "no data"
        down_speed = humanize.naturalsize(down_speed)
        up_speed = humanize.naturalsize(up_speed)
        return f"Down: {down_speed}/s, Up: {up_speed}/s"

    def monitor_action(self):
        """
        monitor the I/O statistics and connectivity
        TODO: separate module for monitoring

        Appends one record to the I/O statistics file and one to the ping
        statistics file. When the tunnel interface does not exist yet, only
        interface_up is cleared and nothing is written.
        """
        io_count = psutil.net_io_counters(pernic=True)
        if self.interface not in io_count:
            self.interface_up = False
            logging.debug(f"interface {self.interface} is not ready.")
            return
        self.interface_up = True
        nic_count = io_count[self.interface]
        # NOTE(review): utcnow() is a naive datetime, so .timestamp() treats
        # it as local time; the value is a true epoch only on UTC hosts.
        # Kept unchanged to preserve the existing file contents.
        with open(self.io_stat_fp, "a") as io_stat_f:
            io_stat_f.write(
                f"{datetime.utcnow().timestamp()},{nic_count.bytes_recv},{nic_count.bytes_sent}\n")
        # command = ["ping", "-w", "1", "-c", "1", "8.8.8.8"]
        command = ["fping", "-C", "1", f"-t{self.ping_timeout}",
                   "-I", self.interface, "-q", "8.8.8.8"]  # TODO: configurable ping host
        # NOTE: this blocks the calling thread until fping exits
        completed = subprocess.run(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if completed.returncode == 0:
            # the measurement is taken from stderr: text after the last ':'
            result = completed.stderr.decode().strip().split(':')[-1]
        else:
            result = "-1"
        with open(self.ping_stat_fp, "a") as ping_stat_f:
            ping_stat_f.write(
                f"{datetime.utcnow().timestamp()},{result}\n")

    async def monitor_task(self):
        """
        Run monitor_action() every monitor_interval seconds for as long as
        the instance is running; errors are logged and do not end the loop.
        """
        while self.status == RUNNING:
            try:
                self.monitor_action()
            except Exception as e:
                logging.error(f"monitoring error: {e}")
            await asyncio.sleep(self.monitor_interval)