Skip to content

Commit

Permalink
Check for the first finished parallel process
Browse files Browse the repository at this point in the history
This fixes #1316 by optimizing
parallel map runs: it polls for whichever process finishes first instead
of blocking on the first process in the list before checking the others.
  • Loading branch information
maxzheng committed Sep 28, 2023
1 parent f8490c2 commit c100972
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
14 changes: 12 additions & 2 deletions metaflow/multicore_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from itertools import islice
from multiprocessing import cpu_count
from tempfile import NamedTemporaryFile
import time

try:
# Python 2
Expand Down Expand Up @@ -68,8 +69,17 @@ def parallel_imap_unordered(func, iterable, max_parallel=None, dir=None):

while pids:

pid, output_file = pids.pop()
if os.waitpid(pid, 0)[1]:
for idx, pid_info in enumerate(pids):
pid, output_file = pid_info
pid, exit_code = os.waitpid(pid, os.WNOHANG)
if pid:
pids.pop(idx)
break
else:
time.sleep(1) # Wait a bit before re-checking
continue

if exit_code:
raise MulticoreException("Child failed")

with open(output_file, "rb") as f:
Expand Down
5 changes: 5 additions & 0 deletions test/unit/test_multicore_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from metaflow.multicore_utils import parallel_map


def test_parallel_map():
    """Check that parallel_map applies the function to every item, in input order."""
    letters = ['a', 'b', 'c', 'd', 'e', 'f']
    expected = ['A', 'B', 'C', 'D', 'E', 'F']

    result = parallel_map(lambda s: s.upper(), letters)

    assert result == expected

0 comments on commit c100972

Please sign in to comment.