From 5602a20611ed835b77a3a0ae2db6da0cc511e245 Mon Sep 17 00:00:00 2001 From: Max Zheng Date: Thu, 28 Sep 2023 12:43:06 -0700 Subject: [PATCH] Check for the first finished parallel proccess (#1546) * Check for the first finished parallel proccess This fixes https://github.com/Netflix/metaflow/issues/1316 to optimize the parallel map runs by checking for the first finished process instead of waiting for the first process before checking others. * Fix code format with black * Reduce wait to 0.1 seconds --- metaflow/multicore_utils.py | 14 ++++++++++++-- test/unit/test_multicore_utils.py | 12 ++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) create mode 100644 test/unit/test_multicore_utils.py diff --git a/metaflow/multicore_utils.py b/metaflow/multicore_utils.py index 1ac81039c1..73ad909b3a 100644 --- a/metaflow/multicore_utils.py +++ b/metaflow/multicore_utils.py @@ -4,6 +4,7 @@ from itertools import islice from multiprocessing import cpu_count from tempfile import NamedTemporaryFile +import time try: # Python 2 @@ -68,8 +69,17 @@ def parallel_imap_unordered(func, iterable, max_parallel=None, dir=None): while pids: - pid, output_file = pids.pop() - if os.waitpid(pid, 0)[1]: + for idx, pid_info in enumerate(pids): + pid, output_file = pid_info + pid, exit_code = os.waitpid(pid, os.WNOHANG) + if pid: + pids.pop(idx) + break + else: + time.sleep(0.1) # Wait a bit before re-checking + continue + + if exit_code: raise MulticoreException("Child failed") with open(output_file, "rb") as f: diff --git a/test/unit/test_multicore_utils.py b/test/unit/test_multicore_utils.py new file mode 100644 index 0000000000..9b14a20b99 --- /dev/null +++ b/test/unit/test_multicore_utils.py @@ -0,0 +1,12 @@ +from metaflow.multicore_utils import parallel_map + + +def test_parallel_map(): + assert parallel_map(lambda s: s.upper(), ["a", "b", "c", "d", "e", "f"]) == [ + "A", + "B", + "C", + "D", + "E", + "F", + ]