import asyncio
import logging
import os
import shlex
import signal
import tempfile
import time
from enum import Enum
from typing import Optional


class ProcessManagerState(Enum):
    """Lifecycle state of a :class:`ProcessManager`."""

    IDLE = 0
    RUNNING = 1


class ProcessManager:
    """
    Manages a process and auto restart if process exits.
    Running as group is supported.

    restart_interval: seconds to wait after process exits before restarting
        the process
    group: run command as group (through ``sg``). group will not be changed
        if None is used.
    loop: asyncio event loop. default event loop will be used if None is
        provided.

    The command itself is supplied through ``set_cmd()`` or ``start(cmd)``.
    """

    cmd: Optional[str]
    task: Optional[asyncio.Task]
    state: ProcessManagerState
    pid: int
    restart_interval: int

    # How often, and at which interval (seconds), the pid file written by the
    # group shell is polled before giving up.
    _PID_POLL_ATTEMPTS = 5
    _PID_POLL_INTERVAL = 0.2

    def __init__(self, restart_interval, group=None, loop=None):
        self.cmd = None
        self.group = group
        self.restart_interval = restart_interval
        self.state = ProcessManagerState.IDLE
        self.pid = 0
        # Initialised here so the attributes exist before the first start().
        self.task = None
        self.proc = None
        self.loop = loop if loop is not None else asyncio.get_event_loop()

    def set_cmd(self, cmd):
        """Set the shell command that will be run by the next ``start()``."""
        self.cmd = cmd

    def _group_shell_command(self, pid_fp):
        """Build the ``sg`` shell line that records its pid in *pid_fp* and
        then execs the command.

        Every interpolated piece is shell-quoted so the command reaches the
        inner shell verbatim, whatever quotes or ``$`` it contains.
        """
        # "$$" is expanded by the shell that sg starts; "exec" then replaces
        # that shell with the command, so the recorded pid is the command's.
        inner = f"echo $$ > {shlex.quote(pid_fp)}; exec {self.cmd}"
        return f"sg {shlex.quote(str(self.group))} -c {shlex.quote(inner)}"

    @staticmethod
    def _read_pid(pid_fp):
        """Return the pid stored in *pid_fp*, or None if it is not readable
        yet (file missing, empty or not a number)."""
        try:
            with open(pid_fp, "r") as pid_file:
                return int(pid_file.read())
        except (OSError, ValueError):
            return None

    async def _spawn_in_group(self):
        """Start the command under ``self.group`` and wait for its pid.

        Returns ``(proc, pid)``; ``pid`` is None when the pid file did not
        show up within the polling window.
        """
        with tempfile.TemporaryDirectory() as tmpdirname:
            pid_fp = os.path.join(tmpdirname, "pid.txt")
            proc = await asyncio.create_subprocess_shell(
                self._group_shell_command(pid_fp),
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE)
            logging.info(f"Process started, cmd: {self.cmd}")
            self.proc = proc
            for _ in range(self._PID_POLL_ATTEMPTS):
                await asyncio.sleep(self._PID_POLL_INTERVAL)
                pid = self._read_pid(pid_fp)
                if pid is not None:
                    return proc, pid
        return proc, None

    async def run_cmd(self):
        """Run the command, restarting it after every exit, for as long as
        the manager is in the RUNNING state.

        The loop also ends (state left untouched) when the pid of a
        group-run process cannot be determined.
        """
        logging.debug(f"Process Runner starting, cmd: {self.cmd}")
        while self.state == ProcessManagerState.RUNNING:
            if self.group:
                proc, pid = await self._spawn_in_group()
                if pid is None:
                    logging.error("cannot get pid")
                    break
            else:
                proc = await asyncio.create_subprocess_shell(
                    self.cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE)
                self.proc = proc
                pid = proc.pid
            logging.info(f"pid: {pid}")
            self.pid = pid
            stdout, stderr = await proc.communicate()
            logging.info(f'[{self.cmd!r} exited with {proc.returncode}]')
            # errors="replace": undecodable output must not kill the
            # restart loop.
            if stdout:
                logging.info(f'[stdout]\n{stdout.decode(errors="replace")}')
            if stderr:
                logging.info(f'[stderr]\n{stderr.decode(errors="replace")}')
            self.pid = 0
            await asyncio.sleep(self.restart_interval)

    def start(self, cmd=None):
        """Schedule the restart loop on the event loop.

        cmd: optional command; replaces the stored one when given.

        Raises AssertionError if the manager is not idle or no command has
        been assigned.
        """
        assert self.state == ProcessManagerState.IDLE, "process manager should be idle before start"
        if cmd:
            self.cmd = cmd
        assert self.cmd, "cmd has not been assigned"
        self.state = ProcessManagerState.RUNNING
        self.task = self.loop.create_task(self.run_cmd())

    def stop(self):
        """Cancel the restart loop and send SIGTERM to the current process,
        if any. Does nothing (besides a debug log) when not running."""
        if self.state != ProcessManagerState.RUNNING:
            logging.debug(
                "trying to stop a process manager that is not running")
            return
        self.state = ProcessManagerState.IDLE
        self.task.cancel()
        if self.pid:
            try:
                # SIGINT was not enough to kill the go process...
                os.kill(self.pid, signal.SIGTERM)
            except OSError as err:
                logging.warning(f"kill failed: {err}")
        self.pid = 0
        self.task = None

    def signal(self, signal):
        """Forward *signal* to the managed process.

        Silently does nothing when the manager is not running or the pid is
        not known yet (race condition possible when the pid has not been
        acquired).
        """
        if self.state == ProcessManagerState.RUNNING and self.pid != 0:
            os.kill(self.pid, signal)


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    # Create the loop explicitly: relying on an implicit default loop is
    # deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    pm = ProcessManager(5, group="mantao", loop=loop)
    pm.start("echo haha")
    loop.run_until_complete(asyncio.sleep(5))