Skip to content

Commit

Permalink
Check for the first finished parallel process (#1546)
Browse files Browse the repository at this point in the history
* Check for the first finished parallel process

This fixes #1316 by optimizing parallel map runs: it checks for
whichever process finishes first instead of waiting for the first
process before checking the others.

* Fix code format with black

* Reduce wait to 0.1 seconds
  • Loading branch information
maxzheng authored Sep 28, 2023
1 parent f8490c2 commit 5602a20
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
14 changes: 12 additions & 2 deletions metaflow/multicore_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from itertools import islice
from multiprocessing import cpu_count
from tempfile import NamedTemporaryFile
import time

try:
# Python 2
Expand Down Expand Up @@ -68,8 +69,17 @@ def parallel_imap_unordered(func, iterable, max_parallel=None, dir=None):

while pids:

pid, output_file = pids.pop()
if os.waitpid(pid, 0)[1]:
for idx, pid_info in enumerate(pids):
pid, output_file = pid_info
pid, exit_code = os.waitpid(pid, os.WNOHANG)
if pid:
pids.pop(idx)
break
else:
time.sleep(0.1) # Wait a bit before re-checking
continue

if exit_code:
raise MulticoreException("Child failed")

with open(output_file, "rb") as f:
Expand Down
12 changes: 12 additions & 0 deletions test/unit/test_multicore_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from metaflow.multicore_utils import parallel_map


def test_parallel_map():
    """Check that parallel_map upper-cases each item and returns them in input order."""
    letters = ["a", "b", "c", "d", "e", "f"]
    expected = ["A", "B", "C", "D", "E", "F"]

    # The result is compared as a list, so both values and ordering must match.
    result = parallel_map(lambda s: s.upper(), letters)

    assert result == expected

0 comments on commit 5602a20

Please sign in to comment.