From 8fe116473550a607b54fc32053063fdc6506ee0c Mon Sep 17 00:00:00 2001 From: Filip Cacky Date: Tue, 17 Sep 2024 20:44:24 +0200 Subject: [PATCH 1/3] Fix signal handling in SubprocessManager --- metaflow/runner/subprocess_manager.py | 127 +++++++++++++++++++++----- 1 file changed, 102 insertions(+), 25 deletions(-) diff --git a/metaflow/runner/subprocess_manager.py b/metaflow/runner/subprocess_manager.py index 12468ae450b..087a9f7b1e4 100644 --- a/metaflow/runner/subprocess_manager.py +++ b/metaflow/runner/subprocess_manager.py @@ -10,23 +10,58 @@ from typing import Callable, Dict, Iterator, List, Optional, Tuple -def kill_process_and_descendants(pid, termination_timeout): +def send_signals(pid, signal): # TODO: there's a race condition that new descendants might # spawn b/w the invocations of 'pkill' and 'kill'. # Needs to be fixed in future. try: - subprocess.check_call(["pkill", "-TERM", "-P", str(pid)]) - subprocess.check_call(["kill", "-TERM", str(pid)]) + subprocess.call(["pkill", signal, "-P", str(pid)]) + subprocess.check_call(["kill", signal, str(pid)]) except subprocess.CalledProcessError: pass + +def kill_process_and_descendants(pid, termination_timeout): + send_signals(pid, "-TERM") + time.sleep(termination_timeout) - try: - subprocess.check_call(["pkill", "-KILL", "-P", str(pid)]) - subprocess.check_call(["kill", "-KILL", str(pid)]) - except subprocess.CalledProcessError: - pass + send_signals(pid, "-KILL") + + +def kill_processes_and_descendants(pids, termination_timeout): + for pid in pids: + send_signals(pid, "-TERM") + + time.sleep(termination_timeout) + + for pid in pids: + send_signals(pid, "-KILL") + + +async def async_send_signals(pids, signal): + pkill_processes = [ + await asyncio.create_subprocess_exec("pkill", signal, "-P", str(pid)) + for pid in pids + ] + + for proc in pkill_processes: + await proc.wait() + + kill_processes = [ + await asyncio.create_subprocess_exec("kill", signal, str(pid)) for pid in pids + ] + + for proc in 
kill_processes: + await proc.wait() + + +async def async_kill_processes_and_descendants(pids, termination_timeout): + await async_send_signals(pids, "-TERM") + + await asyncio.sleep(termination_timeout) + + await async_send_signals(pids, "-KILL") class LogReadTimeoutError(Exception): @@ -42,6 +77,18 @@ class SubprocessManager(object): def __init__(self): self.commands: Dict[int, CommandManager] = {} + try: + + async def handle_sigint(): + await self._async_handle_sigint() + + asyncio.get_running_loop().add_signal_handler( + signal.SIGINT, lambda: asyncio.create_task(handle_sigint()) + ) + + except RuntimeError: + signal.signal(signal.SIGINT, self._handle_sigint) + async def __aenter__(self) -> "SubprocessManager": return self @@ -81,8 +128,14 @@ def run_command( """ command_obj = CommandManager(command, env, cwd) - pid = command_obj.run(show_output=show_output) + pid = command_obj.run(show_output=show_output, wait=False) + self.commands[pid] = command_obj + + command_obj.process.wait() + command_obj.stdout_thread.join() + command_obj.stderr_thread.join() + return pid async def async_run_command( @@ -138,6 +191,30 @@ def cleanup(self) -> None: for v in self.commands.values(): v.cleanup() + def kill(self, termination_timeout: float = 5): + """ + Kill all managed subprocesses and their descendants. + + Parameters + ---------- + termination_timeout : float, default 5 + The time to wait after sending a SIGTERM to a subprocess and its descendants + before sending a SIGKILL. 
+ """ + pids = [v.process.pid for v in self.commands.values()] + + kill_processes_and_descendants( + pids, + termination_timeout, + ) + + def _handle_sigint(self, signum, frame): + self.kill() + + async def _async_handle_sigint(self): + pids = [v.process.pid for v in self.commands.values()] + await async_kill_processes_and_descendants(pids, 5) + class CommandManager(object): """A manager for an individual subprocess.""" @@ -169,11 +246,11 @@ def __init__( self.cwd = cwd if cwd is not None else os.getcwd() self.process = None + self.stdout_thread = None + self.stderr_thread = None self.run_called: bool = False self.log_files: Dict[str, str] = {} - signal.signal(signal.SIGINT, self._handle_sigint) - async def __aenter__(self) -> "CommandManager": return self @@ -221,12 +298,10 @@ async def wait( "within %s seconds." % (self.process.pid, command_string, timeout) ) - def run(self, show_output: bool = False): + def run(self, show_output: bool = False, wait: bool = True) -> int: """ Run the subprocess synchronously. This can only be called once. - This also waits on the process implicitly. - Parameters ---------- show_output : bool, default False @@ -234,6 +309,11 @@ def run(self, show_output: bool = False): They can be accessed later by reading the files present in: - self.log_files["stdout"] - self.log_files["stderr"] + wait : bool, default True + Wait for the process to finish before returning. + If false, the process will run in the background. You can then wait on + the process (using `wait`) or kill it (using `kill`). + Log forwarding threads `stdout_thread` and `stderr_thread` should be joined. 
""" if not self.run_called: @@ -265,22 +345,22 @@ def stream_to_stdout_and_file(pipe, log_file): self.run_called = True - stdout_thread = threading.Thread( + self.stdout_thread = threading.Thread( target=stream_to_stdout_and_file, args=(self.process.stdout, stdout_logfile), ) - stderr_thread = threading.Thread( + self.stderr_thread = threading.Thread( target=stream_to_stdout_and_file, args=(self.process.stderr, stderr_logfile), ) - stdout_thread.start() - stderr_thread.start() - - self.process.wait() + self.stdout_thread.start() + self.stderr_thread.start() - stdout_thread.join() - stderr_thread.join() + if wait: + self.process.wait() + self.stdout_thread.join() + self.stderr_thread.join() return self.process.pid except Exception as e: @@ -457,9 +537,6 @@ async def kill(self, termination_timeout: float = 5): else: print("No process to kill.") - def _handle_sigint(self, signum, frame): - asyncio.create_task(self.kill()) - async def main(): flow_file = "../try.py" From 477faec3e36201e760cd13fdf41aeef2367ec4a2 Mon Sep 17 00:00:00 2001 From: Filip Cacky Date: Wed, 18 Sep 2024 14:00:24 +0200 Subject: [PATCH 2/3] Improve error handling in send_signals --- metaflow/runner/subprocess_manager.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/metaflow/runner/subprocess_manager.py b/metaflow/runner/subprocess_manager.py index 087a9f7b1e4..368a2a93cfe 100644 --- a/metaflow/runner/subprocess_manager.py +++ b/metaflow/runner/subprocess_manager.py @@ -14,11 +14,15 @@ def send_signals(pid, signal): # TODO: there's a race condition that new descendants might # spawn b/w the invocations of 'pkill' and 'kill'. # Needs to be fixed in future. 
- try: - subprocess.call(["pkill", signal, "-P", str(pid)]) - subprocess.check_call(["kill", signal, str(pid)]) - except subprocess.CalledProcessError: - pass + retcode = subprocess.call(["pkill", signal, "-P", str(pid)]) + # 2: Invalid options + # 3: Fatal error (1 would mean no processes matched) + if retcode == 2 or retcode == 3: + print(f"'pkill {signal} -P {pid}' failed with return code: {retcode}.") + + retcode = subprocess.call(["kill", signal, str(pid)]) + if retcode != 0: + print(f"'kill {signal} {pid}' failed with return code: {retcode}.") def kill_process_and_descendants(pid, termination_timeout): From d6e35e4bedd7910744ab684869bf09d2dfd0d480 Mon Sep 17 00:00:00 2001 From: Filip Cacky Date: Wed, 18 Sep 2024 14:01:15 +0200 Subject: [PATCH 3/3] Cleanup SubprocessManager and CommandManager interfaces --- metaflow/runner/subprocess_manager.py | 63 ++++++++++++++++++++++----- 1 file changed, 53 insertions(+), 10 deletions(-) diff --git a/metaflow/runner/subprocess_manager.py b/metaflow/runner/subprocess_manager.py index 368a2a93cfe..5cd78730577 100644 --- a/metaflow/runner/subprocess_manager.py +++ b/metaflow/runner/subprocess_manager.py @@ -136,9 +136,7 @@ def run_command( self.commands[pid] = command_obj - command_obj.process.wait() - command_obj.stdout_thread.join() - command_obj.stderr_thread.join() + command_obj.sync_wait() return pid @@ -195,7 +193,7 @@ def cleanup(self) -> None: for v in self.commands.values(): v.cleanup() - def kill(self, termination_timeout: float = 5): + async def kill(self, termination_timeout: float = 5): """ Kill all managed subprocesses and their descendants. @@ -205,19 +203,31 @@ def kill(self, termination_timeout: float = 5): The time to wait after sending a SIGTERM to a subprocess and its descendants before sending a SIGKILL. 
""" - pids = [v.process.pid for v in self.commands.values()] + pids = [v.process.pid for v in self.commands.values() if v.process is not None] + await async_kill_processes_and_descendants(pids, 5) + + def sync_kill(self, termination_timeout: float = 5): + """ + Kill all managed subprocesses and their descendants synchronously. + + Parameters + ---------- + termination_timeout : float, default 5 + The time to wait after sending a SIGTERM to a subprocess and its descendants + before sending a SIGKILL. + """ + pids = [v.process.pid for v in self.commands.values() if v.process is not None] kill_processes_and_descendants( pids, termination_timeout, ) def _handle_sigint(self, signum, frame): - self.kill() + self.sync_kill() async def _async_handle_sigint(self): - pids = [v.process.pid for v in self.commands.values()] - await async_kill_processes_and_descendants(pids, 5) + await self.kill() class CommandManager(object): @@ -302,6 +312,20 @@ async def wait( "within %s seconds." % (self.process.pid, command_string, timeout) ) + def sync_wait(self): + """ + Wait for the subprocess to finish synchronously. + + You can only call `sync_wait` if `run` has already been called. + """ + + if not self.run_called: + raise RuntimeError("No command run yet to wait for...") + + self.process.wait() + self.stdout_thread.join() + self.stderr_thread.join() + def run(self, show_output: bool = False, wait: bool = True) -> int: """ Run the subprocess synchronously. This can only be called once. @@ -316,8 +340,7 @@ def run(self, show_output: bool = False, wait: bool = True) -> int: wait : bool, default True Wait for the process to finish before returning. If false, the process will run in the background. You can then wait on - the process (using `wait`) or kill it (using `kill`). - Log forwarding threads `stdout_thread` and `stderr_thread` should be joined. + the process (using `sync_wait`) or kill it (using `sync_kill`). 
""" if not self.run_called: @@ -541,6 +564,26 @@ async def kill(self, termination_timeout: float = 5): else: print("No process to kill.") + def sync_kill(self, termination_timeout: float = 5): + """ + Kill the subprocess and its descendants synchronously. + + Parameters + ---------- + termination_timeout : float, default 5 + The time to wait after sending a SIGTERM to the process and its descendants + before sending a SIGKILL. + """ + + if self.process is not None: + send_signals(self.process.pid, "-TERM") + + time.sleep(termination_timeout) + + send_signals(self.process.pid, "-KILL") + else: + print("No process to kill.") + async def main(): flow_file = "../try.py"